| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package tumble | 5 package tumble |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "net/http" | 9 "net/http" |
| 10 "strconv" | 10 "strconv" |
| 11 "sync" | 11 "sync" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "github.com/julienschmidt/httprouter" | |
| 15 "github.com/luci/gae/service/datastore" | 14 "github.com/luci/gae/service/datastore" |
| 16 "github.com/luci/gae/service/info" | 15 "github.com/luci/gae/service/info" |
| 17 "github.com/luci/luci-go/appengine/gaemiddleware" | 16 "github.com/luci/luci-go/appengine/gaemiddleware" |
| 18 "github.com/luci/luci-go/common/errors" | 17 "github.com/luci/luci-go/common/errors" |
| 19 "github.com/luci/luci-go/common/logging" | 18 "github.com/luci/luci-go/common/logging" |
| 20 "github.com/luci/luci-go/common/parallel" | 19 "github.com/luci/luci-go/common/parallel" |
| 20 "github.com/luci/luci-go/server/router" |
| 21 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
| 22 ) | 22 ) |
| 23 | 23 |
| 24 const transientHTTPHeader = "X-LUCI-Tumble-Transient" | 24 const transientHTTPHeader = "X-LUCI-Tumble-Transient" |
| 25 | 25 |
| 26 // Service is an instance of a Tumble service. It installs its handlers into an | 26 // Service is an instance of a Tumble service. It installs its handlers into an |
| 27 // HTTP router and services Tumble request tasks. | 27 // HTTP router and services Tumble request tasks. |
| 28 type Service struct { | 28 type Service struct { |
| 29 // Middleware is an optional function which allows your application to a
dd | 29 // Middleware is an optional function which allows your application to a
dd |
| 30 // application-specific resources to the context used by ProcessShardHan
dler. | 30 // application-specific resources to the context used by ProcessShardHan
dler. |
| 31 // | 31 // |
| 32 // Context will already be setup with BaseProd. | 32 // Context will already be setup with BaseProd. |
| 33 Middleware func(context.Context) context.Context | 33 Middleware func(context.Context) context.Context |
| 34 | 34 |
| 35 // Namespaces is a function that returns the datastore namespaces that T
umble | 35 // Namespaces is a function that returns the datastore namespaces that T
umble |
| 36 // will poll. | 36 // will poll. |
| 37 // | 37 // |
| 38 // If nil, Tumble will be executed against all namespaces registered in
the | 38 // If nil, Tumble will be executed against all namespaces registered in
the |
| 39 // datastore. | 39 // datastore. |
| 40 Namespaces func(context.Context) ([]string, error) | 40 Namespaces func(context.Context) ([]string, error) |
| 41 } | 41 } |
| 42 | 42 |
| 43 // InstallHandlers installs http handlers. | 43 // InstallHandlers installs http handlers. |
| 44 func (s *Service) InstallHandlers(r *httprouter.Router) { | 44 func (s *Service) InstallHandlers(r *router.Router) { |
| 45 » mc := append(gaemiddleware.BaseProd(), gaemiddleware.RequireCron) |
| 45 // GET so that this can be invoked from cron | 46 // GET so that this can be invoked from cron |
| 46 » r.GET(fireAllTasksURL, | 47 » r.GET(fireAllTasksURL, mc, s.FireAllTasksHandler) |
| 47 » » gaemiddleware.BaseProd(gaemiddleware.RequireCron(s.FireAllTasksH
andler))) | 48 » r.POST(processShardPattern, append(mc, gaemiddleware.RequireTaskQueue(ba
seName)), s.ProcessShardHandler) |
| 48 | |
| 49 » r.POST(processShardPattern, | |
| 50 » » gaemiddleware.BaseProd(gaemiddleware.RequireTaskQueue(baseName,
s.ProcessShardHandler))) | |
| 51 } | 49 } |
| 52 | 50 |
| 53 // FireAllTasksHandler is a http handler suitable for installation into | 51 // FireAllTasksHandler is an HTTP handler that expects `logging` and `luci/gae` |
| 54 // a httprouter. It expects `logging` and `luci/gae` services to be installed | 52 // services to be installed into the context. |
| 55 // into the context. | |
| 56 // | 53 // |
| 57 // FireAllTasksHandler verifies that it was called within an Appengine Cron | 54 // FireAllTasksHandler verifies that it was called within an Appengine Cron |
| 58 // request, and then invokes the FireAllTasks function. | 55 // request, and then invokes the FireAllTasks function. |
| 59 func (s *Service) FireAllTasksHandler(c context.Context, rw http.ResponseWriter,
r *http.Request, _ httprouter.Params) { | 56 func (s *Service) FireAllTasksHandler(c *router.Context) { |
| 60 » if err := s.FireAllTasks(c); err != nil { | 57 » if err := s.FireAllTasks(c.Context); err != nil { |
| 61 » » rw.WriteHeader(http.StatusInternalServerError) | 58 » » c.Writer.WriteHeader(http.StatusInternalServerError) |
| 62 » » fmt.Fprintf(rw, "fire_all_tasks failed: %s", err) | 59 » » fmt.Fprintf(c.Writer, "fire_all_tasks failed: %s", err) |
| 63 } else { | 60 } else { |
| 64 » » rw.Write([]byte("ok")) | 61 » » c.Writer.Write([]byte("ok")) |
| 65 } | 62 } |
| 66 } | 63 } |
| 67 | 64 |
| 68 // FireAllTasks searches for work in all namespaces, and fires off a process | 65 // FireAllTasks searches for work in all namespaces, and fires off a process |
| 69 // task for any shards it finds that have at least one Mutation present to | 66 // task for any shards it finds that have at least one Mutation present to |
| 70 // ensure that no work languishes forever. This may not be needed in | 67 // ensure that no work languishes forever. This may not be needed in |
| 71 // a constantly-loaded system with good tumble key distribution. | 68 // a constantly-loaded system with good tumble key distribution. |
| 72 func (s *Service) FireAllTasks(c context.Context) error { | 69 func (s *Service) FireAllTasks(c context.Context) error { |
| 73 cfg := getConfig(c) | 70 cfg := getConfig(c) |
| 74 shards := make(map[taskShard]struct{}, cfg.NumShards) | 71 shards := make(map[taskShard]struct{}, cfg.NumShards) |
| (...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 170 return | 167 return |
| 171 } | 168 } |
| 172 } else { | 169 } else { |
| 173 // Namespacing is disabled, use a single empty string. Process w
ill | 170 // Namespacing is disabled, use a single empty string. Process w
ill |
| 174 // interpret this as a signal to not use namesapces. | 171 // interpret this as a signal to not use namesapces. |
| 175 namespaces = []string{""} | 172 namespaces = []string{""} |
| 176 } | 173 } |
| 177 return | 174 return |
| 178 } | 175 } |
| 179 | 176 |
| 180 // ProcessShardHandler is a http handler suitable for installation into | 177 // ProcessShardHandler is an HTTP handler that expects `logging` and `luci/gae` |
| 181 // a httprouter. It expects `logging` and `luci/gae` services to be installed | 178 // services to be installed into the context. |
| 182 // into the context. | |
| 183 // | 179 // |
| 184 // ProcessShardHandler verifies that its being run as a taskqueue task and that | 180 // ProcessShardHandler verifies that its being run as a taskqueue task and that |
| 185 // the following parameters exist and are well-formed: | 181 // the following parameters exist and are well-formed: |
| 186 // * timestamp: decimal-encoded UNIX/UTC timestamp in seconds. | 182 // * timestamp: decimal-encoded UNIX/UTC timestamp in seconds. |
| 187 // * shard_id: decimal-encoded shard identifier. | 183 // * shard_id: decimal-encoded shard identifier. |
| 188 // | 184 // |
| 189 // ProcessShardHandler then invokes ProcessShard with the parsed parameters. | 185 // ProcessShardHandler then invokes ProcessShard with the parsed parameters. |
| 190 func (s *Service) ProcessShardHandler(c context.Context, rw http.ResponseWriter,
r *http.Request, p httprouter.Params) { | 186 func (s *Service) ProcessShardHandler(ctx *router.Context) { |
| 187 » c, rw, p := ctx.Context, ctx.Writer, ctx.Params |
| 188 |
| 191 if s.Middleware != nil { | 189 if s.Middleware != nil { |
| 192 c = s.Middleware(c) | 190 c = s.Middleware(c) |
| 193 } | 191 } |
| 194 | 192 |
| 195 tstampStr := p.ByName("timestamp") | 193 tstampStr := p.ByName("timestamp") |
| 196 sidStr := p.ByName("shard_id") | 194 sidStr := p.ByName("shard_id") |
| 197 | 195 |
| 198 tstamp, err := strconv.ParseInt(tstampStr, 10, 64) | 196 tstamp, err := strconv.ParseInt(tstampStr, 10, 64) |
| 199 if err != nil { | 197 if err != nil { |
| 200 logging.Errorf(c, "bad timestamp %q", tstampStr) | 198 logging.Errorf(c, "bad timestamp %q", tstampStr) |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 255 } | 253 } |
| 256 | 254 |
| 257 namespaces := make([]string, 0, len(namespaceKeys)) | 255 namespaces := make([]string, 0, len(namespaceKeys)) |
| 258 for _, nk := range namespaceKeys { | 256 for _, nk := range namespaceKeys { |
| 259 if ns := nk.StringID(); ns != "" { | 257 if ns := nk.StringID(); ns != "" { |
| 260 namespaces = append(namespaces, ns) | 258 namespaces = append(namespaces, ns) |
| 261 } | 259 } |
| 262 } | 260 } |
| 263 return namespaces, nil | 261 return namespaces, nil |
| 264 } | 262 } |
| OLD | NEW |