| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package tumble | 5 package tumble |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "net/http" | 9 "net/http" |
| 10 "strconv" | 10 "strconv" |
| 11 "sync" | 11 "sync" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "github.com/julienschmidt/httprouter" | |
| 15 "github.com/luci/gae/service/datastore" | 14 "github.com/luci/gae/service/datastore" |
| 16 "github.com/luci/gae/service/info" | 15 "github.com/luci/gae/service/info" |
| 17 "github.com/luci/luci-go/appengine/gaemiddleware" | 16 "github.com/luci/luci-go/appengine/gaemiddleware" |
| 18 "github.com/luci/luci-go/common/errors" | 17 "github.com/luci/luci-go/common/errors" |
| 19 "github.com/luci/luci-go/common/logging" | 18 "github.com/luci/luci-go/common/logging" |
| 20 "github.com/luci/luci-go/common/parallel" | 19 "github.com/luci/luci-go/common/parallel" |
| 20 "github.com/luci/luci-go/server/router" |
| 21 "golang.org/x/net/context" | 21 "golang.org/x/net/context" |
| 22 ) | 22 ) |
| 23 | 23 |
| 24 const transientHTTPHeader = "X-LUCI-Tumble-Transient" | 24 const transientHTTPHeader = "X-LUCI-Tumble-Transient" |
| 25 | 25 |
| 26 // Service is an instance of a Tumble service. It installs its handlers into an | 26 // Service is an instance of a Tumble service. It installs its handlers into an |
| 27 // HTTP router and services Tumble request tasks. | 27 // HTTP router and services Tumble request tasks. |
| 28 type Service struct { | 28 type Service struct { |
| 29 // Middleware is an optional function which allows your application to a
dd | 29 // Middleware is an optional function which allows your application to a
dd |
| 30 // application-specific resources to the context used by ProcessShardHan
dler. | 30 // application-specific resources to the context used by ProcessShardHan
dler. |
| 31 // | 31 // |
| 32 // Context will already be setup with BaseProd. | 32 // Context will already be setup with BaseProd. |
| 33 Middleware func(context.Context) context.Context | 33 Middleware func(context.Context) context.Context |
| 34 | 34 |
| 35 // Namespaces is a function that returns the datastore namespaces that T
umble | 35 // Namespaces is a function that returns the datastore namespaces that T
umble |
| 36 // will poll. | 36 // will poll. |
| 37 // | 37 // |
| 38 // If nil, Tumble will be executed against all namespaces registered in
the | 38 // If nil, Tumble will be executed against all namespaces registered in
the |
| 39 // datastore. | 39 // datastore. |
| 40 Namespaces func(context.Context) ([]string, error) | 40 Namespaces func(context.Context) ([]string, error) |
| 41 } | 41 } |
| 42 | 42 |
| 43 // InstallHandlers installs http handlers. | 43 // InstallHandlers installs http handlers. |
| 44 func (s *Service) InstallHandlers(r *httprouter.Router) { | 44 func (s *Service) InstallHandlers(r *router.Router) { |
| 45 // GET so that this can be invoked from cron | 45 // GET so that this can be invoked from cron |
| 46 r.GET(fireAllTasksURL, | 46 r.GET(fireAllTasksURL, |
| 47 » » gaemiddleware.BaseProd(gaemiddleware.RequireCron(s.FireAllTasksH
andler))) | 47 » » append(gaemiddleware.BaseProd(), gaemiddleware.RequireCron(), s.
FireAllTasksHandler)...) |
| 48 | 48 |
| 49 » r.POST(processShardPattern, | 49 » r.POST(processShardPattern, append(gaemiddleware.BaseProd(), gaemiddlewa
re.RequireTaskQueue(baseName), s.ProcessShardHandler)...) |
| 50 » » gaemiddleware.BaseProd(gaemiddleware.RequireTaskQueue(baseName,
s.ProcessShardHandler))) | |
| 51 } | 50 } |
| 52 | 51 |
| 53 // FireAllTasksHandler is a http handler suitable for installation into | 52 // FireAllTasksHandler is a http handler suitable for installation into |
| 54 // a httprouter. It expects `logging` and `luci/gae` services to be installed | 53 // a httprouter. It expects `logging` and `luci/gae` services to be installed |
| 55 // into the context. | 54 // into the context. |
| 56 // | 55 // |
| 57 // FireAllTasksHandler verifies that it was called within an Appengine Cron | 56 // FireAllTasksHandler verifies that it was called within an Appengine Cron |
| 58 // request, and then invokes the FireAllTasks function. | 57 // request, and then invokes the FireAllTasks function. |
| 59 func (s *Service) FireAllTasksHandler(c context.Context, rw http.ResponseWriter,
r *http.Request, _ httprouter.Params) { | 58 func (s *Service) FireAllTasksHandler(c *router.Context) { |
| 60 » if err := s.FireAllTasks(c); err != nil { | 59 » if err := s.FireAllTasks(c.Context); err != nil { |
| 61 » » rw.WriteHeader(http.StatusInternalServerError) | 60 » » c.Writer.WriteHeader(http.StatusInternalServerError) |
| 62 » » fmt.Fprintf(rw, "fire_all_tasks failed: %s", err) | 61 » » fmt.Fprintf(c.Writer, "fire_all_tasks failed: %s", err) |
| 63 } else { | 62 } else { |
| 64 » » rw.Write([]byte("ok")) | 63 » » c.Writer.Write([]byte("ok")) |
| 65 } | 64 } |
| 66 } | 65 } |
| 67 | 66 |
| 68 // FireAllTasks searches for work in all namespaces, and fires off a process | 67 // FireAllTasks searches for work in all namespaces, and fires off a process |
| 69 // task for any shards it finds that have at least one Mutation present to | 68 // task for any shards it finds that have at least one Mutation present to |
| 70 // ensure that no work languishes forever. This may not be needed in | 69 // ensure that no work languishes forever. This may not be needed in |
| 71 // a constantly-loaded system with good tumble key distribution. | 70 // a constantly-loaded system with good tumble key distribution. |
| 72 func (s *Service) FireAllTasks(c context.Context) error { | 71 func (s *Service) FireAllTasks(c context.Context) error { |
| 73 cfg := getConfig(c) | 72 cfg := getConfig(c) |
| 74 shards := make(map[taskShard]struct{}, cfg.NumShards) | 73 shards := make(map[taskShard]struct{}, cfg.NumShards) |
| (...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 180 // ProcessShardHandler is a http handler suitable for installation into | 179 // ProcessShardHandler is a http handler suitable for installation into |
| 181 // a httprouter. It expects `logging` and `luci/gae` services to be installed | 180 // a httprouter. It expects `logging` and `luci/gae` services to be installed |
| 182 // into the context. | 181 // into the context. |
| 183 // | 182 // |
| 184 // ProcessShardHandler verifies that its being run as a taskqueue task and that | 183 // ProcessShardHandler verifies that its being run as a taskqueue task and that |
| 185 // the following parameters exist and are well-formed: | 184 // the following parameters exist and are well-formed: |
| 186 // * timestamp: decimal-encoded UNIX/UTC timestamp in seconds. | 185 // * timestamp: decimal-encoded UNIX/UTC timestamp in seconds. |
| 187 // * shard_id: decimal-encoded shard identifier. | 186 // * shard_id: decimal-encoded shard identifier. |
| 188 // | 187 // |
| 189 // ProcessShardHandler then invokes ProcessShard with the parsed parameters. | 188 // ProcessShardHandler then invokes ProcessShard with the parsed parameters. |
| 190 func (s *Service) ProcessShardHandler(c context.Context, rw http.ResponseWriter,
r *http.Request, p httprouter.Params) { | 189 func (s *Service) ProcessShardHandler(c *router.Context) { |
| 191 if s.Middleware != nil { | 190 if s.Middleware != nil { |
| 192 » » c = s.Middleware(c) | 191 » » c.Context = s.Middleware(c.Context) |
| 193 } | 192 } |
| 194 | 193 |
| 195 » tstampStr := p.ByName("timestamp") | 194 » tstampStr := c.Params.ByName("timestamp") |
| 196 » sidStr := p.ByName("shard_id") | 195 » sidStr := c.Params.ByName("shard_id") |
| 197 | 196 |
| 198 tstamp, err := strconv.ParseInt(tstampStr, 10, 64) | 197 tstamp, err := strconv.ParseInt(tstampStr, 10, 64) |
| 199 if err != nil { | 198 if err != nil { |
| 200 » » logging.Errorf(c, "bad timestamp %q", tstampStr) | 199 » » logging.Errorf(c.Context, "bad timestamp %q", tstampStr) |
| 201 » » rw.WriteHeader(http.StatusNotFound) | 200 » » c.Writer.WriteHeader(http.StatusNotFound) |
| 202 » » fmt.Fprintf(rw, "bad timestamp") | 201 » » fmt.Fprintf(c.Writer, "bad timestamp") |
| 203 return | 202 return |
| 204 } | 203 } |
| 205 | 204 |
| 206 sid, err := strconv.ParseUint(sidStr, 10, 64) | 205 sid, err := strconv.ParseUint(sidStr, 10, 64) |
| 207 if err != nil { | 206 if err != nil { |
| 208 » » logging.Errorf(c, "bad shardID %q", tstampStr) | 207 » » logging.Errorf(c.Context, "bad shardID %q", tstampStr) |
| 209 » » rw.WriteHeader(http.StatusNotFound) | 208 » » c.Writer.WriteHeader(http.StatusNotFound) |
| 210 » » fmt.Fprintf(rw, "bad shardID") | 209 » » fmt.Fprintf(c.Writer, "bad shardID") |
| 211 return | 210 return |
| 212 } | 211 } |
| 213 | 212 |
| 214 » cfg := getConfig(c) | 213 » cfg := getConfig(c.Context) |
| 215 | 214 |
| 216 // Get the set of namespaces to handle. | 215 // Get the set of namespaces to handle. |
| 217 » namespaces, err := s.getNamespaces(c, cfg) | 216 » namespaces, err := s.getNamespaces(c.Context, cfg) |
| 218 if err != nil { | 217 if err != nil { |
| 219 » » rw.WriteHeader(http.StatusInternalServerError) | 218 » » c.Writer.WriteHeader(http.StatusInternalServerError) |
| 220 return | 219 return |
| 221 } | 220 } |
| 222 | 221 |
| 223 » err = processShard(c, cfg, namespaces, time.Unix(tstamp, 0).UTC(), sid) | 222 » err = processShard(c.Context, cfg, namespaces, time.Unix(tstamp, 0).UTC(
), sid) |
| 224 if err != nil { | 223 if err != nil { |
| 225 » » logging.Errorf(c, "failure! %s", err) | 224 » » logging.Errorf(c.Context, "failure! %s", err) |
| 226 | 225 |
| 227 if errors.IsTransient(err) { | 226 if errors.IsTransient(err) { |
| 228 » » » rw.Header().Add(transientHTTPHeader, "true") | 227 » » » c.Writer.Header().Add(transientHTTPHeader, "true") |
| 229 } | 228 } |
| 230 » » rw.WriteHeader(http.StatusInternalServerError) | 229 » » c.Writer.WriteHeader(http.StatusInternalServerError) |
| 231 » » fmt.Fprintf(rw, "error: %s", err) | 230 » » fmt.Fprintf(c.Writer, "error: %s", err) |
| 232 } else { | 231 } else { |
| 233 » » rw.Write([]byte("ok")) | 232 » » c.Writer.Write([]byte("ok")) |
| 234 } | 233 } |
| 235 } | 234 } |
| 236 | 235 |
| 237 // getDatastoreNamespaces returns a list of all of the namespaces in the | 236 // getDatastoreNamespaces returns a list of all of the namespaces in the |
| 238 // datastore. | 237 // datastore. |
| 239 // | 238 // |
| 240 // This is done by issuing a datastore query for kind "__namespace__". The | 239 // This is done by issuing a datastore query for kind "__namespace__". The |
| 241 // resulting keys will have IDs for the namespaces, namely: | 240 // resulting keys will have IDs for the namespaces, namely: |
| 242 // - The default namespace will have integer ID 1. We ignore this because i
f | 241 // - The default namespace will have integer ID 1. We ignore this because i
f |
| 243 // we're using tumble with namespaces, we don't process the default | 242 // we're using tumble with namespaces, we don't process the default |
| (...skipping 11 matching lines...) Expand all Loading... |
| 255 } | 254 } |
| 256 | 255 |
| 257 namespaces := make([]string, 0, len(namespaceKeys)) | 256 namespaces := make([]string, 0, len(namespaceKeys)) |
| 258 for _, nk := range namespaceKeys { | 257 for _, nk := range namespaceKeys { |
| 259 if ns := nk.StringID(); ns != "" { | 258 if ns := nk.StringID(); ns != "" { |
| 260 namespaces = append(namespaces, ns) | 259 namespaces = append(namespaces, ns) |
| 261 } | 260 } |
| 262 } | 261 } |
| 263 return namespaces, nil | 262 return namespaces, nil |
| 264 } | 263 } |
| OLD | NEW |