Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1298)

Unified Diff: scheduler/appengine/engine/cron/demo/main.go

Issue 2981043002: Add a task queue task router to reduce amount of boilerplate. (Closed)
Patch Set: comment nit Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | scheduler/appengine/engine/internal/gen.go » ('j') | scheduler/appengine/engine/tq/tq.go » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: scheduler/appengine/engine/cron/demo/main.go
diff --git a/scheduler/appengine/engine/cron/demo/main.go b/scheduler/appengine/engine/cron/demo/main.go
index e24f1be0978f157051a71004853be5a507801aab..381f63877e45c67b091bbb6fc92b9ae19331fcf5 100644
--- a/scheduler/appengine/engine/cron/demo/main.go
+++ b/scheduler/appengine/engine/cron/demo/main.go
@@ -16,15 +16,13 @@
package demo
import (
- "fmt"
"net/http"
- "strconv"
"time"
"golang.org/x/net/context"
+ "github.com/golang/protobuf/proto"
"github.com/luci/gae/service/datastore"
- "github.com/luci/gae/service/taskqueue"
"github.com/luci/luci-go/appengine/gaemiddleware"
"github.com/luci/luci-go/common/clock"
@@ -33,9 +31,13 @@ import (
"github.com/luci/luci-go/server/router"
"github.com/luci/luci-go/scheduler/appengine/engine/cron"
+ "github.com/luci/luci-go/scheduler/appengine/engine/internal"
+ "github.com/luci/luci-go/scheduler/appengine/engine/tq"
"github.com/luci/luci-go/scheduler/appengine/schedule"
)
+var tasks = tq.Dispatcher{}
+
type CronState struct {
_extra datastore.PropertyMap `gae:"-,extra"`
@@ -72,23 +74,23 @@ func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine
}
for _, action := range machine.Actions {
- var task *taskqueue.Task
+ var task tq.Task
switch a := action.(type) {
case cron.TickLaterAction:
logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now()))
- task = &taskqueue.Task{
- Path: fmt.Sprintf("/tick/%s/%d", id, a.TickNonce),
- ETA: a.When,
+ task = tq.Task{
+ Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce},
+ ETA: a.When,
}
case cron.StartInvocationAction:
- task = &taskqueue.Task{
- Path: fmt.Sprintf("/invocation/%s", id),
- Delay: time.Second, // give the transaction time to land
+ task = tq.Task{
+ Payload: &internal.StartInvocationTask{JobId: id},
+ Delay: time.Second, // give the transaction time to land
}
default:
panic("unknown action type")
}
- if err := taskqueue.Add(c, "default", task); err != nil {
+ if err := tasks.AddTask(c, &task); err != nil {
return err
}
}
@@ -112,15 +114,17 @@ func startJob(c context.Context, id string) error {
})
}
-func handleTick(c context.Context, id string, nonce int64) error {
- return evolve(c, id, func(c context.Context, m *cron.Machine) error {
- return m.OnTimerTick(nonce)
+func handleTick(c context.Context, task proto.Message, execCount int) error {
+ msg := task.(*internal.TickLaterTask)
+ return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error {
+ return m.OnTimerTick(msg.TickNonce)
})
}
-func handleInvocation(c context.Context, id string) error {
- logging.Infof(c, "INVOCATION of job %q has finished!", id)
- return evolve(c, id, func(c context.Context, m *cron.Machine) error {
+func handleInvocation(c context.Context, task proto.Message, execCount int) error {
+ msg := task.(*internal.StartInvocationTask)
+ logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId)
+ return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error {
m.RewindIfNecessary()
return nil
})
@@ -130,6 +134,10 @@ func init() {
r := router.New()
gaemiddleware.InstallHandlers(r)
+ tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil)
+ tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "default", nil)
+ tasks.InstallRoutes(r, gaemiddleware.BaseProd())
+
// Kick-start a bunch of jobs by visiting:
//
// http://localhost:8080/start/with 10s interval
@@ -145,23 +153,5 @@ func init() {
}
})
- r.POST("/tick/:JobID/:TickNonce", gaemiddleware.BaseProd(), func(c *router.Context) {
- jobID := c.Params.ByName("JobID")
- nonce, err := strconv.ParseInt(c.Params.ByName("TickNonce"), 10, 64)
- if err != nil {
- panic(err)
- }
- if err := handleTick(c.Context, jobID, nonce); err != nil {
- panic(err)
- }
- })
-
- r.POST("/invocation/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) {
- jobID := c.Params.ByName("JobID")
- if err := handleInvocation(c.Context, jobID); err != nil {
- panic(err)
- }
- })
-
http.DefaultServeMux.Handle("/", r)
}
« no previous file with comments | « no previous file | scheduler/appengine/engine/internal/gen.go » ('j') | scheduler/appengine/engine/tq/tq.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698