| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // Package frontend implements GAE web server for luci-cron service. | 5 // Package frontend implements GAE web server for luci-cron service. |
| 6 // | 6 // |
| 7 // Due to the way classic GAE imports work, this package can not have | 7 // Due to the way classic GAE imports work, this package can not have |
| 8 // subpackages (or at least subpackages referenced via absolute import path). | 8 // subpackages (or at least subpackages referenced via absolute import path). |
| 9 // We can't use relative imports because luci-go will then become unbuildable | 9 // We can't use relative imports because luci-go will then become unbuildable |
| 10 // by regular (non GAE) toolset. | 10 // by regular (non GAE) toolset. |
| 11 // | 11 // |
| 12 // See https://groups.google.com/forum/#!topic/google-appengine-go/dNhqV6PBqVc. | 12 // See https://groups.google.com/forum/#!topic/google-appengine-go/dNhqV6PBqVc. |
| 13 package frontend | 13 package frontend |
| 14 | 14 |
| 15 import ( | 15 import ( |
| 16 cryptorand "crypto/rand" | 16 cryptorand "crypto/rand" |
| 17 "encoding/binary" | 17 "encoding/binary" |
| 18 "fmt" | 18 "fmt" |
| 19 "io/ioutil" | 19 "io/ioutil" |
| 20 "math/rand" | 20 "math/rand" |
| 21 "net/http" | 21 "net/http" |
| 22 "net/url" | 22 "net/url" |
| 23 "strconv" | 23 "strconv" |
| 24 "sync" | 24 "sync" |
| 25 "time" | 25 "time" |
| 26 | 26 |
| 27 "github.com/julienschmidt/httprouter" | |
| 28 "golang.org/x/net/context" | 27 "golang.org/x/net/context" |
| 29 "google.golang.org/appengine" | 28 "google.golang.org/appengine" |
| 30 | 29 |
| 31 "github.com/luci/gae/service/info" | 30 "github.com/luci/gae/service/info" |
| 32 "github.com/luci/gae/service/taskqueue" | 31 "github.com/luci/gae/service/taskqueue" |
| 33 | 32 |
| 34 "github.com/luci/luci-go/server/auth" | 33 "github.com/luci/luci-go/server/auth" |
| 35 » "github.com/luci/luci-go/server/middleware" | 34 » "github.com/luci/luci-go/server/router" |
| 36 | 35 |
| 37 "github.com/luci/luci-go/appengine/gaeauth/server" | 36 "github.com/luci/luci-go/appengine/gaeauth/server" |
| 38 "github.com/luci/luci-go/appengine/gaeconfig" | 37 "github.com/luci/luci-go/appengine/gaeconfig" |
| 39 "github.com/luci/luci-go/appengine/gaemiddleware" | 38 "github.com/luci/luci-go/appengine/gaemiddleware" |
| 40 | 39 |
| 41 "github.com/luci/luci-go/common/config" | 40 "github.com/luci/luci-go/common/config" |
| 42 "github.com/luci/luci-go/common/config/impl/memory" | 41 "github.com/luci/luci-go/common/config/impl/memory" |
| 43 "github.com/luci/luci-go/common/errors" | 42 "github.com/luci/luci-go/common/errors" |
| 44 "github.com/luci/luci-go/common/logging" | 43 "github.com/luci/luci-go/common/logging" |
| 45 | 44 |
| (...skipping 17 matching lines...) Expand all Loading... |
| 63 managers = []task.Manager{ | 62 managers = []task.Manager{ |
| 64 &buildbucket.TaskManager{}, | 63 &buildbucket.TaskManager{}, |
| 65 &noop.TaskManager{}, | 64 &noop.TaskManager{}, |
| 66 &swarming.TaskManager{}, | 65 &swarming.TaskManager{}, |
| 67 &urlfetch.TaskManager{}, | 66 &urlfetch.TaskManager{}, |
| 68 } | 67 } |
| 69 ) | 68 ) |
| 70 | 69 |
| 71 //// Helpers. | 70 //// Helpers. |
| 72 | 71 |
| 73 type handler func(c *requestContext) | 72 // requestContext is used to add helper methods. |
| 74 | 73 type requestContext router.Context |
| 75 type requestContext struct { | |
| 76 » context.Context | |
| 77 | |
| 78 » w http.ResponseWriter | |
| 79 » r *http.Request | |
| 80 » p httprouter.Params | |
| 81 } | |
| 82 | 74 |
| 83 // fail writes error message to the log and the response and sets status code. | 75 // fail writes error message to the log and the response and sets status code. |
| 84 func (c *requestContext) fail(code int, msg string, args ...interface{}) { | 76 func (c *requestContext) fail(code int, msg string, args ...interface{}) { |
| 85 body := fmt.Sprintf(msg, args...) | 77 body := fmt.Sprintf(msg, args...) |
| 86 » logging.Errorf(c, "HTTP %d: %s", code, body) | 78 » logging.Errorf(c.Context, "HTTP %d: %s", code, body) |
| 87 » http.Error(c.w, body, code) | 79 » http.Error(c.Writer, body, code) |
| 88 } | 80 } |
| 89 | 81 |
| 90 // err sets status to 500 on transient errors or 202 on fatal ones. Returning | 82 // err sets status to 500 on transient errors or 202 on fatal ones. Returning |
| 91 // status code in range [200–299] is the only way to tell Task Queues to stop | 83 // status code in range [200–299] is the only way to tell Task Queues to stop |
| 92 // retrying the task. | 84 // retrying the task. |
| 93 func (c *requestContext) err(e error, msg string, args ...interface{}) { | 85 func (c *requestContext) err(e error, msg string, args ...interface{}) { |
| 94 code := 500 | 86 code := 500 |
| 95 if !errors.IsTransient(e) { | 87 if !errors.IsTransient(e) { |
| 96 code = 202 | 88 code = 202 |
| 97 } | 89 } |
| 98 args = append(args, e) | 90 args = append(args, e) |
| 99 c.fail(code, msg+" - %s", args...) | 91 c.fail(code, msg+" - %s", args...) |
| 100 } | 92 } |
| 101 | 93 |
| 102 // ok sets status to 200 and puts "OK" in response. | 94 // ok sets status to 200 and puts "OK" in response. |
| 103 func (c *requestContext) ok() { | 95 func (c *requestContext) ok() { |
| 104 » c.w.Header().Set("Content-Type", "text/plain; charset=utf-8") | 96 » c.Writer.Header().Set("Content-Type", "text/plain; charset=utf-8") |
| 105 » c.w.WriteHeader(200) | 97 » c.Writer.WriteHeader(200) |
| 106 » fmt.Fprintln(c.w, "OK") | 98 » fmt.Fprintln(c.Writer, "OK") |
| 107 } | 99 } |
| 108 | 100 |
| 109 /// | 101 /// |
| 110 | 102 |
| 111 var globalInit sync.Once | 103 var globalInit sync.Once |
| 112 | 104 |
| 113 // initializeGlobalState does one time initialization for stuff that needs | 105 // initializeGlobalState does one time initialization for stuff that needs |
| 114 // active GAE context. | 106 // active GAE context. |
| 115 func initializeGlobalState(c context.Context) { | 107 func initializeGlobalState(c context.Context) { |
| 116 if info.Get(c).IsDevAppServer() { | 108 if info.Get(c).IsDevAppServer() { |
| 117 // Dev app server doesn't preserve the state of task queues acro
ss restarts, | 109 // Dev app server doesn't preserve the state of task queues acro
ss restarts, |
| 118 // need to reset datastore state accordingly, otherwise everythi
ng gets stuck. | 110 // need to reset datastore state accordingly, otherwise everythi
ng gets stuck. |
| 119 if err := globalEngine.ResetAllJobsOnDevServer(c); err != nil { | 111 if err := globalEngine.ResetAllJobsOnDevServer(c); err != nil { |
| 120 logging.Errorf(c, "Failed to reset jobs: %s", err) | 112 logging.Errorf(c, "Failed to reset jobs: %s", err) |
| 121 } | 113 } |
| 122 } | 114 } |
| 123 } | 115 } |
| 124 | 116 |
| 125 // getConfigImpl returns config.Interface implementation to use from the | 117 // getConfigImpl returns config.Interface implementation to use from the |
| 126 // catalog. | 118 // catalog. |
| 127 func getConfigImpl(c context.Context) (config.Interface, error) { | 119 func getConfigImpl(c context.Context) (config.Interface, error) { |
| 128 // Use fake config data on dev server for simplicity. | 120 // Use fake config data on dev server for simplicity. |
| 129 if info.Get(c).IsDevAppServer() { | 121 if info.Get(c).IsDevAppServer() { |
| 130 return memory.New(devServerConfigs()), nil | 122 return memory.New(devServerConfigs()), nil |
| 131 } | 123 } |
| 132 return gaeconfig.New(c) | 124 return gaeconfig.New(c) |
| 133 } | 125 } |
| 134 | 126 |
| 135 // wrap converts the handler to format accepted by middleware lib. It also adds | 127 // base returns middleware chain. It initializes prod context and sets up |
| 136 // context initialization code. | |
| 137 func wrap(h handler) middleware.Handler { | |
| 138 » return func(c context.Context, w http.ResponseWriter, r *http.Request, p
httprouter.Params) { | |
| 139 » » h(&requestContext{c, w, r, p}) | |
| 140 » } | |
| 141 } | |
| 142 | |
| 143 // base starts middleware chain. It initializes prod context and sets up | |
| 144 // authentication config. | 128 // authentication config. |
| 145 func base(h middleware.Handler) httprouter.Handle { | 129 func base() router.MiddlewareChain { |
| 146 methods := auth.Authenticator{ | 130 methods := auth.Authenticator{ |
| 147 &server.OAuth2Method{Scopes: []string{server.EmailScope}}, | 131 &server.OAuth2Method{Scopes: []string{server.EmailScope}}, |
| 148 server.CookieAuth, | 132 server.CookieAuth, |
| 149 &server.InboundAppIDAuthMethod{}, | 133 &server.InboundAppIDAuthMethod{}, |
| 150 } | 134 } |
| 151 » wrapper := func(c context.Context, w http.ResponseWriter, r *http.Reques
t, p httprouter.Params) { | 135 » return append( |
| 152 » » globalInit.Do(func() { initializeGlobalState(c) }) | 136 » » gaemiddleware.BaseProd(), |
| 153 » » c = auth.SetAuthenticator(c, methods) | 137 » » func(c *router.Context, next router.Handler) { |
| 154 » » h(c, w, r, p) | 138 » » » globalInit.Do(func() { initializeGlobalState(c.Context)
}) |
| 155 » } | 139 » » » c.Context = auth.SetAuthenticator(c.Context, methods) |
| 156 » return gaemiddleware.BaseProd(wrapper) | 140 » » » next(c) |
| 157 } | 141 » » }, |
| 158 | 142 » ) |
| 159 // cronHandler returns handler intended for cron jobs. | |
| 160 func cronHandler(h handler) httprouter.Handle { | |
| 161 » return base(gaemiddleware.RequireCron(wrap(h))) | |
| 162 } | |
| 163 | |
| 164 // taskQueueHandler returns handler intended for task queue calls. | |
| 165 func taskQueueHandler(name string, h handler) httprouter.Handle { | |
| 166 » return base(gaemiddleware.RequireTaskQueue(name, wrap(h))) | |
| 167 } | 143 } |
| 168 | 144 |
| 169 //// Routes. | 145 //// Routes. |
| 170 | 146 |
| 171 func init() { | 147 func init() { |
| 172 // Dev server likes to restart a lot, and upon a restart math/rand seed
is | 148 // Dev server likes to restart a lot, and upon a restart math/rand seed
is |
| 173 // always set to 1, resulting in lots of presumably "random" IDs not bei
ng | 149 // always set to 1, resulting in lots of presumably "random" IDs not bei
ng |
| 174 // very random. Seed it with real randomness. | 150 // very random. Seed it with real randomness. |
| 175 var seed int64 | 151 var seed int64 |
| 176 if err := binary.Read(cryptorand.Reader, binary.LittleEndian, &seed); er
r != nil { | 152 if err := binary.Read(cryptorand.Reader, binary.LittleEndian, &seed); er
r != nil { |
| (...skipping 11 matching lines...) Expand all Loading... |
| 188 globalEngine = engine.NewEngine(engine.Config{ | 164 globalEngine = engine.NewEngine(engine.Config{ |
| 189 Catalog: globalCatalog, | 165 Catalog: globalCatalog, |
| 190 TimersQueuePath: "/internal/tasks/timers", | 166 TimersQueuePath: "/internal/tasks/timers", |
| 191 TimersQueueName: "timers", | 167 TimersQueueName: "timers", |
| 192 InvocationsQueuePath: "/internal/tasks/invocations", | 168 InvocationsQueuePath: "/internal/tasks/invocations", |
| 193 InvocationsQueueName: "invocations", | 169 InvocationsQueueName: "invocations", |
| 194 PubSubPushPath: "/pubsub", | 170 PubSubPushPath: "/pubsub", |
| 195 }) | 171 }) |
| 196 | 172 |
| 197 // Setup HTTP routes. | 173 // Setup HTTP routes. |
| 198 » router := httprouter.New() | 174 » r := router.New() |
| 175 » basemw := base() |
| 199 | 176 |
| 200 » gaemiddleware.InstallHandlers(router, base) | 177 » gaemiddleware.InstallHandlers(r, basemw) |
| 201 » ui.InstallHandlers(router, base, ui.Config{ | 178 » ui.InstallHandlers(r, basemw, ui.Config{ |
| 202 Engine: globalEngine, | 179 Engine: globalEngine, |
| 203 TemplatesPath: "templates", | 180 TemplatesPath: "templates", |
| 204 }) | 181 }) |
| 205 | 182 |
| 206 » router.GET("/_ah/warmup", base(wrap(warmupHandler))) | 183 » r.GET("/_ah/warmup", basemw, warmupHandler) |
| 207 » router.GET("/_ah/start", base(wrap(warmupHandler))) | 184 » r.GET("/_ah/start", basemw, warmupHandler) |
| 208 » router.POST("/pubsub", base(wrap(pubsubPushHandler))) | 185 » r.POST("/pubsub", basemw, pubsubPushHandler) |
| 209 » router.GET("/internal/cron/read-config", cronHandler(readConfigCron)) | 186 » r.GET("/internal/cron/read-config", append(basemw, gaemiddleware.Require
Cron), readConfigCron) |
| 210 » router.POST("/internal/tasks/read-project-config", taskQueueHandler("rea
d-project-config", readProjectConfigTask)) | 187 » r.POST("/internal/tasks/read-project-config", append(basemw, gaemiddlewa
re.RequireTaskQueue("read-project-config")), readProjectConfigTask) |
| 211 » router.POST("/internal/tasks/timers", taskQueueHandler("timers", actionT
ask)) | 188 » r.POST("/internal/tasks/timers", append(basemw, gaemiddleware.RequireTas
kQueue("timers")), actionTask) |
| 212 » router.POST("/internal/tasks/invocations", taskQueueHandler("invocations
", actionTask)) | 189 » r.POST("/internal/tasks/invocations", append(basemw, gaemiddleware.Requi
reTaskQueue("invocations")), actionTask) |
| 213 | 190 |
| 214 // Devserver can't accept PubSub pushes, so allow manual pulls instead t
o | 191 // Devserver can't accept PubSub pushes, so allow manual pulls instead t
o |
| 215 // simplify local development. | 192 // simplify local development. |
| 216 if appengine.IsDevAppServer() { | 193 if appengine.IsDevAppServer() { |
| 217 » » router.GET("/pubsub/pull/:ManagerName/:Publisher", base(wrap(pub
subPullHandler))) | 194 » » r.GET("/pubsub/pull/:ManagerName/:Publisher", basemw, pubsubPull
Handler) |
| 218 } | 195 } |
| 219 | 196 |
| 220 » http.DefaultServeMux.Handle("/", router) | 197 » http.DefaultServeMux.Handle("/", r) |
| 221 } | 198 } |
| 222 | 199 |
| 223 // warmupHandler warms in-memory caches. | 200 // warmupHandler warms in-memory caches. |
| 224 func warmupHandler(rc *requestContext) { | 201 func warmupHandler(c *router.Context) { |
| 225 » if err := server.Warmup(rc); err != nil { | 202 » rc := requestContext(*c) |
| 203 » if err := server.Warmup(rc.Context); err != nil { |
| 226 rc.fail(500, "Failed to warmup OpenID: %s", err) | 204 rc.fail(500, "Failed to warmup OpenID: %s", err) |
| 227 return | 205 return |
| 228 } | 206 } |
| 229 rc.ok() | 207 rc.ok() |
| 230 } | 208 } |
| 231 | 209 |
| 232 // pubsubPushHandler handles incoming PubSub messages. | 210 // pubsubPushHandler handles incoming PubSub messages. |
| 233 func pubsubPushHandler(rc *requestContext) { | 211 func pubsubPushHandler(c *router.Context) { |
| 234 » body, err := ioutil.ReadAll(rc.r.Body) | 212 » rc := requestContext(*c) |
| 213 » body, err := ioutil.ReadAll(rc.Request.Body) |
| 235 if err != nil { | 214 if err != nil { |
| 236 rc.fail(500, "Failed to read the request: %s", err) | 215 rc.fail(500, "Failed to read the request: %s", err) |
| 237 return | 216 return |
| 238 } | 217 } |
| 239 » if err = globalEngine.ProcessPubSubPush(rc, body); err != nil { | 218 » if err = globalEngine.ProcessPubSubPush(rc.Context, body); err != nil { |
| 240 rc.err(err, "Failed to process incoming PubSub push") | 219 rc.err(err, "Failed to process incoming PubSub push") |
| 241 return | 220 return |
| 242 } | 221 } |
| 243 rc.ok() | 222 rc.ok() |
| 244 } | 223 } |
| 245 | 224 |
| 246 // pubsubPullHandler is called on dev server by developer to pull pubsub | 225 // pubsubPullHandler is called on dev server by developer to pull pubsub |
| 247 // messages from a topic created for a publisher. | 226 // messages from a topic created for a publisher. |
| 248 func pubsubPullHandler(rc *requestContext) { | 227 func pubsubPullHandler(c *router.Context) { |
| 228 » rc := requestContext(*c) |
| 249 if !appengine.IsDevAppServer() { | 229 if !appengine.IsDevAppServer() { |
| 250 rc.fail(403, "Not a dev server") | 230 rc.fail(403, "Not a dev server") |
| 251 return | 231 return |
| 252 } | 232 } |
| 253 err := globalEngine.PullPubSubOnDevServer( | 233 err := globalEngine.PullPubSubOnDevServer( |
| 254 » » rc, rc.p.ByName("ManagerName"), rc.p.ByName("Publisher")) | 234 » » rc.Context, rc.Params.ByName("ManagerName"), rc.Params.ByName("P
ublisher")) |
| 255 if err != nil { | 235 if err != nil { |
| 256 rc.err(err, "Failed to pull PubSub messages") | 236 rc.err(err, "Failed to pull PubSub messages") |
| 257 } else { | 237 } else { |
| 258 rc.ok() | 238 rc.ok() |
| 259 } | 239 } |
| 260 } | 240 } |
| 261 | 241 |
| 262 // readConfigCron grabs a list of projects from the catalog and datastore and | 242 // readConfigCron grabs a list of projects from the catalog and datastore and |
| 263 // dispatches task queue tasks to update each project's cron jobs. | 243 // dispatches task queue tasks to update each project's cron jobs. |
| 264 func readConfigCron(c *requestContext) { | 244 func readConfigCron(c *router.Context) { |
| 245 » rc := requestContext(*c) |
| 265 projectsToVisit := map[string]bool{} | 246 projectsToVisit := map[string]bool{} |
| 266 | 247 |
| 267 // Visit all projects in the catalog. | 248 // Visit all projects in the catalog. |
| 268 » ctx, _ := context.WithTimeout(c.Context, 150*time.Second) | 249 » ctx, _ := context.WithTimeout(rc.Context, 150*time.Second) |
| 269 projects, err := globalCatalog.GetAllProjects(ctx) | 250 projects, err := globalCatalog.GetAllProjects(ctx) |
| 270 if err != nil { | 251 if err != nil { |
| 271 » » c.err(err, "Failed to grab a list of project IDs from catalog") | 252 » » rc.err(err, "Failed to grab a list of project IDs from catalog") |
| 272 return | 253 return |
| 273 } | 254 } |
| 274 for _, id := range projects { | 255 for _, id := range projects { |
| 275 projectsToVisit[id] = true | 256 projectsToVisit[id] = true |
| 276 } | 257 } |
| 277 | 258 |
| 278 // Also visit all registered projects that do not show up in the catalog | 259 // Also visit all registered projects that do not show up in the catalog |
| 279 // listing anymore. It will unregister all crons belonging to them. | 260 // listing anymore. It will unregister all crons belonging to them. |
| 280 » existing, err := globalEngine.GetAllProjects(c.Context) | 261 » existing, err := globalEngine.GetAllProjects(rc.Context) |
| 281 if err != nil { | 262 if err != nil { |
| 282 » » c.err(err, "Failed to grab a list of project IDs from datastore"
) | 263 » » rc.err(err, "Failed to grab a list of project IDs from datastore
") |
| 283 return | 264 return |
| 284 } | 265 } |
| 285 for _, id := range existing { | 266 for _, id := range existing { |
| 286 projectsToVisit[id] = true | 267 projectsToVisit[id] = true |
| 287 } | 268 } |
| 288 | 269 |
| 289 // Handle each project in its own task to avoid "bad" projects (e.g. one
s with | 270 // Handle each project in its own task to avoid "bad" projects (e.g. one
s with |
| 290 // lots of crons) to slow down "good" ones. | 271 // lots of crons) to slow down "good" ones. |
| 291 tasks := make([]*taskqueue.Task, 0, len(projectsToVisit)) | 272 tasks := make([]*taskqueue.Task, 0, len(projectsToVisit)) |
| 292 for projectID := range projectsToVisit { | 273 for projectID := range projectsToVisit { |
| 293 tasks = append(tasks, &taskqueue.Task{ | 274 tasks = append(tasks, &taskqueue.Task{ |
| 294 Path: "/internal/tasks/read-project-config?projectID=" +
url.QueryEscape(projectID), | 275 Path: "/internal/tasks/read-project-config?projectID=" +
url.QueryEscape(projectID), |
| 295 }) | 276 }) |
| 296 } | 277 } |
| 297 » tq := taskqueue.Get(c) | 278 » tq := taskqueue.Get(rc.Context) |
| 298 if err = tq.AddMulti(tasks, "read-project-config"); err != nil { | 279 if err = tq.AddMulti(tasks, "read-project-config"); err != nil { |
| 299 » » c.err(errors.WrapTransient(err), "Failed to add tasks to task qu
eue") | 280 » » rc.err(errors.WrapTransient(err), "Failed to add tasks to task q
ueue") |
| 300 } else { | 281 } else { |
| 301 » » c.ok() | 282 » » rc.ok() |
| 302 } | 283 } |
| 303 } | 284 } |
| 304 | 285 |
| 305 // readProjectConfigTask grabs a list of cron jobs in a project from catalog, | 286 // readProjectConfigTask grabs a list of cron jobs in a project from catalog, |
| 306 // updates all changed cron jobs, adds new ones, disables old ones. | 287 // updates all changed cron jobs, adds new ones, disables old ones. |
| 307 func readProjectConfigTask(c *requestContext) { | 288 func readProjectConfigTask(c *router.Context) { |
| 308 » projectID := c.r.URL.Query().Get("projectID") | 289 » rc := requestContext(*c) |
| 290 » projectID := rc.Request.URL.Query().Get("projectID") |
| 309 if projectID == "" { | 291 if projectID == "" { |
| 310 // Return 202 to avoid retry, it is fatal error. | 292 // Return 202 to avoid retry, it is fatal error. |
| 311 » » c.fail(202, "Missing projectID query attribute") | 293 » » rc.fail(202, "Missing projectID query attribute") |
| 312 return | 294 return |
| 313 } | 295 } |
| 314 » ctx, _ := context.WithTimeout(c.Context, 150*time.Second) | 296 » ctx, _ := context.WithTimeout(rc.Context, 150*time.Second) |
| 315 jobs, err := globalCatalog.GetProjectJobs(ctx, projectID) | 297 jobs, err := globalCatalog.GetProjectJobs(ctx, projectID) |
| 316 if err != nil { | 298 if err != nil { |
| 317 » » c.err(err, "Failed to query for a list of jobs") | 299 » » rc.err(err, "Failed to query for a list of jobs") |
| 318 return | 300 return |
| 319 } | 301 } |
| 320 » if err = globalEngine.UpdateProjectJobs(c.Context, projectID, jobs); err
!= nil { | 302 » if err = globalEngine.UpdateProjectJobs(rc.Context, projectID, jobs); er
r != nil { |
| 321 » » c.err(err, "Failed to update some cron jobs") | 303 » » rc.err(err, "Failed to update some cron jobs") |
| 322 return | 304 return |
| 323 } | 305 } |
| 324 » c.ok() | 306 » rc.ok() |
| 325 } | 307 } |
| 326 | 308 |
| 327 // actionTask is used to route actions emitted by cron job state transitions | 309 // actionTask is used to route actions emitted by cron job state transitions |
| 328 // back into Engine (see enqueueActions). | 310 // back into Engine (see enqueueActions). |
| 329 func actionTask(c *requestContext) { | 311 func actionTask(c *router.Context) { |
| 330 » body, err := ioutil.ReadAll(c.r.Body) | 312 » rc := requestContext(*c) |
| 313 » body, err := ioutil.ReadAll(rc.Request.Body) |
| 331 if err != nil { | 314 if err != nil { |
| 332 » » c.fail(500, "Failed to read request body: %s", err) | 315 » » rc.fail(500, "Failed to read request body: %s", err) |
| 333 return | 316 return |
| 334 } | 317 } |
| 335 » count, _ := strconv.Atoi(c.r.Header.Get("X-AppEngine-TaskExecutionCount"
)) | 318 » count, _ := strconv.Atoi(rc.Request.Header.Get("X-AppEngine-TaskExecutio
nCount")) |
| 336 » err = globalEngine.ExecuteSerializedAction(c.Context, body, count) | 319 » err = globalEngine.ExecuteSerializedAction(rc.Context, body, count) |
| 337 if err != nil { | 320 if err != nil { |
| 338 » » c.err(err, "Error when executing the action") | 321 » » rc.err(err, "Error when executing the action") |
| 339 return | 322 return |
| 340 } | 323 } |
| 341 » c.ok() | 324 » rc.ok() |
| 342 } | 325 } |
| OLD | NEW |