OLD | NEW |
---|---|
1 // Copyright 2017 The LUCI Authors. | 1 // Copyright 2017 The LUCI Authors. |
2 // | 2 // |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
6 // | 6 // |
7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
8 // | 8 // |
9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
13 // limitations under the License. | 13 // limitations under the License. |
14 | 14 |
15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. | 15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. |
16 package demo | 16 package demo |
17 | 17 |
18 import ( | 18 import ( |
19 "fmt" | |
19 "net/http" | 20 "net/http" |
21 "strconv" | |
22 "sync" | |
20 "time" | 23 "time" |
21 | 24 |
22 "golang.org/x/net/context" | 25 "golang.org/x/net/context" |
23 | 26 |
24 "github.com/golang/protobuf/proto" | 27 "github.com/golang/protobuf/proto" |
25 "github.com/luci/gae/service/datastore" | 28 "github.com/luci/gae/service/datastore" |
29 "github.com/luci/gae/service/info" | |
26 | 30 |
27 "github.com/luci/luci-go/appengine/gaemiddleware" | 31 "github.com/luci/luci-go/appengine/gaemiddleware" |
32 "github.com/luci/luci-go/appengine/memlock" | |
28 "github.com/luci/luci-go/common/clock" | 33 "github.com/luci/luci-go/common/clock" |
29 "github.com/luci/luci-go/common/data/rand/mathrand" | 34 "github.com/luci/luci-go/common/data/rand/mathrand" |
30 "github.com/luci/luci-go/common/logging" | 35 "github.com/luci/luci-go/common/logging" |
36 "github.com/luci/luci-go/common/retry/transient" | |
31 "github.com/luci/luci-go/server/router" | 37 "github.com/luci/luci-go/server/router" |
32 | 38 |
33 "github.com/luci/luci-go/scheduler/appengine/engine/cron" | 39 "github.com/luci/luci-go/scheduler/appengine/engine/cron" |
40 "github.com/luci/luci-go/scheduler/appengine/engine/dsset" | |
34 "github.com/luci/luci-go/scheduler/appengine/engine/internal" | 41 "github.com/luci/luci-go/scheduler/appengine/engine/internal" |
35 "github.com/luci/luci-go/scheduler/appengine/engine/tq" | 42 "github.com/luci/luci-go/scheduler/appengine/engine/tq" |
36 "github.com/luci/luci-go/scheduler/appengine/schedule" | 43 "github.com/luci/luci-go/scheduler/appengine/schedule" |
37 ) | 44 ) |
38 | 45 |
39 var tasks = tq.Dispatcher{} | 46 var dispatcher = tq.Dispatcher{} |
40 | 47 |
41 type CronState struct { | 48 type CronState struct { |
42 _extra datastore.PropertyMap `gae:"-,extra"` | 49 _extra datastore.PropertyMap `gae:"-,extra"` |
43 | 50 |
44 ID string `gae:"$id"` | 51 ID string `gae:"$id"` |
45 State cron.State `gae:",noindex"` | 52 State cron.State `gae:",noindex"` |
53 | |
54 NextInvID int64 `gae:",noindex"` | |
55 RunningSet []int64 `gae:",noindex"` | |
46 } | 56 } |
47 | 57 |
48 func (s *CronState) schedule() *schedule.Schedule { | 58 func (s *CronState) schedule() *schedule.Schedule { |
49 parsed, err := schedule.Parse(s.ID, 0) | 59 parsed, err := schedule.Parse(s.ID, 0) |
50 if err != nil { | 60 if err != nil { |
51 panic(err) | 61 panic(err) |
52 } | 62 } |
53 return parsed | 63 return parsed |
54 } | 64 } |
55 | 65 |
56 // evolve instantiates cron.Machine, calls the callback and submits emitted | 66 func pendingTriggersSet(c context.Context, jobID string) *dsset.Set { |
57 // actions. | 67 » return &dsset.Set{ |
68 » » ID: "triggers:" + jobID, | |
69 » » ShardCount: 8, | |
70 » » TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}), | |
71 » » TombstonesDelay: 3 * time.Minute, | |
72 » } | |
73 } | |
74 | |
75 func recentlyFinishedSet(c context.Context, jobID string) *dsset.Set { | |
76 » return &dsset.Set{ | |
77 » » ID: "finished:" + jobID, | |
78 » » ShardCount: 8, | |
79 » » TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}), | |
80 » » TombstonesDelay: 3 * time.Minute, | |
81 » } | |
82 } | |
83 | |
84 func pokeMachine(c context.Context, entity *CronState, cb func(context.Context, *cron.Machine) error) error { | |
85 » machine := &cron.Machine{ | |
86 » » Now: clock.Now(c), | |
87 » » Schedule: entity.schedule(), | |
88 » » Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, | |
89 » » State: entity.State, | |
90 » } | |
91 | |
92 » if err := cb(c, machine); err != nil { | |
93 » » return err | |
94 » } | |
95 | |
96 » tasks := []*tq.Task{} | |
97 » for _, action := range machine.Actions { | |
98 » » switch a := action.(type) { | |
99 » » case cron.TickLaterAction: | |
100 » » » logging.Infof(c, "Scheduling tick %d after %s", a.TickNo nce, a.When.Sub(time.Now())) | |
101 » » » tasks = append(tasks, &tq.Task{ | |
102 » » » » Payload: &internal.TickLaterTask{JobId: entity.I D, TickNonce: a.TickNonce}, | |
103 » » » » ETA: a.When, | |
104 » » » }) | |
105 » » case cron.StartInvocationAction: | |
106 » » » tasks = append(tasks, &tq.Task{ | |
107 » » » » Payload: &internal.TriggerInvocationTask{JobId: entity.ID, TriggerId: mathrand.Get(c).Int63()}, | |
108 » » » }) | |
109 » » default: | |
110 » » » panic("unknown action type") | |
111 » » } | |
112 » } | |
113 » if err := dispatcher.AddTasks(c, tasks); err != nil { | |
114 » » return err | |
115 » } | |
116 | |
117 » entity.State = machine.State | |
118 » return nil | |
119 } | |
120 | |
58 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine ) error) error { | 121 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine ) error) error { |
59 err := datastore.RunInTransaction(c, func(c context.Context) error { | 122 err := datastore.RunInTransaction(c, func(c context.Context) error { |
60 » » entity := CronState{ID: id} | 123 » » entity := &CronState{ID: id} |
61 » » if err := datastore.Get(c, &entity); err != nil && err != datast ore.ErrNoSuchEntity { | 124 » » if err := datastore.Get(c, entity); err != nil && err != datasto re.ErrNoSuchEntity { |
62 return err | 125 return err |
63 } | 126 } |
64 | 127 » » if err := pokeMachine(c, entity, cb); err != nil { |
65 » » machine := &cron.Machine{ | |
66 » » » Now: clock.Now(c), | |
67 » » » Schedule: entity.schedule(), | |
68 » » » Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, | |
69 » » » State: entity.State, | |
70 » » } | |
71 | |
72 » » if err := cb(c, machine); err != nil { | |
73 return err | 128 return err |
74 } | 129 } |
75 | 130 » » return datastore.Put(c, entity) |
76 » » for _, action := range machine.Actions { | |
77 » » » var task tq.Task | |
78 » » » switch a := action.(type) { | |
79 » » » case cron.TickLaterAction: | |
80 » » » » logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) | |
81 » » » » task = tq.Task{ | |
82 » » » » » Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce}, | |
83 » » » » » ETA: a.When, | |
84 » » » » } | |
85 » » » case cron.StartInvocationAction: | |
86 » » » » task = tq.Task{ | |
87 » » » » » Payload: &internal.StartInvocationTask{J obId: id}, | |
88 » » » » » Delay: time.Second, // give the transa ction time to land | |
89 » » » » } | |
90 » » » default: | |
91 » » » » panic("unknown action type") | |
92 » » » } | |
93 » » » if err := tasks.AddTask(c, &task); err != nil { | |
94 » » » » return err | |
95 » » » } | |
96 » » } | |
97 | |
98 » » entity.State = machine.State | |
99 » » return datastore.Put(c, &entity) | |
100 }, nil) | 131 }, nil) |
101 | 132 » return transient.Tag.Apply(err) |
102 » if err != nil { | |
103 » » logging.Errorf(c, "FAIL - %s", err) | |
104 » } | |
105 » return err | |
106 } | 133 } |
107 | 134 |
108 func startJob(c context.Context, id string) error { | 135 func startJob(c context.Context, id string) error { |
109 return evolve(c, id, func(c context.Context, m *cron.Machine) error { | 136 return evolve(c, id, func(c context.Context, m *cron.Machine) error { |
110 // Forcefully restart the chain of tasks. | 137 // Forcefully restart the chain of tasks. |
111 m.Disable() | 138 m.Disable() |
112 m.Enable() | 139 m.Enable() |
113 return nil | 140 return nil |
114 }) | 141 }) |
115 } | 142 } |
116 | 143 |
144 func addTrigger(c context.Context, jobID, triggerID string) error { | |
145 logging.Infof(c, "Triggering %q - %q", jobID, triggerID) | |
146 | |
147 // Add the trigger request to the pending set. | |
148 if err := pendingTriggersSet(c, jobID).Add(c, []dsset.Item{{ID: triggerI D}}); err != nil { | |
149 return err | |
150 } | |
151 | |
152 // Run a task that examines the pending set and makes decisions. | |
153 return kickTriageTask(c, jobID) | |
154 } | |
155 | |
156 func kickTriageTask(c context.Context, jobID string) error { | |
157 // Throttle to once per 2 sec (and make sure it is always in the future) . | |
158 eta := clock.Now(c).Unix() | |
159 eta = (eta/2 + 1) * 2 | |
160 return dispatcher.AddTask(c, &tq.Task{ | |
161 DeduplicationKey: fmt.Sprintf("triage:%s:%d", jobID, eta), | |
162 ETA: time.Unix(eta, 0), | |
163 Payload: &internal.TriageTriggersTask{JobId: jobID}, | |
164 }) | |
165 } | |
166 | |
117 func handleTick(c context.Context, task proto.Message, execCount int) error { | 167 func handleTick(c context.Context, task proto.Message, execCount int) error { |
118 msg := task.(*internal.TickLaterTask) | 168 msg := task.(*internal.TickLaterTask) |
119 return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { | 169 return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { |
120 return m.OnTimerTick(msg.TickNonce) | 170 return m.OnTimerTick(msg.TickNonce) |
121 }) | 171 }) |
122 } | 172 } |
123 | 173 |
124 func handleInvocation(c context.Context, task proto.Message, execCount int) erro r { | 174 func handleTrigger(c context.Context, task proto.Message, execCount int) error { |
125 » msg := task.(*internal.StartInvocationTask) | 175 » msg := task.(*internal.TriggerInvocationTask) |
126 » logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId) | 176 » return addTrigger(c, msg.JobId, fmt.Sprintf("cron:%d", msg.TriggerId)) |
127 » return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { | 177 } |
128 » » m.RewindIfNecessary() | 178 |
129 » » return nil | 179 func handleTriage(c context.Context, task proto.Message, execCount int) error { |
180 » msg := task.(*internal.TriageTriggersTask) | |
181 » logging.Infof(c, "Triaging requests for %q", msg.JobId) | |
182 | |
183 » err := memlock.TryWithLock(c, "triageLock:"+msg.JobId, info.RequestID(c) , func(context.Context) error { | |
184 » » logging.Infof(c, "Got the lock!") | |
185 » » return runTriage(c, msg.JobId) | |
130 }) | 186 }) |
187 return transient.Tag.Apply(err) | |
188 } | |
189 | |
190 func runTriage(c context.Context, jobID string) error { | |
tandrii(chromium)
2017/07/21 13:42:52
Do IIC that this is each trigger job could also ch
Vadim Sh.
2017/07/24 00:30:15
Only if it's fine with using stale data. It can't
tandrii(chromium)
2017/07/31 18:58:45
Ah, so the policy is part of the triggerED job, no
| |
191 wg := sync.WaitGroup{} | |
192 wg.Add(2) | |
193 | |
194 var triggers []dsset.Item | |
195 var triggerTombs []dsset.Tombstone | |
196 var triggerErr error | |
197 | |
198 // Grab all pending requests (and stuff to cleanup). | |
199 triggersSet := pendingTriggersSet(c, jobID) | |
200 go func() { | |
201 defer wg.Done() | |
202 triggers, triggerTombs, triggerErr = triggersSet.List(c) | |
203 if triggerErr == nil { | |
204 logging.Infof(c, "Triggers: %d items, %d tombs to cleanu p", len(triggers), len(triggerTombs)) | |
205 triggerErr = triggersSet.CleanupStorage(c, triggerTombs) | |
206 } | |
207 }() | |
208 | |
209 var finished []dsset.Item | |
210 var finishedTombs []dsset.Tombstone | |
211 var finishedErr error | |
212 | |
213 // Same for recently finished invocations. | |
214 finishedSet := recentlyFinishedSet(c, jobID) | |
215 go func() { | |
216 wg.Done() | |
217 finished, finishedTombs, finishedErr = finishedSet.List(c) | |
218 if finishedErr == nil { | |
219 logging.Infof(c, "Finished: %d items, %d tombs to cleanu p", len(finished), len(finishedTombs)) | |
220 finishedErr = finishedSet.CleanupStorage(c, finishedTomb s) | |
221 } | |
222 }() | |
223 | |
224 // Do the cleanups first. | |
225 wg.Wait() | |
226 if triggerErr != nil { | |
227 return triggerErr | |
228 } | |
229 if finishedErr != nil { | |
230 return finishedErr | |
231 } | |
232 | |
233 err := datastore.RunInTransaction(c, func(c context.Context) error { | |
234 state := &CronState{ID: jobID} | |
235 if err := datastore.Get(c, state); err != nil && err != datastor e.ErrNoSuchEntity { | |
236 return err | |
237 } | |
238 | |
239 // Tidy RunningSet by removing all recently finished invocations . | |
240 if len(finished) != 0 || len(finishedTombs) != 0 { | |
241 op, err := finishedSet.BeginPop(c) | |
242 if err != nil { | |
243 return err | |
244 } | |
245 op.CleanupTombstones(finishedTombs) | |
246 | |
247 reallyFinished := map[int64]struct{}{} | |
248 for _, itm := range finished { | |
249 if op.Pop(itm.ID) { | |
250 id, _ := strconv.ParseInt(itm.ID, 10, 64 ) | |
251 reallyFinished[id] = struct{}{} | |
252 } | |
253 } | |
254 | |
255 if err := op.Submit(); err != nil { | |
256 return err | |
257 } | |
258 | |
259 filtered := []int64{} | |
260 for _, id := range state.RunningSet { | |
261 if _, yep := reallyFinished[id]; yep { | |
262 logging.Infof(c, "Invocation finished-%d is acknowledged as finished", id) | |
263 } else { | |
264 filtered = append(filtered, id) | |
265 } | |
266 } | |
267 state.RunningSet = filtered | |
268 } | |
269 | |
270 // Launch new invocations for each pending trigger. | |
271 | |
272 op, err := triggersSet.BeginPop(c) | |
273 if err != nil { | |
274 return err | |
275 } | |
276 op.CleanupTombstones(triggerTombs) | |
277 | |
278 batch := internal.LaunchInvocationsBatchTask{JobId: state.ID} | |
279 for _, trigger := range triggers { | |
280 if op.Pop(trigger.ID) { | |
281 logging.Infof(c, "Launching new launch-%d for tr igger %s", state.NextInvID, trigger.ID) | |
282 state.RunningSet = append(state.RunningSet, stat e.NextInvID) | |
283 batch.InvId = append(batch.InvId, state.NextInvI D) | |
284 state.NextInvID++ | |
285 } | |
286 } | |
287 | |
288 if err := op.Submit(); err != nil { | |
289 return err | |
290 } | |
291 | |
292 // Transactionally trigger a batch with new invocations. | |
293 if len(batch.InvId) != 0 { | |
294 if err := dispatcher.AddTask(c, &tq.Task{Payload: &batch }); err != nil { | |
295 return err | |
296 } | |
297 } | |
298 | |
299 logging.Infof(c, "Running invocations - %v", state.RunningSet) | |
300 | |
301 // If nothing is running, poke the cron machine. Maybe it wants to start | |
302 // something. | |
303 if len(state.RunningSet) == 0 { | |
304 err := pokeMachine(c, state, func(c context.Context, m * cron.Machine) error { | |
305 m.RewindIfNecessary() | |
306 return nil | |
307 }) | |
308 if err != nil { | |
309 return err | |
310 } | |
311 } | |
312 | |
313 // Done! | |
314 return datastore.Put(c, state) | |
315 }, nil) | |
316 return transient.Tag.Apply(err) | |
317 } | |
318 | |
319 func handleBatchLaunch(c context.Context, task proto.Message, execCount int) err or { | |
320 msg := task.(*internal.LaunchInvocationsBatchTask) | |
321 logging.Infof(c, "Batch launch for %q", msg.JobId) | |
322 | |
323 tasks := []*tq.Task{} | |
324 for _, invId := range msg.InvId { | |
325 logging.Infof(c, "Launching inv-%d", invId) | |
326 tasks = append(tasks, &tq.Task{ | |
327 DeduplicationKey: fmt.Sprintf("inv:%s:%d", msg.JobId, in vId), | |
328 Payload: &internal.LaunchInvocationTask{ | |
329 JobId: msg.JobId, | |
330 InvId: invId, | |
331 }, | |
332 }) | |
333 } | |
334 | |
335 return dispatcher.AddTasks(c, tasks) | |
336 } | |
337 | |
338 func handleLaunchTask(c context.Context, task proto.Message, execCount int) erro r { | |
339 msg := task.(*internal.LaunchInvocationTask) | |
340 logging.Infof(c, "Executing invocation %q: exec-%d", msg.JobId, msg.InvI d) | |
341 | |
342 // There can be more stuff here. But we just finish the invocation right away. | |
343 | |
344 finishedSet := recentlyFinishedSet(c, msg.JobId) | |
345 err := finishedSet.Add(c, []dsset.Item{ | |
346 {ID: fmt.Sprintf("%d", msg.InvId)}, | |
347 }) | |
348 if err != nil { | |
349 return err | |
350 } | |
351 | |
352 // Kick the triage now that the set of running invocations has been modi fied. | |
353 return kickTriageTask(c, msg.JobId) | |
131 } | 354 } |
132 | 355 |
133 func init() { | 356 func init() { |
134 r := router.New() | 357 r := router.New() |
135 gaemiddleware.InstallHandlers(r) | 358 gaemiddleware.InstallHandlers(r) |
136 | 359 |
137 » tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil ) | 360 » dispatcher.RegisterTask(&internal.TickLaterTask{}, handleTick, "timers", nil) |
138 » tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "d efault", nil) | 361 » dispatcher.RegisterTask(&internal.TriggerInvocationTask{}, handleTrigger , "triggers", nil) |
139 » tasks.InstallRoutes(r, gaemiddleware.BaseProd()) | 362 » dispatcher.RegisterTask(&internal.TriageTriggersTask{}, handleTriage, "t riages", nil) |
363 » dispatcher.RegisterTask(&internal.LaunchInvocationsBatchTask{}, handleBa tchLaunch, "launches", nil) | |
364 » dispatcher.RegisterTask(&internal.LaunchInvocationTask{}, handleLaunchTa sk, "invocations", nil) | |
365 » dispatcher.InstallRoutes(r, gaemiddleware.BaseProd()) | |
140 | 366 |
141 // Kick-start a bunch of jobs by visiting: | 367 // Kick-start a bunch of jobs by visiting: |
142 // | 368 // |
143 // http://localhost:8080/start/with 10s interval | 369 // http://localhost:8080/start/with 10s interval |
144 // http://localhost:8080/start/with 5s interval | 370 // http://localhost:8080/start/with 5s interval |
145 // http://localhost:8080/start/0 * * * * * * * | 371 // http://localhost:8080/start/0 * * * * * * * |
146 // | 372 // |
147 // And the look at the logs. | 373 // And the look at the logs. |
148 | 374 |
149 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { | 375 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { |
150 jobID := c.Params.ByName("JobID") | 376 jobID := c.Params.ByName("JobID") |
151 if err := startJob(c.Context, jobID); err != nil { | 377 if err := startJob(c.Context, jobID); err != nil { |
152 panic(err) | 378 panic(err) |
153 } | 379 } |
154 }) | 380 }) |
155 | 381 |
382 r.GET("/trigger/:JobID", gaemiddleware.BaseProd(), func(c *router.Contex t) { | |
383 jobID := c.Params.ByName("JobID") | |
384 triggerID := fmt.Sprintf("manual:%d", clock.Now(c.Context).UnixN ano()) | |
385 if err := addTrigger(c.Context, jobID, triggerID); err != nil { | |
386 panic(err) | |
387 } | |
388 }) | |
389 | |
156 http.DefaultServeMux.Handle("/", r) | 390 http.DefaultServeMux.Handle("/", r) |
157 } | 391 } |
OLD | NEW |