| Index: scheduler/appengine/engine/cron/demo/main.go
|
| diff --git a/scheduler/appengine/engine/cron/demo/main.go b/scheduler/appengine/engine/cron/demo/main.go
|
| index e24f1be0978f157051a71004853be5a507801aab..381f63877e45c67b091bbb6fc92b9ae19331fcf5 100644
|
| --- a/scheduler/appengine/engine/cron/demo/main.go
|
| +++ b/scheduler/appengine/engine/cron/demo/main.go
|
| @@ -16,15 +16,13 @@
|
| package demo
|
|
|
| import (
|
| - "fmt"
|
| "net/http"
|
| - "strconv"
|
| "time"
|
|
|
| "golang.org/x/net/context"
|
|
|
| + "github.com/golang/protobuf/proto"
|
| "github.com/luci/gae/service/datastore"
|
| - "github.com/luci/gae/service/taskqueue"
|
|
|
| "github.com/luci/luci-go/appengine/gaemiddleware"
|
| "github.com/luci/luci-go/common/clock"
|
| @@ -33,9 +31,13 @@ import (
|
| "github.com/luci/luci-go/server/router"
|
|
|
| "github.com/luci/luci-go/scheduler/appengine/engine/cron"
|
| + "github.com/luci/luci-go/scheduler/appengine/engine/internal"
|
| + "github.com/luci/luci-go/scheduler/appengine/engine/tq"
|
| "github.com/luci/luci-go/scheduler/appengine/schedule"
|
| )
|
|
|
| +var tasks = tq.Dispatcher{}
|
| +
|
| type CronState struct {
|
| _extra datastore.PropertyMap `gae:"-,extra"`
|
|
|
| @@ -72,23 +74,23 @@ func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine
|
| }
|
|
|
| for _, action := range machine.Actions {
|
| - var task *taskqueue.Task
|
| + var task tq.Task
|
| switch a := action.(type) {
|
| case cron.TickLaterAction:
|
| logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now()))
|
| - task = &taskqueue.Task{
|
| - Path: fmt.Sprintf("/tick/%s/%d", id, a.TickNonce),
|
| - ETA: a.When,
|
| + task = tq.Task{
|
| + Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce},
|
| + ETA: a.When,
|
| }
|
| case cron.StartInvocationAction:
|
| - task = &taskqueue.Task{
|
| - Path: fmt.Sprintf("/invocation/%s", id),
|
| - Delay: time.Second, // give the transaction time to land
|
| + task = tq.Task{
|
| + Payload: &internal.StartInvocationTask{JobId: id},
|
| + Delay: time.Second, // give the transaction time to land
|
| }
|
| default:
|
| panic("unknown action type")
|
| }
|
| - if err := taskqueue.Add(c, "default", task); err != nil {
|
| + if err := tasks.AddTask(c, &task); err != nil {
|
| return err
|
| }
|
| }
|
| @@ -112,15 +114,17 @@ func startJob(c context.Context, id string) error {
|
| })
|
| }
|
|
|
| -func handleTick(c context.Context, id string, nonce int64) error {
|
| - return evolve(c, id, func(c context.Context, m *cron.Machine) error {
|
| - return m.OnTimerTick(nonce)
|
| +func handleTick(c context.Context, task proto.Message, execCount int) error {
|
| + msg := task.(*internal.TickLaterTask)
|
| + return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error {
|
| + return m.OnTimerTick(msg.TickNonce)
|
| })
|
| }
|
|
|
| -func handleInvocation(c context.Context, id string) error {
|
| - logging.Infof(c, "INVOCATION of job %q has finished!", id)
|
| - return evolve(c, id, func(c context.Context, m *cron.Machine) error {
|
| +func handleInvocation(c context.Context, task proto.Message, execCount int) error {
|
| + msg := task.(*internal.StartInvocationTask)
|
| + logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId)
|
| + return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error {
|
| m.RewindIfNecessary()
|
| return nil
|
| })
|
| @@ -130,6 +134,10 @@ func init() {
|
| r := router.New()
|
| gaemiddleware.InstallHandlers(r)
|
|
|
| + tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil)
|
| + tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "default", nil)
|
| + tasks.InstallRoutes(r, gaemiddleware.BaseProd())
|
| +
|
| // Kick-start a bunch of jobs by visiting:
|
| //
|
| // http://localhost:8080/start/with 10s interval
|
| @@ -145,23 +153,5 @@ func init() {
|
| }
|
| })
|
|
|
| - r.POST("/tick/:JobID/:TickNonce", gaemiddleware.BaseProd(), func(c *router.Context) {
|
| - jobID := c.Params.ByName("JobID")
|
| - nonce, err := strconv.ParseInt(c.Params.ByName("TickNonce"), 10, 64)
|
| - if err != nil {
|
| - panic(err)
|
| - }
|
| - if err := handleTick(c.Context, jobID, nonce); err != nil {
|
| - panic(err)
|
| - }
|
| - })
|
| -
|
| - r.POST("/invocation/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) {
|
| - jobID := c.Params.ByName("JobID")
|
| - if err := handleInvocation(c.Context, jobID); err != nil {
|
| - panic(err)
|
| - }
|
| - })
|
| -
|
| http.DefaultServeMux.Handle("/", r)
|
| }
|
|
|