Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(22)

Side by Side Diff: scheduler/appengine/engine/cron/demo/main.go

Issue 2980943002: Add cron.Machine state machine. (Closed)
Patch Set: add-cron-machine-construct Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ.
16 package demo
17
18 import (
19 "fmt"
20 "net/http"
21 "strconv"
22 "time"
23
24 "golang.org/x/net/context"
25
26 "github.com/luci/gae/service/datastore"
27 "github.com/luci/gae/service/taskqueue"
28
29 "github.com/luci/luci-go/appengine/gaemiddleware"
30 "github.com/luci/luci-go/common/clock"
31 "github.com/luci/luci-go/common/data/rand/mathrand"
32 "github.com/luci/luci-go/common/logging"
33 "github.com/luci/luci-go/server/router"
34
35 "github.com/luci/luci-go/scheduler/appengine/engine/cron"
36 "github.com/luci/luci-go/scheduler/appengine/schedule"
37 )
38
39 type CronState struct {
40 _extra datastore.PropertyMap `gae:"-,extra"`
tandrii(chromium) 2017/07/14 18:17:01 what's this for?
Vadim Sh. 2017/07/14 18:29:12 This is not strictly necessary in this code... In
tandrii(chromium) 2017/07/14 18:36:08 i guess so, but was confused since this is new fie
41
42 ID string `gae:"$id"`
43 State cron.State `gae:",noindex"`
44 }
45
46 func (s *CronState) schedule() *schedule.Schedule {
47 parsed, err := schedule.Parse(s.ID, 0)
48 if err != nil {
49 panic(err)
50 }
51 return parsed
52 }
53
54 // evolve instantiates cron.Machine, calls the callback and submits emitted
55 // actions.
56 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine ) error) error {
57 err := datastore.RunInTransaction(c, func(c context.Context) error {
58 entity := CronState{ID: id}
59 if err := datastore.Get(c, &entity); err != nil && err != datast ore.ErrNoSuchEntity {
60 return err
61 }
62
63 machine := &cron.Machine{
64 Now: clock.Now(c),
65 Schedule: entity.schedule(),
66 Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 },
67 State: entity.State,
68 }
69
70 if err := cb(c, machine); err != nil {
71 return err
72 }
73
74 for _, action := range machine.Actions {
75 var task *taskqueue.Task
76 switch a := action.(type) {
77 case cron.TickLaterAction:
78 logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now()))
79 task = &taskqueue.Task{
80 Path: fmt.Sprintf("/tick/%s/%d", id, a.T ickNonce),
81 ETA: a.When,
82 }
83 case cron.StartInvocationAction:
84 task = &taskqueue.Task{
85 Path: fmt.Sprintf("/invocation/%s", id) ,
86 Delay: time.Second, // give the transact ion time to land
tandrii(chromium) 2017/07/14 18:17:01 I don't get why 1s is necessary. Suppose Delay is
Vadim Sh. 2017/07/14 18:29:12 This is opportunistic attempt to avoid hitting tra
tandrii(chromium) 2017/07/14 18:36:08 Got it. So, if Delay was 0 AND handleInvocation hi
Vadim Sh. 2017/07/14 21:57:21 Yes.
87 }
88 default:
89 panic("unknown action type")
90 }
91 if err := taskqueue.Add(c, "default", task); err != nil {
92 return err
93 }
94 }
95
96 entity.State = machine.State
97 return datastore.Put(c, &entity)
98 }, nil)
99
100 if err != nil {
101 logging.Errorf(c, "FAIL - %s", err)
102 }
103 return err
104 }
105
106 func startJob(c context.Context, id string) error {
107 return evolve(c, id, func(c context.Context, m *cron.Machine) error {
108 // Forcefully restart the chain of tasks.
109 m.Disable()
110 m.Enable()
111 return nil
112 })
113 }
114
115 func handleTick(c context.Context, id string, nonce int64) error {
116 return evolve(c, id, func(c context.Context, m *cron.Machine) error {
117 return m.OnTimerTick(nonce)
118 })
119 }
120
121 func handleInvocation(c context.Context, id string) error {
122 logging.Infof(c, "INVOCATION of job %q has finished!", id)
123 return evolve(c, id, func(c context.Context, m *cron.Machine) error {
124 m.RewindIfNecessary()
125 return nil
126 })
127 }
128
129 func init() {
130 r := router.New()
131 gaemiddleware.InstallHandlers(r)
132
133 // Kick-start a bunch of jobs by visiting:
134 //
135 // http://localhost:8080/start/with 10s interval
136 // http://localhost:8080/start/with 5s interval
137 // http://localhost:8080/start/0 * * * * * * *
138 //
139 // And the look at the logs.
140
141 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) {
142 jobID := c.Params.ByName("JobID")
143 if err := startJob(c.Context, jobID); err != nil {
144 panic(err)
145 }
146 })
147
148 r.POST("/tick/:JobID/:TickNonce", gaemiddleware.BaseProd(), func(c *rout er.Context) {
149 jobID := c.Params.ByName("JobID")
150 nonce, err := strconv.ParseInt(c.Params.ByName("TickNonce"), 10, 64)
151 if err != nil {
152 panic(err)
153 }
154 if err := handleTick(c.Context, jobID, nonce); err != nil {
155 panic(err)
156 }
157 })
158
159 r.POST("/invocation/:JobID", gaemiddleware.BaseProd(), func(c *router.Co ntext) {
160 jobID := c.Params.ByName("JobID")
161 if err := handleInvocation(c.Context, jobID); err != nil {
162 panic(err)
163 }
164 })
165
166 http.DefaultServeMux.Handle("/", r)
167 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698