Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(267)

Side by Side Diff: appengine/tumble/service.go

Issue 2043423004: Make HTTP middleware easier to use (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: gaemiddleware: add middleware func for WithProd Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/tsmon/middleware_test.go ('k') | appengine/tumble/tumbletest.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package tumble 5 package tumble
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "net/http" 9 "net/http"
10 "strconv" 10 "strconv"
11 "sync" 11 "sync"
12 "time" 12 "time"
13 13
14 "github.com/julienschmidt/httprouter"
15 "github.com/luci/gae/service/datastore" 14 "github.com/luci/gae/service/datastore"
16 "github.com/luci/gae/service/info" 15 "github.com/luci/gae/service/info"
17 "github.com/luci/luci-go/appengine/gaemiddleware" 16 "github.com/luci/luci-go/appengine/gaemiddleware"
18 "github.com/luci/luci-go/common/errors" 17 "github.com/luci/luci-go/common/errors"
19 "github.com/luci/luci-go/common/logging" 18 "github.com/luci/luci-go/common/logging"
20 "github.com/luci/luci-go/common/parallel" 19 "github.com/luci/luci-go/common/parallel"
20 "github.com/luci/luci-go/server/router"
21 "golang.org/x/net/context" 21 "golang.org/x/net/context"
22 ) 22 )
23 23
24 const transientHTTPHeader = "X-LUCI-Tumble-Transient" 24 const transientHTTPHeader = "X-LUCI-Tumble-Transient"
25 25
26 // Service is an instance of a Tumble service. It installs its handlers into an 26 // Service is an instance of a Tumble service. It installs its handlers into an
27 // HTTP router and services Tumble request tasks. 27 // HTTP router and services Tumble request tasks.
28 type Service struct { 28 type Service struct {
29 // Middleware is an optional function which allows your application to a dd 29 // Middleware is an optional function which allows your application to a dd
30 // application-specific resources to the context used by ProcessShardHan dler. 30 // application-specific resources to the context used by ProcessShardHan dler.
31 // 31 //
32 // Context will already be setup with BaseProd. 32 // Context will already be setup with BaseProd.
33 Middleware func(context.Context) context.Context 33 Middleware func(context.Context) context.Context
34 34
35 // Namespaces is a function that returns the datastore namespaces that T umble 35 // Namespaces is a function that returns the datastore namespaces that T umble
36 // will poll. 36 // will poll.
37 // 37 //
38 // If nil, Tumble will be executed against all namespaces registered in the 38 // If nil, Tumble will be executed against all namespaces registered in the
39 // datastore. 39 // datastore.
40 Namespaces func(context.Context) ([]string, error) 40 Namespaces func(context.Context) ([]string, error)
41 } 41 }
42 42
43 // InstallHandlers installs http handlers. 43 // InstallHandlers installs http handlers.
44 func (s *Service) InstallHandlers(r *httprouter.Router) { 44 func (s *Service) InstallHandlers(r *router.Router) {
45 » mc := append(gaemiddleware.BaseProd(), gaemiddleware.RequireCron)
45 // GET so that this can be invoked from cron 46 // GET so that this can be invoked from cron
46 » r.GET(fireAllTasksURL, 47 » r.GET(fireAllTasksURL, mc, s.FireAllTasksHandler)
47 » » gaemiddleware.BaseProd(gaemiddleware.RequireCron(s.FireAllTasksH andler))) 48 » r.POST(processShardPattern, append(mc, gaemiddleware.RequireTaskQueue(ba seName)), s.ProcessShardHandler)
48
49 » r.POST(processShardPattern,
50 » » gaemiddleware.BaseProd(gaemiddleware.RequireTaskQueue(baseName, s.ProcessShardHandler)))
51 } 49 }
52 50
53 // FireAllTasksHandler is a http handler suitable for installation into 51 // FireAllTasksHandler is an HTTP handler that expects `logging` and `luci/gae`
54 // a httprouter. It expects `logging` and `luci/gae` services to be installed 52 // services to be installed into the context.
55 // into the context.
56 // 53 //
57 // FireAllTasksHandler verifies that it was called within an Appengine Cron 54 // FireAllTasksHandler verifies that it was called within an Appengine Cron
58 // request, and then invokes the FireAllTasks function. 55 // request, and then invokes the FireAllTasks function.
59 func (s *Service) FireAllTasksHandler(c context.Context, rw http.ResponseWriter, r *http.Request, _ httprouter.Params) { 56 func (s *Service) FireAllTasksHandler(c *router.Context) {
60 » if err := s.FireAllTasks(c); err != nil { 57 » if err := s.FireAllTasks(c.Context); err != nil {
61 » » rw.WriteHeader(http.StatusInternalServerError) 58 » » c.Writer.WriteHeader(http.StatusInternalServerError)
62 » » fmt.Fprintf(rw, "fire_all_tasks failed: %s", err) 59 » » fmt.Fprintf(c.Writer, "fire_all_tasks failed: %s", err)
63 } else { 60 } else {
64 » » rw.Write([]byte("ok")) 61 » » c.Writer.Write([]byte("ok"))
65 } 62 }
66 } 63 }
67 64
68 // FireAllTasks searches for work in all namespaces, and fires off a process 65 // FireAllTasks searches for work in all namespaces, and fires off a process
69 // task for any shards it finds that have at least one Mutation present to 66 // task for any shards it finds that have at least one Mutation present to
70 // ensure that no work languishes forever. This may not be needed in 67 // ensure that no work languishes forever. This may not be needed in
71 // a constantly-loaded system with good tumble key distribution. 68 // a constantly-loaded system with good tumble key distribution.
72 func (s *Service) FireAllTasks(c context.Context) error { 69 func (s *Service) FireAllTasks(c context.Context) error {
73 cfg := getConfig(c) 70 cfg := getConfig(c)
74 shards := make(map[taskShard]struct{}, cfg.NumShards) 71 shards := make(map[taskShard]struct{}, cfg.NumShards)
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
170 return 167 return
171 } 168 }
172 } else { 169 } else {
173 // Namespacing is disabled, use a single empty string. Process w ill 170 // Namespacing is disabled, use a single empty string. Process w ill
174 // interpret this as a signal to not use namesapces. 171 // interpret this as a signal to not use namesapces.
175 namespaces = []string{""} 172 namespaces = []string{""}
176 } 173 }
177 return 174 return
178 } 175 }
179 176
180 // ProcessShardHandler is a http handler suitable for installation into 177 // ProcessShardHandler is an HTTP handler that expects `logging` and `luci/gae`
181 // a httprouter. It expects `logging` and `luci/gae` services to be installed 178 // services to be installed into the context.
182 // into the context.
183 // 179 //
184 // ProcessShardHandler verifies that its being run as a taskqueue task and that 180 // ProcessShardHandler verifies that its being run as a taskqueue task and that
185 // the following parameters exist and are well-formed: 181 // the following parameters exist and are well-formed:
186 // * timestamp: decimal-encoded UNIX/UTC timestamp in seconds. 182 // * timestamp: decimal-encoded UNIX/UTC timestamp in seconds.
187 // * shard_id: decimal-encoded shard identifier. 183 // * shard_id: decimal-encoded shard identifier.
188 // 184 //
189 // ProcessShardHandler then invokes ProcessShard with the parsed parameters. 185 // ProcessShardHandler then invokes ProcessShard with the parsed parameters.
190 func (s *Service) ProcessShardHandler(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) { 186 func (s *Service) ProcessShardHandler(ctx *router.Context) {
187 » c, rw, p := ctx.Context, ctx.Writer, ctx.Params
188
191 if s.Middleware != nil { 189 if s.Middleware != nil {
192 c = s.Middleware(c) 190 c = s.Middleware(c)
193 } 191 }
194 192
195 tstampStr := p.ByName("timestamp") 193 tstampStr := p.ByName("timestamp")
196 sidStr := p.ByName("shard_id") 194 sidStr := p.ByName("shard_id")
197 195
198 tstamp, err := strconv.ParseInt(tstampStr, 10, 64) 196 tstamp, err := strconv.ParseInt(tstampStr, 10, 64)
199 if err != nil { 197 if err != nil {
200 logging.Errorf(c, "bad timestamp %q", tstampStr) 198 logging.Errorf(c, "bad timestamp %q", tstampStr)
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
255 } 253 }
256 254
257 namespaces := make([]string, 0, len(namespaceKeys)) 255 namespaces := make([]string, 0, len(namespaceKeys))
258 for _, nk := range namespaceKeys { 256 for _, nk := range namespaceKeys {
259 if ns := nk.StringID(); ns != "" { 257 if ns := nk.StringID(); ns != "" {
260 namespaces = append(namespaces, ns) 258 namespaces = append(namespaces, ns)
261 } 259 }
262 } 260 }
263 return namespaces, nil 261 return namespaces, nil
264 } 262 }
OLDNEW
« no previous file with comments | « appengine/tsmon/middleware_test.go ('k') | appengine/tumble/tumbletest.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698