Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(798)

Unified Diff: scheduler/appengine/engine/cron/demo/main.go

Issue 2980943002: Add cron.Machine state machine. (Closed)
Patch Set: typos Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « scheduler/appengine/engine/cron/demo/app.yaml ('k') | scheduler/appengine/engine/cron/machine.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: scheduler/appengine/engine/cron/demo/main.go
diff --git a/scheduler/appengine/engine/cron/demo/main.go b/scheduler/appengine/engine/cron/demo/main.go
new file mode 100644
index 0000000000000000000000000000000000000000..e24f1be0978f157051a71004853be5a507801aab
--- /dev/null
+++ b/scheduler/appengine/engine/cron/demo/main.go
@@ -0,0 +1,167 @@
+// Copyright 2017 The LUCI Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package demo shows how cron.Machines can be hosted with Datastore and TQ.
+package demo
+
+import (
+ "fmt"
+ "net/http"
+ "strconv"
+ "time"
+
+ "golang.org/x/net/context"
+
+ "github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/taskqueue"
+
+ "github.com/luci/luci-go/appengine/gaemiddleware"
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/data/rand/mathrand"
+ "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/server/router"
+
+ "github.com/luci/luci-go/scheduler/appengine/engine/cron"
+ "github.com/luci/luci-go/scheduler/appengine/schedule"
+)
+
+type CronState struct {
+ _extra datastore.PropertyMap `gae:"-,extra"`
+
+ ID string `gae:"$id"`
+ State cron.State `gae:",noindex"`
+}
+
+func (s *CronState) schedule() *schedule.Schedule {
+ parsed, err := schedule.Parse(s.ID, 0)
+ if err != nil {
+ panic(err)
+ }
+ return parsed
+}
+
+// evolve instantiates cron.Machine, calls the callback and submits emitted
+// actions.
+func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine) error) error {
+ err := datastore.RunInTransaction(c, func(c context.Context) error {
+ entity := CronState{ID: id}
+ if err := datastore.Get(c, &entity); err != nil && err != datastore.ErrNoSuchEntity {
+ return err
+ }
+
+ machine := &cron.Machine{
+ Now: clock.Now(c),
+ Schedule: entity.schedule(),
+ Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 },
+ State: entity.State,
+ }
+
+ if err := cb(c, machine); err != nil {
+ return err
+ }
+
+ for _, action := range machine.Actions {
+ var task *taskqueue.Task
+ switch a := action.(type) {
+ case cron.TickLaterAction:
+ logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now()))
+ task = &taskqueue.Task{
+ Path: fmt.Sprintf("/tick/%s/%d", id, a.TickNonce),
+ ETA: a.When,
+ }
+ case cron.StartInvocationAction:
+ task = &taskqueue.Task{
+ Path: fmt.Sprintf("/invocation/%s", id),
+ Delay: time.Second, // give the transaction time to land
+ }
+ default:
+ panic("unknown action type")
+ }
+ if err := taskqueue.Add(c, "default", task); err != nil {
+ return err
+ }
+ }
+
+ entity.State = machine.State
+ return datastore.Put(c, &entity)
+ }, nil)
+
+ if err != nil {
+ logging.Errorf(c, "FAIL - %s", err)
+ }
+ return err
+}
+
+func startJob(c context.Context, id string) error {
+ return evolve(c, id, func(c context.Context, m *cron.Machine) error {
+ // Forcefully restart the chain of tasks.
+ m.Disable()
+ m.Enable()
+ return nil
+ })
+}
+
+func handleTick(c context.Context, id string, nonce int64) error {
+ return evolve(c, id, func(c context.Context, m *cron.Machine) error {
+ return m.OnTimerTick(nonce)
+ })
+}
+
+func handleInvocation(c context.Context, id string) error {
+ logging.Infof(c, "INVOCATION of job %q has finished!", id)
+ return evolve(c, id, func(c context.Context, m *cron.Machine) error {
+ m.RewindIfNecessary()
+ return nil
+ })
+}
+
+func init() {
+ r := router.New()
+ gaemiddleware.InstallHandlers(r)
+
+ // Kick-start a bunch of jobs by visiting:
+ //
+ // http://localhost:8080/start/with 10s interval
+ // http://localhost:8080/start/with 5s interval
+ // http://localhost:8080/start/0 * * * * * * *
+ //
+ // And the look at the logs.
+
+ r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) {
+ jobID := c.Params.ByName("JobID")
+ if err := startJob(c.Context, jobID); err != nil {
+ panic(err)
+ }
+ })
+
+ r.POST("/tick/:JobID/:TickNonce", gaemiddleware.BaseProd(), func(c *router.Context) {
+ jobID := c.Params.ByName("JobID")
+ nonce, err := strconv.ParseInt(c.Params.ByName("TickNonce"), 10, 64)
+ if err != nil {
+ panic(err)
+ }
+ if err := handleTick(c.Context, jobID, nonce); err != nil {
+ panic(err)
+ }
+ })
+
+ r.POST("/invocation/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) {
+ jobID := c.Params.ByName("JobID")
+ if err := handleInvocation(c.Context, jobID); err != nil {
+ panic(err)
+ }
+ })
+
+ http.DefaultServeMux.Handle("/", r)
+}
« no previous file with comments | « scheduler/appengine/engine/cron/demo/app.yaml ('k') | scheduler/appengine/engine/cron/machine.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698