| Index: appengine/tumble/service.go
|
| diff --git a/appengine/tumble/service.go b/appengine/tumble/service.go
|
| index fa1efbb6f4cbaee980b41c6fe53a377a06fb502e..f30aa0b255e4469cbb5ff03c74bb3bed26aafc24 100644
|
| --- a/appengine/tumble/service.go
|
| +++ b/appengine/tumble/service.go
|
| @@ -4,27 +4,27 @@
|
|
|
| package tumble
|
|
|
| import (
|
| "fmt"
|
| "net/http"
|
| "strconv"
|
| "sync"
|
| "time"
|
|
|
| - "github.com/julienschmidt/httprouter"
|
| "github.com/luci/gae/service/datastore"
|
| "github.com/luci/gae/service/info"
|
| "github.com/luci/luci-go/appengine/gaemiddleware"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/parallel"
|
| + "github.com/luci/luci-go/server/router"
|
| "golang.org/x/net/context"
|
| )
|
|
|
| const transientHTTPHeader = "X-LUCI-Tumble-Transient"
|
|
|
| // Service is an instance of a Tumble service. It installs its handlers into an
|
| // HTTP router and services Tumble request tasks.
|
| type Service struct {
|
| // Middleware is an optional function which allows your application to add
|
| // application-specific resources to the context used by ProcessShardHandler.
|
| @@ -34,41 +34,38 @@ type Service struct {
|
|
|
| // Namespaces is a function that returns the datastore namespaces that Tumble
|
| // will poll.
|
| //
|
| // If nil, Tumble will be executed against all namespaces registered in the
|
| // datastore.
|
| Namespaces func(context.Context) ([]string, error)
|
| }
|
|
|
| // InstallHandlers installs http handlers.
|
| -func (s *Service) InstallHandlers(r *httprouter.Router) {
|
| +func (s *Service) InstallHandlers(r *router.Router) {
|
| + mc := append(gaemiddleware.BaseProd(), gaemiddleware.RequireCron)
|
| // GET so that this can be invoked from cron
|
| - r.GET(fireAllTasksURL,
|
| - gaemiddleware.BaseProd(gaemiddleware.RequireCron(s.FireAllTasksHandler)))
|
| -
|
| - r.POST(processShardPattern,
|
| - gaemiddleware.BaseProd(gaemiddleware.RequireTaskQueue(baseName, s.ProcessShardHandler)))
|
| + r.GET(fireAllTasksURL, mc, s.FireAllTasksHandler)
|
| + r.POST(processShardPattern, append(mc, gaemiddleware.RequireTaskQueue(baseName)), s.ProcessShardHandler)
|
| }
|
|
|
| -// FireAllTasksHandler is a http handler suitable for installation into
|
| -// a httprouter. It expects `logging` and `luci/gae` services to be installed
|
| -// into the context.
|
| +// FireAllTasksHandler is an HTTP handler that expects `logging` and `luci/gae`
|
| +// services to be installed into the context.
|
| //
|
| // FireAllTasksHandler verifies that it was called within an Appengine Cron
|
| // request, and then invokes the FireAllTasks function.
|
| -func (s *Service) FireAllTasksHandler(c context.Context, rw http.ResponseWriter, r *http.Request, _ httprouter.Params) {
|
| - if err := s.FireAllTasks(c); err != nil {
|
| - rw.WriteHeader(http.StatusInternalServerError)
|
| - fmt.Fprintf(rw, "fire_all_tasks failed: %s", err)
|
| +func (s *Service) FireAllTasksHandler(c *router.Context) {
|
| + if err := s.FireAllTasks(c.Context); err != nil {
|
| + c.Writer.WriteHeader(http.StatusInternalServerError)
|
| + fmt.Fprintf(c.Writer, "fire_all_tasks failed: %s", err)
|
| } else {
|
| - rw.Write([]byte("ok"))
|
| + c.Writer.Write([]byte("ok"))
|
| }
|
| }
|
|
|
| // FireAllTasks searches for work in all namespaces, and fires off a process
|
| // task for any shards it finds that have at least one Mutation present to
|
| // ensure that no work languishes forever. This may not be needed in
|
| // a constantly-loaded system with good tumble key distribution.
|
| func (s *Service) FireAllTasks(c context.Context) error {
|
| cfg := getConfig(c)
|
| shards := make(map[taskShard]struct{}, cfg.NumShards)
|
| @@ -170,31 +167,32 @@ func (s *Service) getNamespaces(c context.Context, cfg *Config) (namespaces []st
|
| return
|
| }
|
| } else {
|
| // Namespacing is disabled, use a single empty string. Process will
|
| // interpret this as a signal to not use namesapces.
|
| namespaces = []string{""}
|
| }
|
| return
|
| }
|
|
|
| -// ProcessShardHandler is a http handler suitable for installation into
|
| -// a httprouter. It expects `logging` and `luci/gae` services to be installed
|
| -// into the context.
|
| +// ProcessShardHandler is an HTTP handler that expects `logging` and `luci/gae`
|
| +// services to be installed into the context.
|
| //
|
| // ProcessShardHandler verifies that its being run as a taskqueue task and that
|
| // the following parameters exist and are well-formed:
|
| // * timestamp: decimal-encoded UNIX/UTC timestamp in seconds.
|
| // * shard_id: decimal-encoded shard identifier.
|
| //
|
| // ProcessShardHandler then invokes ProcessShard with the parsed parameters.
|
| -func (s *Service) ProcessShardHandler(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
|
| +func (s *Service) ProcessShardHandler(ctx *router.Context) {
|
| + c, rw, p := ctx.Context, ctx.Writer, ctx.Params
|
| +
|
| if s.Middleware != nil {
|
| c = s.Middleware(c)
|
| }
|
|
|
| tstampStr := p.ByName("timestamp")
|
| sidStr := p.ByName("shard_id")
|
|
|
| tstamp, err := strconv.ParseInt(tstampStr, 10, 64)
|
| if err != nil {
|
| logging.Errorf(c, "bad timestamp %q", tstampStr)
|
|
|