Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(275)

Side by Side Diff: appengine/tumble/service.go

Issue 2043423004: Make HTTP middleware easier to use (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Update tests Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package tumble 5 package tumble
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "net/http" 9 "net/http"
10 "strconv" 10 "strconv"
11 "sync" 11 "sync"
12 "time" 12 "time"
13 13
14 "github.com/julienschmidt/httprouter"
15 "github.com/luci/gae/service/datastore" 14 "github.com/luci/gae/service/datastore"
16 "github.com/luci/gae/service/info" 15 "github.com/luci/gae/service/info"
17 "github.com/luci/luci-go/appengine/gaemiddleware" 16 "github.com/luci/luci-go/appengine/gaemiddleware"
18 "github.com/luci/luci-go/common/errors" 17 "github.com/luci/luci-go/common/errors"
19 "github.com/luci/luci-go/common/logging" 18 "github.com/luci/luci-go/common/logging"
20 "github.com/luci/luci-go/common/parallel" 19 "github.com/luci/luci-go/common/parallel"
20 "github.com/luci/luci-go/server/router"
21 "golang.org/x/net/context" 21 "golang.org/x/net/context"
22 ) 22 )
23 23
24 const transientHTTPHeader = "X-LUCI-Tumble-Transient" 24 const transientHTTPHeader = "X-LUCI-Tumble-Transient"
25 25
26 // Service is an instance of a Tumble service. It installs its handlers into an 26 // Service is an instance of a Tumble service. It installs its handlers into an
27 // HTTP router and services Tumble request tasks. 27 // HTTP router and services Tumble request tasks.
28 type Service struct { 28 type Service struct {
29 // Middleware is an optional function which allows your application to a dd 29 // Middleware is an optional function which allows your application to a dd
30 // application-specific resources to the context used by ProcessShardHan dler. 30 // application-specific resources to the context used by ProcessShardHan dler.
31 // 31 //
32 // Context will already be setup with BaseProd. 32 // Context will already be setup with BaseProd.
33 Middleware func(context.Context) context.Context 33 Middleware func(context.Context) context.Context
34 34
35 // Namespaces is a function that returns the datastore namespaces that T umble 35 // Namespaces is a function that returns the datastore namespaces that T umble
36 // will poll. 36 // will poll.
37 // 37 //
38 // If nil, Tumble will be executed against all namespaces registered in the 38 // If nil, Tumble will be executed against all namespaces registered in the
39 // datastore. 39 // datastore.
40 Namespaces func(context.Context) ([]string, error) 40 Namespaces func(context.Context) ([]string, error)
41 } 41 }
42 42
43 // InstallHandlers installs http handlers. 43 // InstallHandlers installs http handlers.
44 func (s *Service) InstallHandlers(r *httprouter.Router) { 44 func (s *Service) InstallHandlers(r *router.Router) {
45 // GET so that this can be invoked from cron 45 // GET so that this can be invoked from cron
46 r.GET(fireAllTasksURL, 46 r.GET(fireAllTasksURL,
47 » » gaemiddleware.BaseProd(gaemiddleware.RequireCron(s.FireAllTasksH andler))) 47 » » append(gaemiddleware.BaseProd(), gaemiddleware.RequireCron(), s. FireAllTasksHandler)...)
48 48
49 » r.POST(processShardPattern, 49 » r.POST(processShardPattern, append(gaemiddleware.BaseProd(), gaemiddlewa re.RequireTaskQueue(baseName), s.ProcessShardHandler)...)
50 » » gaemiddleware.BaseProd(gaemiddleware.RequireTaskQueue(baseName, s.ProcessShardHandler)))
51 } 50 }
52 51
53 // FireAllTasksHandler is a http handler suitable for installation into 52 // FireAllTasksHandler is a http handler suitable for installation into
54 // a httprouter. It expects `logging` and `luci/gae` services to be installed 53 // a httprouter. It expects `logging` and `luci/gae` services to be installed
55 // into the context. 54 // into the context.
56 // 55 //
57 // FireAllTasksHandler verifies that it was called within an Appengine Cron 56 // FireAllTasksHandler verifies that it was called within an Appengine Cron
58 // request, and then invokes the FireAllTasks function. 57 // request, and then invokes the FireAllTasks function.
59 func (s *Service) FireAllTasksHandler(c context.Context, rw http.ResponseWriter, r *http.Request, _ httprouter.Params) { 58 func (s *Service) FireAllTasksHandler(c *router.Context) {
60 » if err := s.FireAllTasks(c); err != nil { 59 » if err := s.FireAllTasks(c.Context); err != nil {
61 » » rw.WriteHeader(http.StatusInternalServerError) 60 » » c.Writer.WriteHeader(http.StatusInternalServerError)
62 » » fmt.Fprintf(rw, "fire_all_tasks failed: %s", err) 61 » » fmt.Fprintf(c.Writer, "fire_all_tasks failed: %s", err)
63 } else { 62 } else {
64 » » rw.Write([]byte("ok")) 63 » » c.Writer.Write([]byte("ok"))
65 } 64 }
66 } 65 }
67 66
68 // FireAllTasks searches for work in all namespaces, and fires off a process 67 // FireAllTasks searches for work in all namespaces, and fires off a process
69 // task for any shards it finds that have at least one Mutation present to 68 // task for any shards it finds that have at least one Mutation present to
70 // ensure that no work languishes forever. This may not be needed in 69 // ensure that no work languishes forever. This may not be needed in
71 // a constantly-loaded system with good tumble key distribution. 70 // a constantly-loaded system with good tumble key distribution.
72 func (s *Service) FireAllTasks(c context.Context) error { 71 func (s *Service) FireAllTasks(c context.Context) error {
73 cfg := getConfig(c) 72 cfg := getConfig(c)
74 shards := make(map[taskShard]struct{}, cfg.NumShards) 73 shards := make(map[taskShard]struct{}, cfg.NumShards)
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after
180 // ProcessShardHandler is a http handler suitable for installation into 179 // ProcessShardHandler is a http handler suitable for installation into
181 // a httprouter. It expects `logging` and `luci/gae` services to be installed 180 // a httprouter. It expects `logging` and `luci/gae` services to be installed
182 // into the context. 181 // into the context.
183 // 182 //
184 // ProcessShardHandler verifies that its being run as a taskqueue task and that 183 // ProcessShardHandler verifies that its being run as a taskqueue task and that
185 // the following parameters exist and are well-formed: 184 // the following parameters exist and are well-formed:
186 // * timestamp: decimal-encoded UNIX/UTC timestamp in seconds. 185 // * timestamp: decimal-encoded UNIX/UTC timestamp in seconds.
187 // * shard_id: decimal-encoded shard identifier. 186 // * shard_id: decimal-encoded shard identifier.
188 // 187 //
189 // ProcessShardHandler then invokes ProcessShard with the parsed parameters. 188 // ProcessShardHandler then invokes ProcessShard with the parsed parameters.
190 func (s *Service) ProcessShardHandler(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) { 189 func (s *Service) ProcessShardHandler(c *router.Context) {
191 if s.Middleware != nil { 190 if s.Middleware != nil {
192 » » c = s.Middleware(c) 191 » » c.Context = s.Middleware(c.Context)
193 } 192 }
194 193
195 » tstampStr := p.ByName("timestamp") 194 » tstampStr := c.Params.ByName("timestamp")
196 » sidStr := p.ByName("shard_id") 195 » sidStr := c.Params.ByName("shard_id")
197 196
198 tstamp, err := strconv.ParseInt(tstampStr, 10, 64) 197 tstamp, err := strconv.ParseInt(tstampStr, 10, 64)
199 if err != nil { 198 if err != nil {
200 » » logging.Errorf(c, "bad timestamp %q", tstampStr) 199 » » logging.Errorf(c.Context, "bad timestamp %q", tstampStr)
201 » » rw.WriteHeader(http.StatusNotFound) 200 » » c.Writer.WriteHeader(http.StatusNotFound)
202 » » fmt.Fprintf(rw, "bad timestamp") 201 » » fmt.Fprintf(c.Writer, "bad timestamp")
203 return 202 return
204 } 203 }
205 204
206 sid, err := strconv.ParseUint(sidStr, 10, 64) 205 sid, err := strconv.ParseUint(sidStr, 10, 64)
207 if err != nil { 206 if err != nil {
208 » » logging.Errorf(c, "bad shardID %q", tstampStr) 207 » » logging.Errorf(c.Context, "bad shardID %q", tstampStr)
209 » » rw.WriteHeader(http.StatusNotFound) 208 » » c.Writer.WriteHeader(http.StatusNotFound)
210 » » fmt.Fprintf(rw, "bad shardID") 209 » » fmt.Fprintf(c.Writer, "bad shardID")
211 return 210 return
212 } 211 }
213 212
214 » cfg := getConfig(c) 213 » cfg := getConfig(c.Context)
215 214
216 // Get the set of namespaces to handle. 215 // Get the set of namespaces to handle.
217 » namespaces, err := s.getNamespaces(c, cfg) 216 » namespaces, err := s.getNamespaces(c.Context, cfg)
218 if err != nil { 217 if err != nil {
219 » » rw.WriteHeader(http.StatusInternalServerError) 218 » » c.Writer.WriteHeader(http.StatusInternalServerError)
220 return 219 return
221 } 220 }
222 221
223 » err = processShard(c, cfg, namespaces, time.Unix(tstamp, 0).UTC(), sid) 222 » err = processShard(c.Context, cfg, namespaces, time.Unix(tstamp, 0).UTC( ), sid)
224 if err != nil { 223 if err != nil {
225 » » logging.Errorf(c, "failure! %s", err) 224 » » logging.Errorf(c.Context, "failure! %s", err)
226 225
227 if errors.IsTransient(err) { 226 if errors.IsTransient(err) {
228 » » » rw.Header().Add(transientHTTPHeader, "true") 227 » » » c.Writer.Header().Add(transientHTTPHeader, "true")
229 } 228 }
230 » » rw.WriteHeader(http.StatusInternalServerError) 229 » » c.Writer.WriteHeader(http.StatusInternalServerError)
231 » » fmt.Fprintf(rw, "error: %s", err) 230 » » fmt.Fprintf(c.Writer, "error: %s", err)
232 } else { 231 } else {
233 » » rw.Write([]byte("ok")) 232 » » c.Writer.Write([]byte("ok"))
234 } 233 }
235 } 234 }
236 235
237 // getDatastoreNamespaces returns a list of all of the namespaces in the 236 // getDatastoreNamespaces returns a list of all of the namespaces in the
238 // datastore. 237 // datastore.
239 // 238 //
240 // This is done by issuing a datastore query for kind "__namespace__". The 239 // This is done by issuing a datastore query for kind "__namespace__". The
241 // resulting keys will have IDs for the namespaces, namely: 240 // resulting keys will have IDs for the namespaces, namely:
242 // - The default namespace will have integer ID 1. We ignore this because i f 241 // - The default namespace will have integer ID 1. We ignore this because i f
243 // we're using tumble with namespaces, we don't process the default 242 // we're using tumble with namespaces, we don't process the default
(...skipping 11 matching lines...) Expand all
255 } 254 }
256 255
257 namespaces := make([]string, 0, len(namespaceKeys)) 256 namespaces := make([]string, 0, len(namespaceKeys))
258 for _, nk := range namespaceKeys { 257 for _, nk := range namespaceKeys {
259 if ns := nk.StringID(); ns != "" { 258 if ns := nk.StringID(); ns != "" {
260 namespaces = append(namespaces, ns) 259 namespaces = append(namespaces, ns)
261 } 260 }
262 } 261 }
263 return namespaces, nil 262 return namespaces, nil
264 } 263 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698