| Index: appengine/cmd/cron/frontend/handler.go
|
| diff --git a/appengine/cmd/cron/frontend/handler.go b/appengine/cmd/cron/frontend/handler.go
|
| index 9846aaf48fbabcb7b12bbfefeaaf1c41080255ef..76a320c536ff090205d56c127b4a0b761ea3842c 100644
|
| --- a/appengine/cmd/cron/frontend/handler.go
|
| +++ b/appengine/cmd/cron/frontend/handler.go
|
| @@ -17,29 +17,28 @@ import (
|
| "encoding/binary"
|
| "fmt"
|
| "io/ioutil"
|
| "math/rand"
|
| "net/http"
|
| "net/url"
|
| "strconv"
|
| "sync"
|
| "time"
|
|
|
| - "github.com/julienschmidt/httprouter"
|
| "golang.org/x/net/context"
|
| "google.golang.org/appengine"
|
|
|
| "github.com/luci/gae/service/info"
|
| "github.com/luci/gae/service/taskqueue"
|
|
|
| "github.com/luci/luci-go/server/auth"
|
| - "github.com/luci/luci-go/server/middleware"
|
| + "github.com/luci/luci-go/server/router"
|
|
|
| "github.com/luci/luci-go/appengine/gaeauth/server"
|
| "github.com/luci/luci-go/appengine/gaeconfig"
|
| "github.com/luci/luci-go/appengine/gaemiddleware"
|
|
|
| "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/config/impl/memory"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/logging"
|
|
|
| @@ -63,54 +62,47 @@ var (
|
| managers = []task.Manager{
|
| &buildbucket.TaskManager{},
|
| &noop.TaskManager{},
|
| &swarming.TaskManager{},
|
| &urlfetch.TaskManager{},
|
| }
|
| )
|
|
|
| //// Helpers.
|
|
|
| -type handler func(c *requestContext)
|
| -
|
| -type requestContext struct {
|
| - context.Context
|
| -
|
| - w http.ResponseWriter
|
| - r *http.Request
|
| - p httprouter.Params
|
| -}
|
| +// requestContext is used to add helper methods.
|
| +type requestContext router.Context
|
|
|
| // fail writes error message to the log and the response and sets status code.
|
| func (c *requestContext) fail(code int, msg string, args ...interface{}) {
|
| body := fmt.Sprintf(msg, args...)
|
| - logging.Errorf(c, "HTTP %d: %s", code, body)
|
| - http.Error(c.w, body, code)
|
| + logging.Errorf(c.Context, "HTTP %d: %s", code, body)
|
| + http.Error(c.Writer, body, code)
|
| }
|
|
|
| // err sets status to 500 on transient errors or 202 on fatal ones. Returning
|
| // status code in range [200–299] is the only way to tell Task Queues to stop
|
| // retrying the task.
|
| func (c *requestContext) err(e error, msg string, args ...interface{}) {
|
| code := 500
|
| if !errors.IsTransient(e) {
|
| code = 202
|
| }
|
| args = append(args, e)
|
| c.fail(code, msg+" - %s", args...)
|
| }
|
|
|
| // ok sets status to 200 and puts "OK" in response.
|
| func (c *requestContext) ok() {
|
| - c.w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
| - c.w.WriteHeader(200)
|
| - fmt.Fprintln(c.w, "OK")
|
| + c.Writer.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
| + c.Writer.WriteHeader(200)
|
| + fmt.Fprintln(c.Writer, "OK")
|
| }
|
|
|
| ///
|
|
|
| var globalInit sync.Once
|
|
|
| // initializeGlobalState does one time initialization for stuff that needs
|
| // active GAE context.
|
| func initializeGlobalState(c context.Context) {
|
| if info.Get(c).IsDevAppServer() {
|
| @@ -125,52 +117,36 @@ func initializeGlobalState(c context.Context) {
|
| // getConfigImpl returns config.Interface implementation to use from the
|
| // catalog.
|
| func getConfigImpl(c context.Context) (config.Interface, error) {
|
| // Use fake config data on dev server for simplicity.
|
| if info.Get(c).IsDevAppServer() {
|
| return memory.New(devServerConfigs()), nil
|
| }
|
| return gaeconfig.New(c)
|
| }
|
|
|
| -// wrap converts the handler to format accepted by middleware lib. It also adds
|
| -// context initialization code.
|
| -func wrap(h handler) middleware.Handler {
|
| - return func(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
|
| - h(&requestContext{c, w, r, p})
|
| - }
|
| -}
|
| -
|
| -// base starts middleware chain. It initializes prod context and sets up
|
| +// base returns middleware chain. It initializes prod context and sets up
|
| // authentication config.
|
| -func base(h middleware.Handler) httprouter.Handle {
|
| +func base() router.MiddlewareChain {
|
| methods := auth.Authenticator{
|
| &server.OAuth2Method{Scopes: []string{server.EmailScope}},
|
| server.CookieAuth,
|
| &server.InboundAppIDAuthMethod{},
|
| }
|
| - wrapper := func(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
|
| - globalInit.Do(func() { initializeGlobalState(c) })
|
| - c = auth.SetAuthenticator(c, methods)
|
| - h(c, w, r, p)
|
| - }
|
| - return gaemiddleware.BaseProd(wrapper)
|
| -}
|
| -
|
| -// cronHandler returns handler intended for cron jobs.
|
| -func cronHandler(h handler) httprouter.Handle {
|
| - return base(gaemiddleware.RequireCron(wrap(h)))
|
| -}
|
| -
|
| -// taskQueueHandler returns handler intended for task queue calls.
|
| -func taskQueueHandler(name string, h handler) httprouter.Handle {
|
| - return base(gaemiddleware.RequireTaskQueue(name, wrap(h)))
|
| + return append(
|
| + gaemiddleware.BaseProd(),
|
| + func(c *router.Context, next router.Handler) {
|
| + globalInit.Do(func() { initializeGlobalState(c.Context) })
|
| + c.Context = auth.SetAuthenticator(c.Context, methods)
|
| + next(c)
|
| + },
|
| + )
|
| }
|
|
|
| //// Routes.
|
|
|
| func init() {
|
| // Dev server likes to restart a lot, and upon a restart math/rand seed is
|
| // always set to 1, resulting in lots of presumably "random" IDs not being
|
| // very random. Seed it with real randomness.
|
| var seed int64
|
| if err := binary.Read(cryptorand.Reader, binary.LittleEndian, &seed); err != nil {
|
| @@ -188,155 +164,162 @@ func init() {
|
| globalEngine = engine.NewEngine(engine.Config{
|
| Catalog: globalCatalog,
|
| TimersQueuePath: "/internal/tasks/timers",
|
| TimersQueueName: "timers",
|
| InvocationsQueuePath: "/internal/tasks/invocations",
|
| InvocationsQueueName: "invocations",
|
| PubSubPushPath: "/pubsub",
|
| })
|
|
|
| // Setup HTTP routes.
|
| - router := httprouter.New()
|
| + r := router.New()
|
| + basemw := base()
|
|
|
| - gaemiddleware.InstallHandlers(router, base)
|
| - ui.InstallHandlers(router, base, ui.Config{
|
| + gaemiddleware.InstallHandlers(r, basemw)
|
| + ui.InstallHandlers(r, basemw, ui.Config{
|
| Engine: globalEngine,
|
| TemplatesPath: "templates",
|
| })
|
|
|
| - router.GET("/_ah/warmup", base(wrap(warmupHandler)))
|
| - router.GET("/_ah/start", base(wrap(warmupHandler)))
|
| - router.POST("/pubsub", base(wrap(pubsubPushHandler)))
|
| - router.GET("/internal/cron/read-config", cronHandler(readConfigCron))
|
| - router.POST("/internal/tasks/read-project-config", taskQueueHandler("read-project-config", readProjectConfigTask))
|
| - router.POST("/internal/tasks/timers", taskQueueHandler("timers", actionTask))
|
| - router.POST("/internal/tasks/invocations", taskQueueHandler("invocations", actionTask))
|
| + r.GET("/_ah/warmup", basemw, warmupHandler)
|
| + r.GET("/_ah/start", basemw, warmupHandler)
|
| + r.POST("/pubsub", basemw, pubsubPushHandler)
|
| + r.GET("/internal/cron/read-config", append(basemw, gaemiddleware.RequireCron), readConfigCron)
|
| + r.POST("/internal/tasks/read-project-config", append(basemw, gaemiddleware.RequireTaskQueue("read-project-config")), readProjectConfigTask)
|
| + r.POST("/internal/tasks/timers", append(basemw, gaemiddleware.RequireTaskQueue("timers")), actionTask)
|
| + r.POST("/internal/tasks/invocations", append(basemw, gaemiddleware.RequireTaskQueue("invocations")), actionTask)
|
|
|
| // Devserver can't accept PubSub pushes, so allow manual pulls instead to
|
| // simplify local development.
|
| if appengine.IsDevAppServer() {
|
| - router.GET("/pubsub/pull/:ManagerName/:Publisher", base(wrap(pubsubPullHandler)))
|
| + r.GET("/pubsub/pull/:ManagerName/:Publisher", basemw, pubsubPullHandler)
|
| }
|
|
|
| - http.DefaultServeMux.Handle("/", router)
|
| + http.DefaultServeMux.Handle("/", r)
|
| }
|
|
|
| // warmupHandler warms in-memory caches.
|
| -func warmupHandler(rc *requestContext) {
|
| - if err := server.Warmup(rc); err != nil {
|
| +func warmupHandler(c *router.Context) {
|
| + rc := requestContext(*c)
|
| + if err := server.Warmup(rc.Context); err != nil {
|
| rc.fail(500, "Failed to warmup OpenID: %s", err)
|
| return
|
| }
|
| rc.ok()
|
| }
|
|
|
| // pubsubPushHandler handles incoming PubSub messages.
|
| -func pubsubPushHandler(rc *requestContext) {
|
| - body, err := ioutil.ReadAll(rc.r.Body)
|
| +func pubsubPushHandler(c *router.Context) {
|
| + rc := requestContext(*c)
|
| + body, err := ioutil.ReadAll(rc.Request.Body)
|
| if err != nil {
|
| rc.fail(500, "Failed to read the request: %s", err)
|
| return
|
| }
|
| - if err = globalEngine.ProcessPubSubPush(rc, body); err != nil {
|
| + if err = globalEngine.ProcessPubSubPush(rc.Context, body); err != nil {
|
| rc.err(err, "Failed to process incoming PubSub push")
|
| return
|
| }
|
| rc.ok()
|
| }
|
|
|
| // pubsubPullHandler is called on dev server by developer to pull pubsub
|
| // messages from a topic created for a publisher.
|
| -func pubsubPullHandler(rc *requestContext) {
|
| +func pubsubPullHandler(c *router.Context) {
|
| + rc := requestContext(*c)
|
| if !appengine.IsDevAppServer() {
|
| rc.fail(403, "Not a dev server")
|
| return
|
| }
|
| err := globalEngine.PullPubSubOnDevServer(
|
| - rc, rc.p.ByName("ManagerName"), rc.p.ByName("Publisher"))
|
| + rc.Context, rc.Params.ByName("ManagerName"), rc.Params.ByName("Publisher"))
|
| if err != nil {
|
| rc.err(err, "Failed to pull PubSub messages")
|
| } else {
|
| rc.ok()
|
| }
|
| }
|
|
|
| // readConfigCron grabs a list of projects from the catalog and datastore and
|
| // dispatches task queue tasks to update each project's cron jobs.
|
| -func readConfigCron(c *requestContext) {
|
| +func readConfigCron(c *router.Context) {
|
| + rc := requestContext(*c)
|
| projectsToVisit := map[string]bool{}
|
|
|
| // Visit all projects in the catalog.
|
| - ctx, _ := context.WithTimeout(c.Context, 150*time.Second)
|
| + ctx, _ := context.WithTimeout(rc.Context, 150*time.Second)
|
| projects, err := globalCatalog.GetAllProjects(ctx)
|
| if err != nil {
|
| - c.err(err, "Failed to grab a list of project IDs from catalog")
|
| + rc.err(err, "Failed to grab a list of project IDs from catalog")
|
| return
|
| }
|
| for _, id := range projects {
|
| projectsToVisit[id] = true
|
| }
|
|
|
| // Also visit all registered projects that do not show up in the catalog
|
| // listing anymore. It will unregister all crons belonging to them.
|
| - existing, err := globalEngine.GetAllProjects(c.Context)
|
| + existing, err := globalEngine.GetAllProjects(rc.Context)
|
| if err != nil {
|
| - c.err(err, "Failed to grab a list of project IDs from datastore")
|
| + rc.err(err, "Failed to grab a list of project IDs from datastore")
|
| return
|
| }
|
| for _, id := range existing {
|
| projectsToVisit[id] = true
|
| }
|
|
|
| // Handle each project in its own task to avoid "bad" projects (e.g. ones with
|
| // lots of crons) to slow down "good" ones.
|
| tasks := make([]*taskqueue.Task, 0, len(projectsToVisit))
|
| for projectID := range projectsToVisit {
|
| tasks = append(tasks, &taskqueue.Task{
|
| Path: "/internal/tasks/read-project-config?projectID=" + url.QueryEscape(projectID),
|
| })
|
| }
|
| - tq := taskqueue.Get(c)
|
| + tq := taskqueue.Get(rc.Context)
|
| if err = tq.AddMulti(tasks, "read-project-config"); err != nil {
|
| - c.err(errors.WrapTransient(err), "Failed to add tasks to task queue")
|
| + rc.err(errors.WrapTransient(err), "Failed to add tasks to task queue")
|
| } else {
|
| - c.ok()
|
| + rc.ok()
|
| }
|
| }
|
|
|
| // readProjectConfigTask grabs a list of cron jobs in a project from catalog,
|
| // updates all changed cron jobs, adds new ones, disables old ones.
|
| -func readProjectConfigTask(c *requestContext) {
|
| - projectID := c.r.URL.Query().Get("projectID")
|
| +func readProjectConfigTask(c *router.Context) {
|
| + rc := requestContext(*c)
|
| + projectID := rc.Request.URL.Query().Get("projectID")
|
| if projectID == "" {
|
| // Return 202 to avoid retry, it is fatal error.
|
| - c.fail(202, "Missing projectID query attribute")
|
| + rc.fail(202, "Missing projectID query attribute")
|
| return
|
| }
|
| - ctx, _ := context.WithTimeout(c.Context, 150*time.Second)
|
| + ctx, _ := context.WithTimeout(rc.Context, 150*time.Second)
|
| jobs, err := globalCatalog.GetProjectJobs(ctx, projectID)
|
| if err != nil {
|
| - c.err(err, "Failed to query for a list of jobs")
|
| + rc.err(err, "Failed to query for a list of jobs")
|
| return
|
| }
|
| - if err = globalEngine.UpdateProjectJobs(c.Context, projectID, jobs); err != nil {
|
| - c.err(err, "Failed to update some cron jobs")
|
| + if err = globalEngine.UpdateProjectJobs(rc.Context, projectID, jobs); err != nil {
|
| + rc.err(err, "Failed to update some cron jobs")
|
| return
|
| }
|
| - c.ok()
|
| + rc.ok()
|
| }
|
|
|
| // actionTask is used to route actions emitted by cron job state transitions
|
| // back into Engine (see enqueueActions).
|
| -func actionTask(c *requestContext) {
|
| - body, err := ioutil.ReadAll(c.r.Body)
|
| +func actionTask(c *router.Context) {
|
| + rc := requestContext(*c)
|
| + body, err := ioutil.ReadAll(rc.Request.Body)
|
| if err != nil {
|
| - c.fail(500, "Failed to read request body: %s", err)
|
| + rc.fail(500, "Failed to read request body: %s", err)
|
| return
|
| }
|
| - count, _ := strconv.Atoi(c.r.Header.Get("X-AppEngine-TaskExecutionCount"))
|
| - err = globalEngine.ExecuteSerializedAction(c.Context, body, count)
|
| + count, _ := strconv.Atoi(rc.Request.Header.Get("X-AppEngine-TaskExecutionCount"))
|
| + err = globalEngine.ExecuteSerializedAction(rc.Context, body, count)
|
| if err != nil {
|
| - c.err(err, "Error when executing the action")
|
| + rc.err(err, "Error when executing the action")
|
| return
|
| }
|
| - c.ok()
|
| + rc.ok()
|
| }
|
|
|