Chromium Code Reviews| Index: scheduler/appengine/engine/cron/demo/main.go |
| diff --git a/scheduler/appengine/engine/cron/demo/main.go b/scheduler/appengine/engine/cron/demo/main.go |
| index e24f1be0978f157051a71004853be5a507801aab..002cd3f376defeb7709060f52b814d76e5ec543a 100644 |
| --- a/scheduler/appengine/engine/cron/demo/main.go |
| +++ b/scheduler/appengine/engine/cron/demo/main.go |
| @@ -16,15 +16,13 @@ |
| package demo |
| import ( |
| - "fmt" |
| "net/http" |
| - "strconv" |
| "time" |
| "golang.org/x/net/context" |
| + "github.com/golang/protobuf/proto" |
| "github.com/luci/gae/service/datastore" |
| - "github.com/luci/gae/service/taskqueue" |
| "github.com/luci/luci-go/appengine/gaemiddleware" |
| "github.com/luci/luci-go/common/clock" |
| @@ -33,6 +31,8 @@ import ( |
| "github.com/luci/luci-go/server/router" |
| "github.com/luci/luci-go/scheduler/appengine/engine/cron" |
| + "github.com/luci/luci-go/scheduler/appengine/engine/internal" |
| + "github.com/luci/luci-go/scheduler/appengine/engine/tq" |
| "github.com/luci/luci-go/scheduler/appengine/schedule" |
| ) |
| @@ -72,23 +72,23 @@ func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine |
| } |
| for _, action := range machine.Actions { |
| - var task *taskqueue.Task |
| + var task tq.Task |
| switch a := action.(type) { |
| case cron.TickLaterAction: |
| logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) |
| - task = &taskqueue.Task{ |
| - Path: fmt.Sprintf("/tick/%s/%d", id, a.TickNonce), |
| - ETA: a.When, |
| + task = tq.Task{ |
| + Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce}, |
| + ETA: a.When, |
| } |
| case cron.StartInvocationAction: |
| - task = &taskqueue.Task{ |
| - Path: fmt.Sprintf("/invocation/%s", id), |
| - Delay: time.Second, // give the transaction time to land |
| + task = tq.Task{ |
| + Payload: &internal.StartInvocationTask{JobId: id}, |
| + Delay: time.Second, // give the transaction time to land |
| } |
| default: |
| panic("unknown action type") |
| } |
| - if err := taskqueue.Add(c, "default", task); err != nil { |
| + if err := tq.AddTask(c, task); err != nil { |
| return err |
| } |
| } |
| @@ -112,15 +112,17 @@ func startJob(c context.Context, id string) error { |
| }) |
| } |
| -func handleTick(c context.Context, id string, nonce int64) error { |
| - return evolve(c, id, func(c context.Context, m *cron.Machine) error { |
| - return m.OnTimerTick(nonce) |
| +func handleTick(c context.Context, task proto.Message, execCount int) error { |
| + msg := task.(*internal.TickLaterTask) |
| + return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error { |
| + return m.OnTimerTick(msg.TickNonce) |
| }) |
| } |
| -func handleInvocation(c context.Context, id string) error { |
| - logging.Infof(c, "INVOCATION of job %q has finished!", id) |
| - return evolve(c, id, func(c context.Context, m *cron.Machine) error { |
| +func handleInvocation(c context.Context, task proto.Message, execCount int) error { |
| + msg := task.(*internal.StartInvocationTask) |
| + logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId) |
| + return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error { |
| m.RewindIfNecessary() |
| return nil |
| }) |
| @@ -130,6 +132,10 @@ func init() { |
| r := router.New() |
| gaemiddleware.InstallHandlers(r) |
| + tq.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil) |
|
Vadim Sh.
2017/07/15 01:16:29
task type itself is used as routing key
|
| + tq.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "default", nil) |
| + tq.DefaultDispatcher.InstallRoutes(r, gaemiddleware.BaseProd()) |
| + |
| // Kick-start a bunch of jobs by visiting: |
| // |
| // http://localhost:8080/start/with 10s interval |
| @@ -145,23 +151,5 @@ func init() { |
| } |
| }) |
| - r.POST("/tick/:JobID/:TickNonce", gaemiddleware.BaseProd(), func(c *router.Context) { |
| - jobID := c.Params.ByName("JobID") |
| - nonce, err := strconv.ParseInt(c.Params.ByName("TickNonce"), 10, 64) |
| - if err != nil { |
| - panic(err) |
| - } |
| - if err := handleTick(c.Context, jobID, nonce); err != nil { |
| - panic(err) |
| - } |
| - }) |
| - |
| - r.POST("/invocation/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { |
| - jobID := c.Params.ByName("JobID") |
| - if err := handleInvocation(c.Context, jobID); err != nil { |
| - panic(err) |
| - } |
| - }) |
| - |
| http.DefaultServeMux.Handle("/", r) |
| } |