OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2017 The LUCI Authors. | |
2 // | |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 // you may not use this file except in compliance with the License. | |
5 // You may obtain a copy of the License at | |
6 // | |
7 // http://www.apache.org/licenses/LICENSE-2.0 | |
8 // | |
9 // Unless required by applicable law or agreed to in writing, software | |
10 // distributed under the License is distributed on an "AS IS" BASIS, | |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 // See the License for the specific language governing permissions and | |
13 // limitations under the License. | |
14 | |
15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. | |
16 package demo | |
17 | |
18 import ( | |
19 "fmt" | |
20 "net/http" | |
21 "strconv" | |
22 "time" | |
23 | |
24 "golang.org/x/net/context" | |
25 | |
26 "github.com/luci/gae/service/datastore" | |
27 "github.com/luci/gae/service/taskqueue" | |
28 | |
29 "github.com/luci/luci-go/appengine/gaemiddleware" | |
30 "github.com/luci/luci-go/common/clock" | |
31 "github.com/luci/luci-go/common/data/rand/mathrand" | |
32 "github.com/luci/luci-go/common/logging" | |
33 "github.com/luci/luci-go/server/router" | |
34 | |
35 "github.com/luci/luci-go/scheduler/appengine/engine/cron" | |
36 "github.com/luci/luci-go/scheduler/appengine/schedule" | |
37 ) | |
38 | |
39 type CronState struct { | |
40 _extra datastore.PropertyMap `gae:"-,extra"` | |
tandrii(chromium)
2017/07/14 18:17:01
what's this for?
Vadim Sh.
2017/07/14 18:29:12
This is not strictly necessary in this code...
In
tandrii(chromium)
2017/07/14 18:36:08
i guess so, but was confused since this is new fie
| |
41 | |
42 ID string `gae:"$id"` | |
43 State cron.State `gae:",noindex"` | |
44 } | |
45 | |
46 func (s *CronState) schedule() *schedule.Schedule { | |
47 parsed, err := schedule.Parse(s.ID, 0) | |
48 if err != nil { | |
49 panic(err) | |
50 } | |
51 return parsed | |
52 } | |
53 | |
54 // evolve instantiates cron.Machine, calls the callback and submits emitted | |
55 // actions. | |
56 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine ) error) error { | |
57 err := datastore.RunInTransaction(c, func(c context.Context) error { | |
58 entity := CronState{ID: id} | |
59 if err := datastore.Get(c, &entity); err != nil && err != datast ore.ErrNoSuchEntity { | |
60 return err | |
61 } | |
62 | |
63 machine := &cron.Machine{ | |
64 Now: clock.Now(c), | |
65 Schedule: entity.schedule(), | |
66 Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, | |
67 State: entity.State, | |
68 } | |
69 | |
70 if err := cb(c, machine); err != nil { | |
71 return err | |
72 } | |
73 | |
74 for _, action := range machine.Actions { | |
75 var task *taskqueue.Task | |
76 switch a := action.(type) { | |
77 case cron.TickLaterAction: | |
78 logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) | |
79 task = &taskqueue.Task{ | |
80 Path: fmt.Sprintf("/tick/%s/%d", id, a.T ickNonce), | |
81 ETA: a.When, | |
82 } | |
83 case cron.StartInvocationAction: | |
84 task = &taskqueue.Task{ | |
85 Path: fmt.Sprintf("/invocation/%s", id) , | |
86 Delay: time.Second, // give the transact ion time to land | |
tandrii(chromium)
2017/07/14 18:17:01
I don't get why 1s is necessary.
Suppose Delay is
Vadim Sh.
2017/07/14 18:29:12
This is opportunistic attempt to avoid hitting tra
tandrii(chromium)
2017/07/14 18:36:08
Got it. So, if Delay was 0 AND handleInvocation hi
Vadim Sh.
2017/07/14 21:57:21
Yes.
| |
87 } | |
88 default: | |
89 panic("unknown action type") | |
90 } | |
91 if err := taskqueue.Add(c, "default", task); err != nil { | |
92 return err | |
93 } | |
94 } | |
95 | |
96 entity.State = machine.State | |
97 return datastore.Put(c, &entity) | |
98 }, nil) | |
99 | |
100 if err != nil { | |
101 logging.Errorf(c, "FAIL - %s", err) | |
102 } | |
103 return err | |
104 } | |
105 | |
106 func startJob(c context.Context, id string) error { | |
107 return evolve(c, id, func(c context.Context, m *cron.Machine) error { | |
108 // Forcefully restart the chain of tasks. | |
109 m.Disable() | |
110 m.Enable() | |
111 return nil | |
112 }) | |
113 } | |
114 | |
115 func handleTick(c context.Context, id string, nonce int64) error { | |
116 return evolve(c, id, func(c context.Context, m *cron.Machine) error { | |
117 return m.OnTimerTick(nonce) | |
118 }) | |
119 } | |
120 | |
121 func handleInvocation(c context.Context, id string) error { | |
122 logging.Infof(c, "INVOCATION of job %q has finished!", id) | |
123 return evolve(c, id, func(c context.Context, m *cron.Machine) error { | |
124 m.RewindIfNecessary() | |
125 return nil | |
126 }) | |
127 } | |
128 | |
129 func init() { | |
130 r := router.New() | |
131 gaemiddleware.InstallHandlers(r) | |
132 | |
133 // Kick-start a bunch of jobs by visiting: | |
134 // | |
135 // http://localhost:8080/start/with 10s interval | |
136 // http://localhost:8080/start/with 5s interval | |
137 // http://localhost:8080/start/0 * * * * * * * | |
138 // | |
139 // And the look at the logs. | |
140 | |
141 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { | |
142 jobID := c.Params.ByName("JobID") | |
143 if err := startJob(c.Context, jobID); err != nil { | |
144 panic(err) | |
145 } | |
146 }) | |
147 | |
148 r.POST("/tick/:JobID/:TickNonce", gaemiddleware.BaseProd(), func(c *rout er.Context) { | |
149 jobID := c.Params.ByName("JobID") | |
150 nonce, err := strconv.ParseInt(c.Params.ByName("TickNonce"), 10, 64) | |
151 if err != nil { | |
152 panic(err) | |
153 } | |
154 if err := handleTick(c.Context, jobID, nonce); err != nil { | |
155 panic(err) | |
156 } | |
157 }) | |
158 | |
159 r.POST("/invocation/:JobID", gaemiddleware.BaseProd(), func(c *router.Co ntext) { | |
160 jobID := c.Params.ByName("JobID") | |
161 if err := handleInvocation(c.Context, jobID); err != nil { | |
162 panic(err) | |
163 } | |
164 }) | |
165 | |
166 http.DefaultServeMux.Handle("/", r) | |
167 } | |
OLD | NEW |