 Chromium Code Reviews
 Chromium Code Reviews Issue 2980943002:
  Add cron.Machine state machine.  (Closed)
    
  
    Issue 2980943002:
  Add cron.Machine state machine.  (Closed) 
  | Index: scheduler/appengine/engine/cron/demo/main.go | 
| diff --git a/scheduler/appengine/engine/cron/demo/main.go b/scheduler/appengine/engine/cron/demo/main.go | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..e24f1be0978f157051a71004853be5a507801aab | 
| --- /dev/null | 
| +++ b/scheduler/appengine/engine/cron/demo/main.go | 
| @@ -0,0 +1,167 @@ | 
| +// Copyright 2017 The LUCI Authors. | 
| +// | 
| +// Licensed under the Apache License, Version 2.0 (the "License"); | 
| +// you may not use this file except in compliance with the License. | 
| +// You may obtain a copy of the License at | 
| +// | 
| +// http://www.apache.org/licenses/LICENSE-2.0 | 
| +// | 
| +// Unless required by applicable law or agreed to in writing, software | 
| +// distributed under the License is distributed on an "AS IS" BASIS, | 
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
| +// See the License for the specific language governing permissions and | 
| +// limitations under the License. | 
| + | 
| +// Package demo shows how cron.Machines can be hosted with Datastore and TQ. | 
| +package demo | 
| + | 
| +import ( | 
| + "fmt" | 
| + "net/http" | 
| + "strconv" | 
| + "time" | 
| + | 
| + "golang.org/x/net/context" | 
| + | 
| + "github.com/luci/gae/service/datastore" | 
| + "github.com/luci/gae/service/taskqueue" | 
| + | 
| + "github.com/luci/luci-go/appengine/gaemiddleware" | 
| + "github.com/luci/luci-go/common/clock" | 
| + "github.com/luci/luci-go/common/data/rand/mathrand" | 
| + "github.com/luci/luci-go/common/logging" | 
| + "github.com/luci/luci-go/server/router" | 
| + | 
| + "github.com/luci/luci-go/scheduler/appengine/engine/cron" | 
| + "github.com/luci/luci-go/scheduler/appengine/schedule" | 
| +) | 
| + | 
| +type CronState struct { | 
| + _extra datastore.PropertyMap `gae:"-,extra"` | 
| 
tandrii(chromium)
2017/07/14 18:17:01
what's this for?
 
Vadim Sh.
2017/07/14 18:29:12
This is not strictly necessary in this code...
In
 
tandrii(chromium)
2017/07/14 18:36:08
i guess so, but was confused since this is new fie
 | 
| + | 
| + ID string `gae:"$id"` | 
| + State cron.State `gae:",noindex"` | 
| +} | 
| + | 
| +func (s *CronState) schedule() *schedule.Schedule { | 
| + parsed, err := schedule.Parse(s.ID, 0) | 
| + if err != nil { | 
| + panic(err) | 
| + } | 
| + return parsed | 
| +} | 
| + | 
| +// evolve instantiates cron.Machine, calls the callback and submits emitted | 
| +// actions. | 
| +func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine) error) error { | 
| + err := datastore.RunInTransaction(c, func(c context.Context) error { | 
| + entity := CronState{ID: id} | 
| + if err := datastore.Get(c, &entity); err != nil && err != datastore.ErrNoSuchEntity { | 
| + return err | 
| + } | 
| + | 
| + machine := &cron.Machine{ | 
| + Now: clock.Now(c), | 
| + Schedule: entity.schedule(), | 
| + Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, | 
| + State: entity.State, | 
| + } | 
| + | 
| + if err := cb(c, machine); err != nil { | 
| + return err | 
| + } | 
| + | 
| + for _, action := range machine.Actions { | 
| + var task *taskqueue.Task | 
| + switch a := action.(type) { | 
| + case cron.TickLaterAction: | 
| + logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) | 
| + task = &taskqueue.Task{ | 
| + Path: fmt.Sprintf("/tick/%s/%d", id, a.TickNonce), | 
| + ETA: a.When, | 
| + } | 
| + case cron.StartInvocationAction: | 
| + task = &taskqueue.Task{ | 
| + Path: fmt.Sprintf("/invocation/%s", id), | 
| + Delay: time.Second, // give the transaction time to land | 
| 
tandrii(chromium)
2017/07/14 18:17:01
I don't get why 1s is necessary.
Suppose Delay is
 
Vadim Sh.
2017/07/14 18:29:12
This is opportunistic attempt to avoid hitting tra
 
tandrii(chromium)
2017/07/14 18:36:08
Got it. So, if Delay was 0 AND handleInvocation hi
 
Vadim Sh.
2017/07/14 21:57:21
Yes.
 | 
| + } | 
| + default: | 
| + panic("unknown action type") | 
| + } | 
| + if err := taskqueue.Add(c, "default", task); err != nil { | 
| + return err | 
| + } | 
| + } | 
| + | 
| + entity.State = machine.State | 
| + return datastore.Put(c, &entity) | 
| + }, nil) | 
| + | 
| + if err != nil { | 
| + logging.Errorf(c, "FAIL - %s", err) | 
| + } | 
| + return err | 
| +} | 
| + | 
| +func startJob(c context.Context, id string) error { | 
| + return evolve(c, id, func(c context.Context, m *cron.Machine) error { | 
| + // Forcefully restart the chain of tasks. | 
| + m.Disable() | 
| + m.Enable() | 
| + return nil | 
| + }) | 
| +} | 
| + | 
| +func handleTick(c context.Context, id string, nonce int64) error { | 
| + return evolve(c, id, func(c context.Context, m *cron.Machine) error { | 
| + return m.OnTimerTick(nonce) | 
| + }) | 
| +} | 
| + | 
| +func handleInvocation(c context.Context, id string) error { | 
| + logging.Infof(c, "INVOCATION of job %q has finished!", id) | 
| + return evolve(c, id, func(c context.Context, m *cron.Machine) error { | 
| + m.RewindIfNecessary() | 
| + return nil | 
| + }) | 
| +} | 
| + | 
| +func init() { | 
| + r := router.New() | 
| + gaemiddleware.InstallHandlers(r) | 
| + | 
| + // Kick-start a bunch of jobs by visiting: | 
| + // | 
| + // http://localhost:8080/start/with 10s interval | 
| + // http://localhost:8080/start/with 5s interval | 
| + // http://localhost:8080/start/0 * * * * * * * | 
| + // | 
| + // And the look at the logs. | 
| + | 
| + r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { | 
| + jobID := c.Params.ByName("JobID") | 
| + if err := startJob(c.Context, jobID); err != nil { | 
| + panic(err) | 
| + } | 
| + }) | 
| + | 
| + r.POST("/tick/:JobID/:TickNonce", gaemiddleware.BaseProd(), func(c *router.Context) { | 
| + jobID := c.Params.ByName("JobID") | 
| + nonce, err := strconv.ParseInt(c.Params.ByName("TickNonce"), 10, 64) | 
| + if err != nil { | 
| + panic(err) | 
| + } | 
| + if err := handleTick(c.Context, jobID, nonce); err != nil { | 
| + panic(err) | 
| + } | 
| + }) | 
| + | 
| + r.POST("/invocation/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { | 
| + jobID := c.Params.ByName("JobID") | 
| + if err := handleInvocation(c.Context, jobID); err != nil { | 
| + panic(err) | 
| + } | 
| + }) | 
| + | 
| + http.DefaultServeMux.Handle("/", r) | 
| +} |