| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package authdb | 5 package authdb |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "io/ioutil" | 9 "io/ioutil" |
| 10 "net/http" | 10 "net/http" |
| 11 "net/url" | 11 "net/url" |
| 12 | 12 |
| 13 "github.com/julienschmidt/httprouter" | |
| 14 "golang.org/x/net/context" | 13 "golang.org/x/net/context" |
| 15 "google.golang.org/appengine" | 14 "google.golang.org/appengine" |
| 16 | 15 |
| 17 "github.com/luci/gae/service/info" | 16 "github.com/luci/gae/service/info" |
| 18 | 17 |
| 19 "github.com/luci/luci-go/appengine/gaeauth/client" | 18 "github.com/luci/luci-go/appengine/gaeauth/client" |
| 20 "github.com/luci/luci-go/common/errors" | 19 "github.com/luci/luci-go/common/errors" |
| 21 "github.com/luci/luci-go/common/logging" | 20 "github.com/luci/luci-go/common/logging" |
| 22 "github.com/luci/luci-go/server/auth/service" | 21 "github.com/luci/luci-go/server/auth/service" |
| 23 » "github.com/luci/luci-go/server/middleware" | 22 » "github.com/luci/luci-go/server/router" |
| 24 ) | 23 ) |
| 25 | 24 |
| 26 const ( | 25 const ( |
| 27 pubSubPullURLPath = "/auth/pubsub/authdb:pull" // dev server only | 26 pubSubPullURLPath = "/auth/pubsub/authdb:pull" // dev server only |
| 28 pubSubPushURLPath = "/auth/pubsub/authdb:push" | 27 pubSubPushURLPath = "/auth/pubsub/authdb:push" |
| 29 ) | 28 ) |
| 30 | 29 |
| 31 // InstallHandlers installs PubSub related HTTP handlers. | 30 // InstallHandlers installs PubSub related HTTP handlers. |
| 32 func InstallHandlers(r *httprouter.Router, base middleware.Base) { | 31 func InstallHandlers(r *router.Router, base router.MiddlewareChain) { |
| 33 if appengine.IsDevAppServer() { | 32 if appengine.IsDevAppServer() { |
| 34 » » r.GET(pubSubPullURLPath, base(pubSubPull)) | 33 » » r.GET(pubSubPullURLPath, base, pubSubPull) |
| 35 } | 34 } |
| 36 » r.POST(pubSubPushURLPath, base(pubSubPush)) | 35 » r.POST(pubSubPushURLPath, base, pubSubPush) |
| 37 } | 36 } |
| 38 | 37 |
| 39 // authenticatePubSub injects into a context a transport that authenticates | 38 // authenticatePubSub injects into a context a transport that authenticates |
| 40 // calls with OAuth2 token with PubSub API scope needed for PubSub API calls. | 39 // calls with OAuth2 token with PubSub API scope needed for PubSub API calls. |
| 41 func authenticatePubSub(c context.Context) context.Context { | 40 func authenticatePubSub(c context.Context) context.Context { |
| 42 scopes := []string{ | 41 scopes := []string{ |
| 43 "https://www.googleapis.com/auth/userinfo.email", | 42 "https://www.googleapis.com/auth/userinfo.email", |
| 44 "https://www.googleapis.com/auth/pubsub", | 43 "https://www.googleapis.com/auth/pubsub", |
| 45 } | 44 } |
| 46 return client.UseServiceAccountTransport(c, scopes, nil) | 45 return client.UseServiceAccountTransport(c, scopes, nil) |
| (...skipping 28 matching lines...) Expand all Loading... |
| 75 panic(err) | 74 panic(err) |
| 76 } | 75 } |
| 77 return fmt.Sprintf("projects/%s/subscriptions/%s+%s", gaeInfo.AppID(), s
ubIDPrefix, serviceURL.Host) | 76 return fmt.Sprintf("projects/%s/subscriptions/%s+%s", gaeInfo.AppID(), s
ubIDPrefix, serviceURL.Host) |
| 78 } | 77 } |
| 79 | 78 |
| 80 // pubSubPull is HTTP handler that pulls PubSub messages from AuthDB change | 79 // pubSubPull is HTTP handler that pulls PubSub messages from AuthDB change |
| 81 // notification topic. | 80 // notification topic. |
| 82 // | 81 // |
| 83 // Used only on dev server for manual testing. Prod services use push-based | 82 // Used only on dev server for manual testing. Prod services use push-based |
| 84 // delivery. | 83 // delivery. |
| 85 func pubSubPull(c context.Context, rw http.ResponseWriter, r *http.Request, p ht
tprouter.Params) { | 84 func pubSubPull(c *router.Context) { |
| 86 if !appengine.IsDevAppServer() { | 85 if !appengine.IsDevAppServer() { |
| 87 » » replyError(c, rw, errors.New("not a dev server")) | 86 » » replyError(c.Context, c.Writer, errors.New("not a dev server")) |
| 88 return | 87 return |
| 89 } | 88 } |
| 90 » processPubSubRequest(c, rw, r, func(c context.Context, srv authService,
serviceURL string) (*service.Notification, error) { | 89 » processPubSubRequest(c.Context, c.Writer, c.Request, func(c context.Cont
ext, srv authService, serviceURL string) (*service.Notification, error) { |
| 91 return srv.PullPubSub(c, subscriptionName(c, serviceURL)) | 90 return srv.PullPubSub(c, subscriptionName(c, serviceURL)) |
| 92 }) | 91 }) |
| 93 } | 92 } |
| 94 | 93 |
| 95 // pubSubPush is HTTP handler that processes incoming PubSub push notifications. | 94 // pubSubPush is HTTP handler that processes incoming PubSub push notifications. |
| 96 // | 95 // |
| 97 // It uses the signature inside PubSub message body for authentication. Skips | 96 // It uses the signature inside PubSub message body for authentication. Skips |
| 98 // messages not signed by currently configured auth service. | 97 // messages not signed by currently configured auth service. |
| 99 func pubSubPush(c context.Context, rw http.ResponseWriter, r *http.Request, p ht
tprouter.Params) { | 98 func pubSubPush(c *router.Context) { |
| 100 » processPubSubRequest(c, rw, r, func(c context.Context, srv authService,
serviceURL string) (*service.Notification, error) { | 99 » processPubSubRequest(c.Context, c.Writer, c.Request, func(ctx context.Co
ntext, srv authService, serviceURL string) (*service.Notification, error) { |
| 101 » » body, err := ioutil.ReadAll(r.Body) | 100 » » body, err := ioutil.ReadAll(c.Request.Body) |
| 102 if err != nil { | 101 if err != nil { |
| 103 return nil, err | 102 return nil, err |
| 104 } | 103 } |
| 105 » » return srv.ProcessPubSubPush(c, body) | 104 » » return srv.ProcessPubSubPush(ctx, body) |
| 106 }) | 105 }) |
| 107 } | 106 } |
| 108 | 107 |
| 109 type notifcationGetter func(context.Context, authService, string) (*service.Noti
fication, error) | 108 type notifcationGetter func(context.Context, authService, string) (*service.Noti
fication, error) |
| 110 | 109 |
| 111 // processPubSubRequest is common wrapper for pubSubPull and pubSubPush. | 110 // processPubSubRequest is common wrapper for pubSubPull and pubSubPush. |
| 112 // | 111 // |
| 113 // It implements most logic of notification handling. Calls supplied callback | 112 // It implements most logic of notification handling. Calls supplied callback |
| 114 // to actually get service.Notification, since this part is different from Pull | 113 // to actually get service.Notification, since this part is different from Pull |
| 115 // and Push subscriptions. | 114 // and Push subscriptions. |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 168 } else { | 167 } else { |
| 169 http.Error(rw, err.Error(), http.StatusBadRequest) | 168 http.Error(rw, err.Error(), http.StatusBadRequest) |
| 170 } | 169 } |
| 171 } | 170 } |
| 172 | 171 |
| 173 // replyOK sends HTTP 200. | 172 // replyOK sends HTTP 200. |
| 174 func replyOK(c context.Context, rw http.ResponseWriter, msg string, args ...inte
rface{}) { | 173 func replyOK(c context.Context, rw http.ResponseWriter, msg string, args ...inte
rface{}) { |
| 175 logging.Infof(c, msg, args...) | 174 logging.Infof(c, msg, args...) |
| 176 rw.Write([]byte(fmt.Sprintf(msg, args...))) | 175 rw.Write([]byte(fmt.Sprintf(msg, args...))) |
| 177 } | 176 } |
| OLD | NEW |