| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package authdb | 5 package authdb |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "io/ioutil" | 9 "io/ioutil" |
| 10 "net/http" | 10 "net/http" |
| 11 "net/url" | 11 "net/url" |
| 12 | 12 |
| 13 "github.com/julienschmidt/httprouter" | |
| 14 "golang.org/x/net/context" | 13 "golang.org/x/net/context" |
| 15 "google.golang.org/appengine" | 14 "google.golang.org/appengine" |
| 16 | 15 |
| 17 "github.com/luci/gae/service/info" | 16 "github.com/luci/gae/service/info" |
| 18 | 17 |
| 19 "github.com/luci/luci-go/appengine/gaeauth/client" | 18 "github.com/luci/luci-go/appengine/gaeauth/client" |
| 20 "github.com/luci/luci-go/common/errors" | 19 "github.com/luci/luci-go/common/errors" |
| 21 "github.com/luci/luci-go/common/logging" | 20 "github.com/luci/luci-go/common/logging" |
| 22 "github.com/luci/luci-go/server/auth/service" | 21 "github.com/luci/luci-go/server/auth/service" |
| 23 » "github.com/luci/luci-go/server/middleware" | 22 » "github.com/luci/luci-go/server/router" |
| 24 ) | 23 ) |
| 25 | 24 |
| 26 const ( | 25 const ( |
| 27 pubSubPullURLPath = "/auth/pubsub/authdb:pull" // dev server only | 26 pubSubPullURLPath = "/auth/pubsub/authdb:pull" // dev server only |
| 28 pubSubPushURLPath = "/auth/pubsub/authdb:push" | 27 pubSubPushURLPath = "/auth/pubsub/authdb:push" |
| 29 ) | 28 ) |
| 30 | 29 |
| 31 // InstallHandlers installs PubSub related HTTP handlers. | 30 // InstallHandlers installs PubSub related HTTP handlers. |
| 32 func InstallHandlers(r *httprouter.Router, base middleware.Base) { | 31 func InstallHandlers(r *router.Router, handlers []router.Handler) { |
| 33 if appengine.IsDevAppServer() { | 32 if appengine.IsDevAppServer() { |
| 34 » » r.GET(pubSubPullURLPath, base(pubSubPull)) | 33 » » r.GET(pubSubPullURLPath, append(handlers, pubSubPull)...) |
| 35 } | 34 } |
| 36 » r.POST(pubSubPushURLPath, base(pubSubPush)) | 35 » r.POST(pubSubPushURLPath, append(handlers, pubSubPush)...) |
| 37 } | 36 } |
| 38 | 37 |
| 39 // authenticatePubSub injects into a context a transport that authenticates | 38 // authenticatePubSub injects into a context a transport that authenticates |
| 40 // calls with OAuth2 token with PubSub API scope needed for PubSub API calls. | 39 // calls with OAuth2 token with PubSub API scope needed for PubSub API calls. |
| 41 func authenticatePubSub(c context.Context) context.Context { | 40 func authenticatePubSub(c context.Context) context.Context { |
| 42 scopes := []string{ | 41 scopes := []string{ |
| 43 "https://www.googleapis.com/auth/userinfo.email", | 42 "https://www.googleapis.com/auth/userinfo.email", |
| 44 "https://www.googleapis.com/auth/pubsub", | 43 "https://www.googleapis.com/auth/pubsub", |
| 45 } | 44 } |
| 46 return client.UseServiceAccountTransport(c, scopes, nil) | 45 return client.UseServiceAccountTransport(c, scopes, nil) |
| (...skipping 28 matching lines...) Expand all Loading... |
| 75 panic(err) | 74 panic(err) |
| 76 } | 75 } |
| 77 return fmt.Sprintf("projects/%s/subscriptions/%s+%s", gaeInfo.AppID(), s
ubIDPrefix, serviceURL.Host) | 76 return fmt.Sprintf("projects/%s/subscriptions/%s+%s", gaeInfo.AppID(), s
ubIDPrefix, serviceURL.Host) |
| 78 } | 77 } |
| 79 | 78 |
| 80 // pubSubPull is HTTP handler that pulls PubSub messages from AuthDB change | 79 // pubSubPull is HTTP handler that pulls PubSub messages from AuthDB change |
| 81 // notification topic. | 80 // notification topic. |
| 82 // | 81 // |
| 83 // Used only on dev server for manual testing. Prod services use push-based | 82 // Used only on dev server for manual testing. Prod services use push-based |
| 84 // delivery. | 83 // delivery. |
| 85 func pubSubPull(c context.Context, rw http.ResponseWriter, r *http.Request, p ht
tprouter.Params) { | 84 func pubSubPull(ctx *router.Context) { |
| 86 if !appengine.IsDevAppServer() { | 85 if !appengine.IsDevAppServer() { |
| 87 » » replyError(c, rw, errors.New("not a dev server")) | 86 » » replyError(ctx.Context, ctx.Writer, errors.New("not a dev server
")) |
| 87 » » ctx.Abort() |
| 88 return | 88 return |
| 89 } | 89 } |
| 90 » processPubSubRequest(c, rw, r, func(c context.Context, srv authService,
serviceURL string) (*service.Notification, error) { | 90 » processPubSubRequest(ctx, func(c context.Context, srv authService, servi
ceURL string) (*service.Notification, error) { |
| 91 return srv.PullPubSub(c, subscriptionName(c, serviceURL)) | 91 return srv.PullPubSub(c, subscriptionName(c, serviceURL)) |
| 92 }) | 92 }) |
| 93 } | 93 } |
| 94 | 94 |
| 95 // pubSubPush is HTTP handler that processes incoming PubSub push notifications. | 95 // pubSubPush is HTTP handler that processes incoming PubSub push notifications. |
| 96 // | 96 // |
| 97 // It uses the signature inside PubSub message body for authentication. Skips | 97 // It uses the signature inside PubSub message body for authentication. Skips |
| 98 // messages not signed by currently configured auth service. | 98 // messages not signed by currently configured auth service. |
| 99 func pubSubPush(c context.Context, rw http.ResponseWriter, r *http.Request, p ht
tprouter.Params) { | 99 func pubSubPush(ctx *router.Context) { |
| 100 » processPubSubRequest(c, rw, r, func(c context.Context, srv authService,
serviceURL string) (*service.Notification, error) { | 100 » processPubSubRequest(ctx, func(c context.Context, srv authService, servi
ceURL string) (*service.Notification, error) { |
| 101 » » body, err := ioutil.ReadAll(r.Body) | 101 » » body, err := ioutil.ReadAll(ctx.Request.Body) |
| 102 if err != nil { | 102 if err != nil { |
| 103 return nil, err | 103 return nil, err |
| 104 } | 104 } |
| 105 return srv.ProcessPubSubPush(c, body) | 105 return srv.ProcessPubSubPush(c, body) |
| 106 }) | 106 }) |
| 107 } | 107 } |
| 108 | 108 |
| 109 type notifcationGetter func(context.Context, authService, string) (*service.Noti
fication, error) | 109 type notifcationGetter func(context.Context, authService, string) (*service.Noti
fication, error) |
| 110 | 110 |
| 111 // processPubSubRequest is common wrapper for pubSubPull and pubSubPush. | 111 // processPubSubRequest is common wrapper for pubSubPull and pubSubPush. |
| 112 // | 112 // |
| 113 // It implements most logic of notification handling. Calls supplied callback | 113 // It implements most logic of notification handling. Calls supplied callback |
| 114 // to actually get service.Notification, since this part is different from Pull | 114 // to actually get service.Notification, since this part is different from Pull |
| 115 // and Push subscriptions. | 115 // and Push subscriptions. |
| 116 func processPubSubRequest(c context.Context, rw http.ResponseWriter, r *http.Req
uest, callback notifcationGetter) { | 116 func processPubSubRequest(c *router.Context, callback notifcationGetter) { |
| 117 » c = defaultNS(c) | 117 » c.Context = defaultNS(c.Context) |
| 118 » c = authenticatePubSub(c) | 118 » c.Context = authenticatePubSub(c.Context) |
| 119 » info, err := GetLatestSnapshotInfo(c) | 119 » info, err := GetLatestSnapshotInfo(c.Context) |
| 120 if err != nil { | 120 if err != nil { |
| 121 » » replyError(c, rw, err) | 121 » » replyError(c.Context, c.Writer, err) |
| 122 » » c.Abort() |
| 122 return | 123 return |
| 123 } | 124 } |
| 124 if info == nil { | 125 if info == nil { |
| 125 // Return HTTP 200 to avoid a redelivery. | 126 // Return HTTP 200 to avoid a redelivery. |
| 126 » » replyOK(c, rw, "Auth Service URL is not configured, skipping the
message") | 127 » » replyOK(c.Context, c.Writer, "Auth Service URL is not configured
, skipping the message") |
| 127 return | 128 return |
| 128 } | 129 } |
| 129 » srv := getAuthService(c, info.AuthServiceURL) | 130 » srv := getAuthService(c.Context, info.AuthServiceURL) |
| 130 | 131 |
| 131 » notify, err := callback(c, srv, info.AuthServiceURL) | 132 » notify, err := callback(c.Context, srv, info.AuthServiceURL) |
| 132 if err != nil { | 133 if err != nil { |
| 133 » » replyError(c, rw, err) | 134 » » replyError(c.Context, c.Writer, err) |
| 135 » » c.Abort() |
| 134 return | 136 return |
| 135 } | 137 } |
| 136 | 138 |
| 137 // notify may be nil if PubSub messages didn't pass authentication. | 139 // notify may be nil if PubSub messages didn't pass authentication. |
| 138 if notify == nil { | 140 if notify == nil { |
| 139 » » replyOK(c, rw, "No new valid AuthDB change notifications") | 141 » » replyOK(c.Context, c.Writer, "No new valid AuthDB change notific
ations") |
| 140 return | 142 return |
| 141 } | 143 } |
| 142 | 144 |
| 143 // Don't bother processing late messages (ack them though). | 145 // Don't bother processing late messages (ack them though). |
| 144 latest := info | 146 latest := info |
| 145 if notify.Revision > info.Rev { | 147 if notify.Revision > info.Rev { |
| 146 var err error | 148 var err error |
| 147 » » if latest, err = syncAuthDB(c); err != nil { | 149 » » if latest, err = syncAuthDB(c.Context); err != nil { |
| 148 » » » replyError(c, rw, err) | 150 » » » replyError(c.Context, c.Writer, err) |
| 151 » » » c.Abort() |
| 149 return | 152 return |
| 150 } | 153 } |
| 151 } | 154 } |
| 152 | 155 |
| 153 » if err := notify.Acknowledge(c); err != nil { | 156 » if err := notify.Acknowledge(c.Context); err != nil { |
| 154 » » replyError(c, rw, err) | 157 » » replyError(c.Context, c.Writer, err) |
| 158 » » c.Abort() |
| 155 return | 159 return |
| 156 } | 160 } |
| 157 | 161 |
| 158 replyOK( | 162 replyOK( |
| 159 » » c, rw, "Processed PubSub notification for rev %d: %d -> %d", | 163 » » c.Context, c.Writer, "Processed PubSub notification for rev %d:
%d -> %d", |
| 160 notify.Revision, info.Rev, latest.Rev) | 164 notify.Revision, info.Rev, latest.Rev) |
| 161 } | 165 } |
| 162 | 166 |
| 163 // replyError sends HTTP 500 on transient errors, HTTP 400 on fatal ones. | 167 // replyError sends HTTP 500 on transient errors, HTTP 400 on fatal ones. |
| 164 func replyError(c context.Context, rw http.ResponseWriter, err error) { | 168 func replyError(c context.Context, rw http.ResponseWriter, err error) { |
| 165 logging.Errorf(c, "Error while processing PubSub notification - %s", err
) | 169 logging.Errorf(c, "Error while processing PubSub notification - %s", err
) |
| 166 if errors.IsTransient(err) { | 170 if errors.IsTransient(err) { |
| 167 http.Error(rw, err.Error(), http.StatusInternalServerError) | 171 http.Error(rw, err.Error(), http.StatusInternalServerError) |
| 168 } else { | 172 } else { |
| 169 http.Error(rw, err.Error(), http.StatusBadRequest) | 173 http.Error(rw, err.Error(), http.StatusBadRequest) |
| 170 } | 174 } |
| 171 } | 175 } |
| 172 | 176 |
| 173 // replyOK sends HTTP 200. | 177 // replyOK sends HTTP 200. |
| 174 func replyOK(c context.Context, rw http.ResponseWriter, msg string, args ...inte
rface{}) { | 178 func replyOK(c context.Context, rw http.ResponseWriter, msg string, args ...inte
rface{}) { |
| 175 logging.Infof(c, msg, args...) | 179 logging.Infof(c, msg, args...) |
| 176 rw.Write([]byte(fmt.Sprintf(msg, args...))) | 180 rw.Write([]byte(fmt.Sprintf(msg, args...))) |
| 177 } | 181 } |
| OLD | NEW |