Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(285)

Side by Side Diff: appengine/cmd/cron/frontend/handler.go

Issue 2043423004: Make HTTP middleware easier to use (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: gaemiddleware: add middleware func for WithProd Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | appengine/cmd/cron/ui/common.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 // Package frontend implements GAE web server for luci-cron service. 5 // Package frontend implements GAE web server for luci-cron service.
6 // 6 //
7 // Due to the way classic GAE imports work, this package can not have 7 // Due to the way classic GAE imports work, this package can not have
8 // subpackages (or at least subpackages referenced via absolute import path). 8 // subpackages (or at least subpackages referenced via absolute import path).
9 // We can't use relative imports because luci-go will then become unbuildable 9 // We can't use relative imports because luci-go will then become unbuildable
10 // by regular (non GAE) toolset. 10 // by regular (non GAE) toolset.
11 // 11 //
12 // See https://groups.google.com/forum/#!topic/google-appengine-go/dNhqV6PBqVc. 12 // See https://groups.google.com/forum/#!topic/google-appengine-go/dNhqV6PBqVc.
13 package frontend 13 package frontend
14 14
15 import ( 15 import (
16 cryptorand "crypto/rand" 16 cryptorand "crypto/rand"
17 "encoding/binary" 17 "encoding/binary"
18 "fmt" 18 "fmt"
19 "io/ioutil" 19 "io/ioutil"
20 "math/rand" 20 "math/rand"
21 "net/http" 21 "net/http"
22 "net/url" 22 "net/url"
23 "strconv" 23 "strconv"
24 "sync" 24 "sync"
25 "time" 25 "time"
26 26
27 "github.com/julienschmidt/httprouter"
28 "golang.org/x/net/context" 27 "golang.org/x/net/context"
29 "google.golang.org/appengine" 28 "google.golang.org/appengine"
30 29
31 "github.com/luci/gae/service/info" 30 "github.com/luci/gae/service/info"
32 "github.com/luci/gae/service/taskqueue" 31 "github.com/luci/gae/service/taskqueue"
33 32
34 "github.com/luci/luci-go/server/auth" 33 "github.com/luci/luci-go/server/auth"
35 » "github.com/luci/luci-go/server/middleware" 34 » "github.com/luci/luci-go/server/router"
36 35
37 "github.com/luci/luci-go/appengine/gaeauth/server" 36 "github.com/luci/luci-go/appengine/gaeauth/server"
38 "github.com/luci/luci-go/appengine/gaeconfig" 37 "github.com/luci/luci-go/appengine/gaeconfig"
39 "github.com/luci/luci-go/appengine/gaemiddleware" 38 "github.com/luci/luci-go/appengine/gaemiddleware"
40 39
41 "github.com/luci/luci-go/common/config" 40 "github.com/luci/luci-go/common/config"
42 "github.com/luci/luci-go/common/config/impl/memory" 41 "github.com/luci/luci-go/common/config/impl/memory"
43 "github.com/luci/luci-go/common/errors" 42 "github.com/luci/luci-go/common/errors"
44 "github.com/luci/luci-go/common/logging" 43 "github.com/luci/luci-go/common/logging"
45 44
(...skipping 17 matching lines...) Expand all
63 managers = []task.Manager{ 62 managers = []task.Manager{
64 &buildbucket.TaskManager{}, 63 &buildbucket.TaskManager{},
65 &noop.TaskManager{}, 64 &noop.TaskManager{},
66 &swarming.TaskManager{}, 65 &swarming.TaskManager{},
67 &urlfetch.TaskManager{}, 66 &urlfetch.TaskManager{},
68 } 67 }
69 ) 68 )
70 69
71 //// Helpers. 70 //// Helpers.
72 71
73 type handler func(c *requestContext) 72 // requestContext is used to add helper methods.
74 73 type requestContext router.Context
75 type requestContext struct {
76 » context.Context
77
78 » w http.ResponseWriter
79 » r *http.Request
80 » p httprouter.Params
81 }
82 74
83 // fail writes error message to the log and the response and sets status code. 75 // fail writes error message to the log and the response and sets status code.
84 func (c *requestContext) fail(code int, msg string, args ...interface{}) { 76 func (c *requestContext) fail(code int, msg string, args ...interface{}) {
85 body := fmt.Sprintf(msg, args...) 77 body := fmt.Sprintf(msg, args...)
86 » logging.Errorf(c, "HTTP %d: %s", code, body) 78 » logging.Errorf(c.Context, "HTTP %d: %s", code, body)
87 » http.Error(c.w, body, code) 79 » http.Error(c.Writer, body, code)
88 } 80 }
89 81
90 // err sets status to 500 on transient errors or 202 on fatal ones. Returning 82 // err sets status to 500 on transient errors or 202 on fatal ones. Returning
91 // status code in range [200–299] is the only way to tell Task Queues to stop 83 // status code in range [200–299] is the only way to tell Task Queues to stop
92 // retrying the task. 84 // retrying the task.
93 func (c *requestContext) err(e error, msg string, args ...interface{}) { 85 func (c *requestContext) err(e error, msg string, args ...interface{}) {
94 code := 500 86 code := 500
95 if !errors.IsTransient(e) { 87 if !errors.IsTransient(e) {
96 code = 202 88 code = 202
97 } 89 }
98 args = append(args, e) 90 args = append(args, e)
99 c.fail(code, msg+" - %s", args...) 91 c.fail(code, msg+" - %s", args...)
100 } 92 }
101 93
102 // ok sets status to 200 and puts "OK" in response. 94 // ok sets status to 200 and puts "OK" in response.
103 func (c *requestContext) ok() { 95 func (c *requestContext) ok() {
104 » c.w.Header().Set("Content-Type", "text/plain; charset=utf-8") 96 » c.Writer.Header().Set("Content-Type", "text/plain; charset=utf-8")
105 » c.w.WriteHeader(200) 97 » c.Writer.WriteHeader(200)
106 » fmt.Fprintln(c.w, "OK") 98 » fmt.Fprintln(c.Writer, "OK")
107 } 99 }
108 100
109 /// 101 ///
110 102
111 var globalInit sync.Once 103 var globalInit sync.Once
112 104
113 // initializeGlobalState does one time initialization for stuff that needs 105 // initializeGlobalState does one time initialization for stuff that needs
114 // active GAE context. 106 // active GAE context.
115 func initializeGlobalState(c context.Context) { 107 func initializeGlobalState(c context.Context) {
116 if info.Get(c).IsDevAppServer() { 108 if info.Get(c).IsDevAppServer() {
117 // Dev app server doesn't preserve the state of task queues acro ss restarts, 109 // Dev app server doesn't preserve the state of task queues acro ss restarts,
118 // need to reset datastore state accordingly, otherwise everythi ng gets stuck. 110 // need to reset datastore state accordingly, otherwise everythi ng gets stuck.
119 if err := globalEngine.ResetAllJobsOnDevServer(c); err != nil { 111 if err := globalEngine.ResetAllJobsOnDevServer(c); err != nil {
120 logging.Errorf(c, "Failed to reset jobs: %s", err) 112 logging.Errorf(c, "Failed to reset jobs: %s", err)
121 } 113 }
122 } 114 }
123 } 115 }
124 116
125 // getConfigImpl returns config.Interface implementation to use from the 117 // getConfigImpl returns config.Interface implementation to use from the
126 // catalog. 118 // catalog.
127 func getConfigImpl(c context.Context) (config.Interface, error) { 119 func getConfigImpl(c context.Context) (config.Interface, error) {
128 // Use fake config data on dev server for simplicity. 120 // Use fake config data on dev server for simplicity.
129 if info.Get(c).IsDevAppServer() { 121 if info.Get(c).IsDevAppServer() {
130 return memory.New(devServerConfigs()), nil 122 return memory.New(devServerConfigs()), nil
131 } 123 }
132 return gaeconfig.New(c) 124 return gaeconfig.New(c)
133 } 125 }
134 126
135 // wrap converts the handler to format accepted by middleware lib. It also adds 127 // base returns middleware chain. It initializes prod context and sets up
136 // context initialization code.
137 func wrap(h handler) middleware.Handler {
138 » return func(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
139 » » h(&requestContext{c, w, r, p})
140 » }
141 }
142
143 // base starts middleware chain. It initializes prod context and sets up
144 // authentication config. 128 // authentication config.
145 func base(h middleware.Handler) httprouter.Handle { 129 func base() router.MiddlewareChain {
146 methods := auth.Authenticator{ 130 methods := auth.Authenticator{
147 &server.OAuth2Method{Scopes: []string{server.EmailScope}}, 131 &server.OAuth2Method{Scopes: []string{server.EmailScope}},
148 server.CookieAuth, 132 server.CookieAuth,
149 &server.InboundAppIDAuthMethod{}, 133 &server.InboundAppIDAuthMethod{},
150 } 134 }
151 » wrapper := func(c context.Context, w http.ResponseWriter, r *http.Reques t, p httprouter.Params) { 135 » return append(
152 » » globalInit.Do(func() { initializeGlobalState(c) }) 136 » » gaemiddleware.BaseProd(),
153 » » c = auth.SetAuthenticator(c, methods) 137 » » func(c *router.Context, next router.Handler) {
154 » » h(c, w, r, p) 138 » » » globalInit.Do(func() { initializeGlobalState(c.Context) })
155 » } 139 » » » c.Context = auth.SetAuthenticator(c.Context, methods)
156 » return gaemiddleware.BaseProd(wrapper) 140 » » » next(c)
157 } 141 » » },
158 142 » )
159 // cronHandler returns handler intended for cron jobs.
160 func cronHandler(h handler) httprouter.Handle {
161 » return base(gaemiddleware.RequireCron(wrap(h)))
162 }
163
164 // taskQueueHandler returns handler intended for task queue calls.
165 func taskQueueHandler(name string, h handler) httprouter.Handle {
166 » return base(gaemiddleware.RequireTaskQueue(name, wrap(h)))
167 } 143 }
168 144
169 //// Routes. 145 //// Routes.
170 146
171 func init() { 147 func init() {
172 // Dev server likes to restart a lot, and upon a restart math/rand seed is 148 // Dev server likes to restart a lot, and upon a restart math/rand seed is
173 // always set to 1, resulting in lots of presumably "random" IDs not bei ng 149 // always set to 1, resulting in lots of presumably "random" IDs not bei ng
174 // very random. Seed it with real randomness. 150 // very random. Seed it with real randomness.
175 var seed int64 151 var seed int64
176 if err := binary.Read(cryptorand.Reader, binary.LittleEndian, &seed); er r != nil { 152 if err := binary.Read(cryptorand.Reader, binary.LittleEndian, &seed); er r != nil {
(...skipping 11 matching lines...) Expand all
188 globalEngine = engine.NewEngine(engine.Config{ 164 globalEngine = engine.NewEngine(engine.Config{
189 Catalog: globalCatalog, 165 Catalog: globalCatalog,
190 TimersQueuePath: "/internal/tasks/timers", 166 TimersQueuePath: "/internal/tasks/timers",
191 TimersQueueName: "timers", 167 TimersQueueName: "timers",
192 InvocationsQueuePath: "/internal/tasks/invocations", 168 InvocationsQueuePath: "/internal/tasks/invocations",
193 InvocationsQueueName: "invocations", 169 InvocationsQueueName: "invocations",
194 PubSubPushPath: "/pubsub", 170 PubSubPushPath: "/pubsub",
195 }) 171 })
196 172
197 // Setup HTTP routes. 173 // Setup HTTP routes.
198 » router := httprouter.New() 174 » r := router.New()
175 » basemw := base()
199 176
200 » gaemiddleware.InstallHandlers(router, base) 177 » gaemiddleware.InstallHandlers(r, basemw)
201 » ui.InstallHandlers(router, base, ui.Config{ 178 » ui.InstallHandlers(r, basemw, ui.Config{
202 Engine: globalEngine, 179 Engine: globalEngine,
203 TemplatesPath: "templates", 180 TemplatesPath: "templates",
204 }) 181 })
205 182
206 » router.GET("/_ah/warmup", base(wrap(warmupHandler))) 183 » r.GET("/_ah/warmup", basemw, warmupHandler)
207 » router.GET("/_ah/start", base(wrap(warmupHandler))) 184 » r.GET("/_ah/start", basemw, warmupHandler)
208 » router.POST("/pubsub", base(wrap(pubsubPushHandler))) 185 » r.POST("/pubsub", basemw, pubsubPushHandler)
209 » router.GET("/internal/cron/read-config", cronHandler(readConfigCron)) 186 » r.GET("/internal/cron/read-config", append(basemw, gaemiddleware.Require Cron), readConfigCron)
210 » router.POST("/internal/tasks/read-project-config", taskQueueHandler("rea d-project-config", readProjectConfigTask)) 187 » r.POST("/internal/tasks/read-project-config", append(basemw, gaemiddlewa re.RequireTaskQueue("read-project-config")), readProjectConfigTask)
211 » router.POST("/internal/tasks/timers", taskQueueHandler("timers", actionT ask)) 188 » r.POST("/internal/tasks/timers", append(basemw, gaemiddleware.RequireTas kQueue("timers")), actionTask)
212 » router.POST("/internal/tasks/invocations", taskQueueHandler("invocations ", actionTask)) 189 » r.POST("/internal/tasks/invocations", append(basemw, gaemiddleware.Requi reTaskQueue("invocations")), actionTask)
213 190
214 // Devserver can't accept PubSub pushes, so allow manual pulls instead t o 191 // Devserver can't accept PubSub pushes, so allow manual pulls instead t o
215 // simplify local development. 192 // simplify local development.
216 if appengine.IsDevAppServer() { 193 if appengine.IsDevAppServer() {
217 » » router.GET("/pubsub/pull/:ManagerName/:Publisher", base(wrap(pub subPullHandler))) 194 » » r.GET("/pubsub/pull/:ManagerName/:Publisher", basemw, pubsubPull Handler)
218 } 195 }
219 196
220 » http.DefaultServeMux.Handle("/", router) 197 » http.DefaultServeMux.Handle("/", r)
221 } 198 }
222 199
223 // warmupHandler warms in-memory caches. 200 // warmupHandler warms in-memory caches.
224 func warmupHandler(rc *requestContext) { 201 func warmupHandler(c *router.Context) {
225 » if err := server.Warmup(rc); err != nil { 202 » rc := requestContext(*c)
203 » if err := server.Warmup(rc.Context); err != nil {
226 rc.fail(500, "Failed to warmup OpenID: %s", err) 204 rc.fail(500, "Failed to warmup OpenID: %s", err)
227 return 205 return
228 } 206 }
229 rc.ok() 207 rc.ok()
230 } 208 }
231 209
232 // pubsubPushHandler handles incoming PubSub messages. 210 // pubsubPushHandler handles incoming PubSub messages.
233 func pubsubPushHandler(rc *requestContext) { 211 func pubsubPushHandler(c *router.Context) {
234 » body, err := ioutil.ReadAll(rc.r.Body) 212 » rc := requestContext(*c)
213 » body, err := ioutil.ReadAll(rc.Request.Body)
235 if err != nil { 214 if err != nil {
236 rc.fail(500, "Failed to read the request: %s", err) 215 rc.fail(500, "Failed to read the request: %s", err)
237 return 216 return
238 } 217 }
239 » if err = globalEngine.ProcessPubSubPush(rc, body); err != nil { 218 » if err = globalEngine.ProcessPubSubPush(rc.Context, body); err != nil {
240 rc.err(err, "Failed to process incoming PubSub push") 219 rc.err(err, "Failed to process incoming PubSub push")
241 return 220 return
242 } 221 }
243 rc.ok() 222 rc.ok()
244 } 223 }
245 224
246 // pubsubPullHandler is called on dev server by developer to pull pubsub 225 // pubsubPullHandler is called on dev server by developer to pull pubsub
247 // messages from a topic created for a publisher. 226 // messages from a topic created for a publisher.
248 func pubsubPullHandler(rc *requestContext) { 227 func pubsubPullHandler(c *router.Context) {
228 » rc := requestContext(*c)
249 if !appengine.IsDevAppServer() { 229 if !appengine.IsDevAppServer() {
250 rc.fail(403, "Not a dev server") 230 rc.fail(403, "Not a dev server")
251 return 231 return
252 } 232 }
253 err := globalEngine.PullPubSubOnDevServer( 233 err := globalEngine.PullPubSubOnDevServer(
254 » » rc, rc.p.ByName("ManagerName"), rc.p.ByName("Publisher")) 234 » » rc.Context, rc.Params.ByName("ManagerName"), rc.Params.ByName("P ublisher"))
255 if err != nil { 235 if err != nil {
256 rc.err(err, "Failed to pull PubSub messages") 236 rc.err(err, "Failed to pull PubSub messages")
257 } else { 237 } else {
258 rc.ok() 238 rc.ok()
259 } 239 }
260 } 240 }
261 241
262 // readConfigCron grabs a list of projects from the catalog and datastore and 242 // readConfigCron grabs a list of projects from the catalog and datastore and
263 // dispatches task queue tasks to update each project's cron jobs. 243 // dispatches task queue tasks to update each project's cron jobs.
264 func readConfigCron(c *requestContext) { 244 func readConfigCron(c *router.Context) {
245 » rc := requestContext(*c)
265 projectsToVisit := map[string]bool{} 246 projectsToVisit := map[string]bool{}
266 247
267 // Visit all projects in the catalog. 248 // Visit all projects in the catalog.
268 » ctx, _ := context.WithTimeout(c.Context, 150*time.Second) 249 » ctx, _ := context.WithTimeout(rc.Context, 150*time.Second)
269 projects, err := globalCatalog.GetAllProjects(ctx) 250 projects, err := globalCatalog.GetAllProjects(ctx)
270 if err != nil { 251 if err != nil {
271 » » c.err(err, "Failed to grab a list of project IDs from catalog") 252 » » rc.err(err, "Failed to grab a list of project IDs from catalog")
272 return 253 return
273 } 254 }
274 for _, id := range projects { 255 for _, id := range projects {
275 projectsToVisit[id] = true 256 projectsToVisit[id] = true
276 } 257 }
277 258
278 // Also visit all registered projects that do not show up in the catalog 259 // Also visit all registered projects that do not show up in the catalog
279 // listing anymore. It will unregister all crons belonging to them. 260 // listing anymore. It will unregister all crons belonging to them.
280 » existing, err := globalEngine.GetAllProjects(c.Context) 261 » existing, err := globalEngine.GetAllProjects(rc.Context)
281 if err != nil { 262 if err != nil {
282 » » c.err(err, "Failed to grab a list of project IDs from datastore" ) 263 » » rc.err(err, "Failed to grab a list of project IDs from datastore ")
283 return 264 return
284 } 265 }
285 for _, id := range existing { 266 for _, id := range existing {
286 projectsToVisit[id] = true 267 projectsToVisit[id] = true
287 } 268 }
288 269
289 // Handle each project in its own task to avoid "bad" projects (e.g. one s with 270 // Handle each project in its own task to avoid "bad" projects (e.g. one s with
290 // lots of crons) to slow down "good" ones. 271 // lots of crons) to slow down "good" ones.
291 tasks := make([]*taskqueue.Task, 0, len(projectsToVisit)) 272 tasks := make([]*taskqueue.Task, 0, len(projectsToVisit))
292 for projectID := range projectsToVisit { 273 for projectID := range projectsToVisit {
293 tasks = append(tasks, &taskqueue.Task{ 274 tasks = append(tasks, &taskqueue.Task{
294 Path: "/internal/tasks/read-project-config?projectID=" + url.QueryEscape(projectID), 275 Path: "/internal/tasks/read-project-config?projectID=" + url.QueryEscape(projectID),
295 }) 276 })
296 } 277 }
297 » tq := taskqueue.Get(c) 278 » tq := taskqueue.Get(rc.Context)
298 if err = tq.AddMulti(tasks, "read-project-config"); err != nil { 279 if err = tq.AddMulti(tasks, "read-project-config"); err != nil {
299 » » c.err(errors.WrapTransient(err), "Failed to add tasks to task qu eue") 280 » » rc.err(errors.WrapTransient(err), "Failed to add tasks to task q ueue")
300 } else { 281 } else {
301 » » c.ok() 282 » » rc.ok()
302 } 283 }
303 } 284 }
304 285
305 // readProjectConfigTask grabs a list of cron jobs in a project from catalog, 286 // readProjectConfigTask grabs a list of cron jobs in a project from catalog,
306 // updates all changed cron jobs, adds new ones, disables old ones. 287 // updates all changed cron jobs, adds new ones, disables old ones.
307 func readProjectConfigTask(c *requestContext) { 288 func readProjectConfigTask(c *router.Context) {
308 » projectID := c.r.URL.Query().Get("projectID") 289 » rc := requestContext(*c)
290 » projectID := rc.Request.URL.Query().Get("projectID")
309 if projectID == "" { 291 if projectID == "" {
310 // Return 202 to avoid retry, it is fatal error. 292 // Return 202 to avoid retry, it is fatal error.
311 » » c.fail(202, "Missing projectID query attribute") 293 » » rc.fail(202, "Missing projectID query attribute")
312 return 294 return
313 } 295 }
314 » ctx, _ := context.WithTimeout(c.Context, 150*time.Second) 296 » ctx, _ := context.WithTimeout(rc.Context, 150*time.Second)
315 jobs, err := globalCatalog.GetProjectJobs(ctx, projectID) 297 jobs, err := globalCatalog.GetProjectJobs(ctx, projectID)
316 if err != nil { 298 if err != nil {
317 » » c.err(err, "Failed to query for a list of jobs") 299 » » rc.err(err, "Failed to query for a list of jobs")
318 return 300 return
319 } 301 }
320 » if err = globalEngine.UpdateProjectJobs(c.Context, projectID, jobs); err != nil { 302 » if err = globalEngine.UpdateProjectJobs(rc.Context, projectID, jobs); er r != nil {
321 » » c.err(err, "Failed to update some cron jobs") 303 » » rc.err(err, "Failed to update some cron jobs")
322 return 304 return
323 } 305 }
324 » c.ok() 306 » rc.ok()
325 } 307 }
326 308
327 // actionTask is used to route actions emitted by cron job state transitions 309 // actionTask is used to route actions emitted by cron job state transitions
328 // back into Engine (see enqueueActions). 310 // back into Engine (see enqueueActions).
329 func actionTask(c *requestContext) { 311 func actionTask(c *router.Context) {
330 » body, err := ioutil.ReadAll(c.r.Body) 312 » rc := requestContext(*c)
313 » body, err := ioutil.ReadAll(rc.Request.Body)
331 if err != nil { 314 if err != nil {
332 » » c.fail(500, "Failed to read request body: %s", err) 315 » » rc.fail(500, "Failed to read request body: %s", err)
333 return 316 return
334 } 317 }
335 » count, _ := strconv.Atoi(c.r.Header.Get("X-AppEngine-TaskExecutionCount" )) 318 » count, _ := strconv.Atoi(rc.Request.Header.Get("X-AppEngine-TaskExecutio nCount"))
336 » err = globalEngine.ExecuteSerializedAction(c.Context, body, count) 319 » err = globalEngine.ExecuteSerializedAction(rc.Context, body, count)
337 if err != nil { 320 if err != nil {
338 » » c.err(err, "Error when executing the action") 321 » » rc.err(err, "Error when executing the action")
339 return 322 return
340 } 323 }
341 » c.ok() 324 » rc.ok()
342 } 325 }
OLDNEW
« no previous file with comments | « no previous file | appengine/cmd/cron/ui/common.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698