Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1828)

Unified Diff: appengine/cmd/cron/frontend/handler.go

Issue 2043423004: Make HTTP middleware easier to use (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: gaemiddleware: add middleware func for WithProd Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | appengine/cmd/cron/ui/common.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/cmd/cron/frontend/handler.go
diff --git a/appengine/cmd/cron/frontend/handler.go b/appengine/cmd/cron/frontend/handler.go
index 9846aaf48fbabcb7b12bbfefeaaf1c41080255ef..76a320c536ff090205d56c127b4a0b761ea3842c 100644
--- a/appengine/cmd/cron/frontend/handler.go
+++ b/appengine/cmd/cron/frontend/handler.go
@@ -17,29 +17,28 @@ import (
"encoding/binary"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"strconv"
"sync"
"time"
- "github.com/julienschmidt/httprouter"
"golang.org/x/net/context"
"google.golang.org/appengine"
"github.com/luci/gae/service/info"
"github.com/luci/gae/service/taskqueue"
"github.com/luci/luci-go/server/auth"
- "github.com/luci/luci-go/server/middleware"
+ "github.com/luci/luci-go/server/router"
"github.com/luci/luci-go/appengine/gaeauth/server"
"github.com/luci/luci-go/appengine/gaeconfig"
"github.com/luci/luci-go/appengine/gaemiddleware"
"github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/config/impl/memory"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/logging"
@@ -63,54 +62,47 @@ var (
managers = []task.Manager{
&buildbucket.TaskManager{},
&noop.TaskManager{},
&swarming.TaskManager{},
&urlfetch.TaskManager{},
}
)
//// Helpers.
-type handler func(c *requestContext)
-
-type requestContext struct {
- context.Context
-
- w http.ResponseWriter
- r *http.Request
- p httprouter.Params
-}
+// requestContext is used to add helper methods.
+type requestContext router.Context
// fail writes error message to the log and the response and sets status code.
func (c *requestContext) fail(code int, msg string, args ...interface{}) {
body := fmt.Sprintf(msg, args...)
- logging.Errorf(c, "HTTP %d: %s", code, body)
- http.Error(c.w, body, code)
+ logging.Errorf(c.Context, "HTTP %d: %s", code, body)
+ http.Error(c.Writer, body, code)
}
// err sets status to 500 on transient errors or 202 on fatal ones. Returning
// status code in range [200–299] is the only way to tell Task Queues to stop
// retrying the task.
func (c *requestContext) err(e error, msg string, args ...interface{}) {
code := 500
if !errors.IsTransient(e) {
code = 202
}
args = append(args, e)
c.fail(code, msg+" - %s", args...)
}
// ok sets status to 200 and puts "OK" in response.
func (c *requestContext) ok() {
- c.w.Header().Set("Content-Type", "text/plain; charset=utf-8")
- c.w.WriteHeader(200)
- fmt.Fprintln(c.w, "OK")
+ c.Writer.Header().Set("Content-Type", "text/plain; charset=utf-8")
+ c.Writer.WriteHeader(200)
+ fmt.Fprintln(c.Writer, "OK")
}
///
var globalInit sync.Once
// initializeGlobalState does one time initialization for stuff that needs
// active GAE context.
func initializeGlobalState(c context.Context) {
if info.Get(c).IsDevAppServer() {
@@ -125,52 +117,36 @@ func initializeGlobalState(c context.Context) {
// getConfigImpl returns config.Interface implementation to use from the
// catalog.
func getConfigImpl(c context.Context) (config.Interface, error) {
// Use fake config data on dev server for simplicity.
if info.Get(c).IsDevAppServer() {
return memory.New(devServerConfigs()), nil
}
return gaeconfig.New(c)
}
-// wrap converts the handler to format accepted by middleware lib. It also adds
-// context initialization code.
-func wrap(h handler) middleware.Handler {
- return func(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
- h(&requestContext{c, w, r, p})
- }
-}
-
-// base starts middleware chain. It initializes prod context and sets up
+// base returns middleware chain. It initializes prod context and sets up
// authentication config.
-func base(h middleware.Handler) httprouter.Handle {
+func base() router.MiddlewareChain {
methods := auth.Authenticator{
&server.OAuth2Method{Scopes: []string{server.EmailScope}},
server.CookieAuth,
&server.InboundAppIDAuthMethod{},
}
- wrapper := func(c context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
- globalInit.Do(func() { initializeGlobalState(c) })
- c = auth.SetAuthenticator(c, methods)
- h(c, w, r, p)
- }
- return gaemiddleware.BaseProd(wrapper)
-}
-
-// cronHandler returns handler intended for cron jobs.
-func cronHandler(h handler) httprouter.Handle {
- return base(gaemiddleware.RequireCron(wrap(h)))
-}
-
-// taskQueueHandler returns handler intended for task queue calls.
-func taskQueueHandler(name string, h handler) httprouter.Handle {
- return base(gaemiddleware.RequireTaskQueue(name, wrap(h)))
+ return append(
+ gaemiddleware.BaseProd(),
+ func(c *router.Context, next router.Handler) {
+ globalInit.Do(func() { initializeGlobalState(c.Context) })
+ c.Context = auth.SetAuthenticator(c.Context, methods)
+ next(c)
+ },
+ )
}
//// Routes.
func init() {
// Dev server likes to restart a lot, and upon a restart math/rand seed is
// always set to 1, resulting in lots of presumably "random" IDs not being
// very random. Seed it with real randomness.
var seed int64
if err := binary.Read(cryptorand.Reader, binary.LittleEndian, &seed); err != nil {
@@ -188,155 +164,162 @@ func init() {
globalEngine = engine.NewEngine(engine.Config{
Catalog: globalCatalog,
TimersQueuePath: "/internal/tasks/timers",
TimersQueueName: "timers",
InvocationsQueuePath: "/internal/tasks/invocations",
InvocationsQueueName: "invocations",
PubSubPushPath: "/pubsub",
})
// Setup HTTP routes.
- router := httprouter.New()
+ r := router.New()
+ basemw := base()
- gaemiddleware.InstallHandlers(router, base)
- ui.InstallHandlers(router, base, ui.Config{
+ gaemiddleware.InstallHandlers(r, basemw)
+ ui.InstallHandlers(r, basemw, ui.Config{
Engine: globalEngine,
TemplatesPath: "templates",
})
- router.GET("/_ah/warmup", base(wrap(warmupHandler)))
- router.GET("/_ah/start", base(wrap(warmupHandler)))
- router.POST("/pubsub", base(wrap(pubsubPushHandler)))
- router.GET("/internal/cron/read-config", cronHandler(readConfigCron))
- router.POST("/internal/tasks/read-project-config", taskQueueHandler("read-project-config", readProjectConfigTask))
- router.POST("/internal/tasks/timers", taskQueueHandler("timers", actionTask))
- router.POST("/internal/tasks/invocations", taskQueueHandler("invocations", actionTask))
+ r.GET("/_ah/warmup", basemw, warmupHandler)
+ r.GET("/_ah/start", basemw, warmupHandler)
+ r.POST("/pubsub", basemw, pubsubPushHandler)
+ r.GET("/internal/cron/read-config", append(basemw, gaemiddleware.RequireCron), readConfigCron)
+ r.POST("/internal/tasks/read-project-config", append(basemw, gaemiddleware.RequireTaskQueue("read-project-config")), readProjectConfigTask)
+ r.POST("/internal/tasks/timers", append(basemw, gaemiddleware.RequireTaskQueue("timers")), actionTask)
+ r.POST("/internal/tasks/invocations", append(basemw, gaemiddleware.RequireTaskQueue("invocations")), actionTask)
// Devserver can't accept PubSub pushes, so allow manual pulls instead to
// simplify local development.
if appengine.IsDevAppServer() {
- router.GET("/pubsub/pull/:ManagerName/:Publisher", base(wrap(pubsubPullHandler)))
+ r.GET("/pubsub/pull/:ManagerName/:Publisher", basemw, pubsubPullHandler)
}
- http.DefaultServeMux.Handle("/", router)
+ http.DefaultServeMux.Handle("/", r)
}
// warmupHandler warms in-memory caches.
-func warmupHandler(rc *requestContext) {
- if err := server.Warmup(rc); err != nil {
+func warmupHandler(c *router.Context) {
+ rc := requestContext(*c)
+ if err := server.Warmup(rc.Context); err != nil {
rc.fail(500, "Failed to warmup OpenID: %s", err)
return
}
rc.ok()
}
// pubsubPushHandler handles incoming PubSub messages.
-func pubsubPushHandler(rc *requestContext) {
- body, err := ioutil.ReadAll(rc.r.Body)
+func pubsubPushHandler(c *router.Context) {
+ rc := requestContext(*c)
+ body, err := ioutil.ReadAll(rc.Request.Body)
if err != nil {
rc.fail(500, "Failed to read the request: %s", err)
return
}
- if err = globalEngine.ProcessPubSubPush(rc, body); err != nil {
+ if err = globalEngine.ProcessPubSubPush(rc.Context, body); err != nil {
rc.err(err, "Failed to process incoming PubSub push")
return
}
rc.ok()
}
// pubsubPullHandler is called on dev server by developer to pull pubsub
// messages from a topic created for a publisher.
-func pubsubPullHandler(rc *requestContext) {
+func pubsubPullHandler(c *router.Context) {
+ rc := requestContext(*c)
if !appengine.IsDevAppServer() {
rc.fail(403, "Not a dev server")
return
}
err := globalEngine.PullPubSubOnDevServer(
- rc, rc.p.ByName("ManagerName"), rc.p.ByName("Publisher"))
+ rc.Context, rc.Params.ByName("ManagerName"), rc.Params.ByName("Publisher"))
if err != nil {
rc.err(err, "Failed to pull PubSub messages")
} else {
rc.ok()
}
}
// readConfigCron grabs a list of projects from the catalog and datastore and
// dispatches task queue tasks to update each project's cron jobs.
-func readConfigCron(c *requestContext) {
+func readConfigCron(c *router.Context) {
+ rc := requestContext(*c)
projectsToVisit := map[string]bool{}
// Visit all projects in the catalog.
- ctx, _ := context.WithTimeout(c.Context, 150*time.Second)
+ ctx, _ := context.WithTimeout(rc.Context, 150*time.Second)
projects, err := globalCatalog.GetAllProjects(ctx)
if err != nil {
- c.err(err, "Failed to grab a list of project IDs from catalog")
+ rc.err(err, "Failed to grab a list of project IDs from catalog")
return
}
for _, id := range projects {
projectsToVisit[id] = true
}
// Also visit all registered projects that do not show up in the catalog
// listing anymore. It will unregister all crons belonging to them.
- existing, err := globalEngine.GetAllProjects(c.Context)
+ existing, err := globalEngine.GetAllProjects(rc.Context)
if err != nil {
- c.err(err, "Failed to grab a list of project IDs from datastore")
+ rc.err(err, "Failed to grab a list of project IDs from datastore")
return
}
for _, id := range existing {
projectsToVisit[id] = true
}
// Handle each project in its own task to avoid "bad" projects (e.g. ones with
// lots of crons) to slow down "good" ones.
tasks := make([]*taskqueue.Task, 0, len(projectsToVisit))
for projectID := range projectsToVisit {
tasks = append(tasks, &taskqueue.Task{
Path: "/internal/tasks/read-project-config?projectID=" + url.QueryEscape(projectID),
})
}
- tq := taskqueue.Get(c)
+ tq := taskqueue.Get(rc.Context)
if err = tq.AddMulti(tasks, "read-project-config"); err != nil {
- c.err(errors.WrapTransient(err), "Failed to add tasks to task queue")
+ rc.err(errors.WrapTransient(err), "Failed to add tasks to task queue")
} else {
- c.ok()
+ rc.ok()
}
}
// readProjectConfigTask grabs a list of cron jobs in a project from catalog,
// updates all changed cron jobs, adds new ones, disables old ones.
-func readProjectConfigTask(c *requestContext) {
- projectID := c.r.URL.Query().Get("projectID")
+func readProjectConfigTask(c *router.Context) {
+ rc := requestContext(*c)
+ projectID := rc.Request.URL.Query().Get("projectID")
if projectID == "" {
// Return 202 to avoid retry, it is fatal error.
- c.fail(202, "Missing projectID query attribute")
+ rc.fail(202, "Missing projectID query attribute")
return
}
- ctx, _ := context.WithTimeout(c.Context, 150*time.Second)
+ ctx, _ := context.WithTimeout(rc.Context, 150*time.Second)
jobs, err := globalCatalog.GetProjectJobs(ctx, projectID)
if err != nil {
- c.err(err, "Failed to query for a list of jobs")
+ rc.err(err, "Failed to query for a list of jobs")
return
}
- if err = globalEngine.UpdateProjectJobs(c.Context, projectID, jobs); err != nil {
- c.err(err, "Failed to update some cron jobs")
+ if err = globalEngine.UpdateProjectJobs(rc.Context, projectID, jobs); err != nil {
+ rc.err(err, "Failed to update some cron jobs")
return
}
- c.ok()
+ rc.ok()
}
// actionTask is used to route actions emitted by cron job state transitions
// back into Engine (see enqueueActions).
-func actionTask(c *requestContext) {
- body, err := ioutil.ReadAll(c.r.Body)
+func actionTask(c *router.Context) {
+ rc := requestContext(*c)
+ body, err := ioutil.ReadAll(rc.Request.Body)
if err != nil {
- c.fail(500, "Failed to read request body: %s", err)
+ rc.fail(500, "Failed to read request body: %s", err)
return
}
- count, _ := strconv.Atoi(c.r.Header.Get("X-AppEngine-TaskExecutionCount"))
- err = globalEngine.ExecuteSerializedAction(c.Context, body, count)
+ count, _ := strconv.Atoi(rc.Request.Header.Get("X-AppEngine-TaskExecutionCount"))
+ err = globalEngine.ExecuteSerializedAction(rc.Context, body, count)
if err != nil {
- c.err(err, "Error when executing the action")
+ rc.err(err, "Error when executing the action")
return
}
- c.ok()
+ rc.ok()
}
« no previous file with comments | « no previous file | appengine/cmd/cron/ui/common.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698