OLD | NEW |
---|---|
1 // Copyright 2017 The LUCI Authors. | 1 // Copyright 2017 The LUCI Authors. |
2 // | 2 // |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
6 // | 6 // |
7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
8 // | 8 // |
9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
13 // limitations under the License. | 13 // limitations under the License. |
14 | 14 |
15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. | 15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. |
16 package demo | 16 package demo |
17 | 17 |
18 import ( | 18 import ( |
19 "fmt" | |
19 "net/http" | 20 "net/http" |
21 "strconv" | |
22 "sync" | |
20 "time" | 23 "time" |
21 | 24 |
22 "golang.org/x/net/context" | 25 "golang.org/x/net/context" |
23 | 26 |
24 "github.com/golang/protobuf/proto" | 27 "github.com/golang/protobuf/proto" |
25 "github.com/luci/gae/service/datastore" | 28 "github.com/luci/gae/service/datastore" |
29 "github.com/luci/gae/service/info" | |
30 "github.com/luci/gae/service/memcache" | |
26 | 31 |
27 "github.com/luci/luci-go/appengine/gaemiddleware" | 32 "github.com/luci/luci-go/appengine/gaemiddleware" |
33 "github.com/luci/luci-go/appengine/memlock" | |
28 "github.com/luci/luci-go/common/clock" | 34 "github.com/luci/luci-go/common/clock" |
29 "github.com/luci/luci-go/common/data/rand/mathrand" | 35 "github.com/luci/luci-go/common/data/rand/mathrand" |
30 "github.com/luci/luci-go/common/logging" | 36 "github.com/luci/luci-go/common/logging" |
37 "github.com/luci/luci-go/common/retry/transient" | |
31 "github.com/luci/luci-go/server/router" | 38 "github.com/luci/luci-go/server/router" |
32 | 39 |
33 "github.com/luci/luci-go/scheduler/appengine/engine/cron" | 40 "github.com/luci/luci-go/scheduler/appengine/engine/cron" |
41 "github.com/luci/luci-go/scheduler/appengine/engine/dsset" | |
34 "github.com/luci/luci-go/scheduler/appengine/engine/internal" | 42 "github.com/luci/luci-go/scheduler/appengine/engine/internal" |
35 "github.com/luci/luci-go/scheduler/appengine/engine/tq" | 43 "github.com/luci/luci-go/scheduler/appengine/engine/tq" |
36 "github.com/luci/luci-go/scheduler/appengine/schedule" | 44 "github.com/luci/luci-go/scheduler/appengine/schedule" |
37 ) | 45 ) |
38 | 46 |
39 var tasks = tq.Dispatcher{} | 47 var dispatcher = tq.Dispatcher{} |
40 | 48 |
41 type CronState struct { | 49 type CronState struct { |
42 _extra datastore.PropertyMap `gae:"-,extra"` | 50 _extra datastore.PropertyMap `gae:"-,extra"` |
43 | 51 |
44 ID string `gae:"$id"` | 52 ID string `gae:"$id"` |
45 State cron.State `gae:",noindex"` | 53 State cron.State `gae:",noindex"` |
54 | |
55 NextInvID int64 `gae:",noindex"` | |
56 RunningSet []int64 `gae:",noindex"` | |
46 } | 57 } |
47 | 58 |
48 func (s *CronState) schedule() *schedule.Schedule { | 59 func (s *CronState) schedule() *schedule.Schedule { |
49 parsed, err := schedule.Parse(s.ID, 0) | 60 parsed, err := schedule.Parse(s.ID, 0) |
50 if err != nil { | 61 if err != nil { |
51 panic(err) | 62 panic(err) |
52 } | 63 } |
53 return parsed | 64 return parsed |
54 } | 65 } |
55 | 66 |
56 // evolve instantiates cron.Machine, calls the callback and submits emitted | 67 func pendingTriggersSet(c context.Context, jobID string) *dsset.Set { |
57 // actions. | 68 » return &dsset.Set{ |
69 » » ID: "triggers:" + jobID, | |
70 » » ShardCount: 8, | |
71 » » TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}), | |
72 » » TombstonesDelay: 15 * time.Minute, | |
73 » } | |
74 } | |
75 | |
76 func recentlyFinishedSet(c context.Context, jobID string) *dsset.Set { | |
77 » return &dsset.Set{ | |
78 » » ID: "finished:" + jobID, | |
79 » » ShardCount: 8, | |
80 » » TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}), | |
81 » » TombstonesDelay: 15 * time.Minute, | |
82 » } | |
83 } | |
84 | |
85 func pokeMachine(c context.Context, entity *CronState, cb func(context.Context, *cron.Machine) error) error { | |
86 » machine := &cron.Machine{ | |
87 » » Now: clock.Now(c), | |
88 » » Schedule: entity.schedule(), | |
89 » » Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, | |
90 » » State: entity.State, | |
91 » } | |
92 | |
93 » if err := cb(c, machine); err != nil { | |
94 » » return err | |
95 » } | |
96 | |
97 » tasks := []*tq.Task{} | |
98 » for _, action := range machine.Actions { | |
99 » » switch a := action.(type) { | |
100 » » case cron.TickLaterAction: | |
101 » » » logging.Infof(c, "Scheduling tick %d after %s", a.TickNo nce, a.When.Sub(time.Now())) | |
102 » » » tasks = append(tasks, &tq.Task{ | |
103 » » » » Payload: &internal.TickLaterTask{JobId: entity.I D, TickNonce: a.TickNonce}, | |
104 » » » » ETA: a.When, | |
105 » » » }) | |
106 » » case cron.StartInvocationAction: | |
107 » » » tasks = append(tasks, &tq.Task{ | |
108 » » » » Payload: &internal.TriggerInvocationTask{JobId: entity.ID, TriggerId: mathrand.Get(c).Int63()}, | |
109 » » » }) | |
110 » » default: | |
111 » » » panic("unknown action type") | |
112 » » } | |
113 » } | |
114 » if err := dispatcher.AddTasks(c, tasks); err != nil { | |
115 » » return err | |
116 » } | |
117 | |
118 » entity.State = machine.State | |
119 » return nil | |
120 } | |
121 | |
58 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine ) error) error { | 122 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine ) error) error { |
59 err := datastore.RunInTransaction(c, func(c context.Context) error { | 123 err := datastore.RunInTransaction(c, func(c context.Context) error { |
60 » » entity := CronState{ID: id} | 124 » » entity := &CronState{ID: id} |
tandrii(chromium)
2017/07/31 18:58:46
btw, why this change? I think entity object should
| |
61 » » if err := datastore.Get(c, &entity); err != nil && err != datast ore.ErrNoSuchEntity { | 125 » » if err := datastore.Get(c, entity); err != nil && err != datasto re.ErrNoSuchEntity { |
62 return err | 126 return err |
63 } | 127 } |
64 | 128 » » if err := pokeMachine(c, entity, cb); err != nil { |
65 » » machine := &cron.Machine{ | |
66 » » » Now: clock.Now(c), | |
67 » » » Schedule: entity.schedule(), | |
68 » » » Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, | |
69 » » » State: entity.State, | |
70 » » } | |
71 | |
72 » » if err := cb(c, machine); err != nil { | |
73 return err | 129 return err |
74 } | 130 } |
75 | 131 » » return datastore.Put(c, entity) |
76 » » for _, action := range machine.Actions { | |
77 » » » var task tq.Task | |
78 » » » switch a := action.(type) { | |
79 » » » case cron.TickLaterAction: | |
80 » » » » logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) | |
81 » » » » task = tq.Task{ | |
82 » » » » » Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce}, | |
83 » » » » » ETA: a.When, | |
84 » » » » } | |
85 » » » case cron.StartInvocationAction: | |
86 » » » » task = tq.Task{ | |
87 » » » » » Payload: &internal.StartInvocationTask{J obId: id}, | |
88 » » » » » Delay: time.Second, // give the transa ction time to land | |
89 » » » » } | |
90 » » » default: | |
91 » » » » panic("unknown action type") | |
92 » » » } | |
93 » » » if err := tasks.AddTask(c, &task); err != nil { | |
94 » » » » return err | |
95 » » » } | |
96 » » } | |
97 | |
98 » » entity.State = machine.State | |
99 » » return datastore.Put(c, &entity) | |
100 }, nil) | 132 }, nil) |
101 | 133 » return transient.Tag.Apply(err) |
102 » if err != nil { | |
103 » » logging.Errorf(c, "FAIL - %s", err) | |
104 » } | |
105 » return err | |
106 } | 134 } |
107 | 135 |
108 func startJob(c context.Context, id string) error { | 136 func startJob(c context.Context, id string) error { |
109 return evolve(c, id, func(c context.Context, m *cron.Machine) error { | 137 return evolve(c, id, func(c context.Context, m *cron.Machine) error { |
110 // Forcefully restart the chain of tasks. | 138 // Forcefully restart the chain of tasks. |
111 m.Disable() | 139 m.Disable() |
112 m.Enable() | 140 m.Enable() |
113 return nil | 141 return nil |
114 }) | 142 }) |
115 } | 143 } |
116 | 144 |
145 func addTrigger(c context.Context, jobID, triggerID string) error { | |
146 logging.Infof(c, "Triggering %q - %q", jobID, triggerID) | |
147 | |
148 // Add the trigger request to the pending set. | |
149 if err := pendingTriggersSet(c, jobID).Add(c, []dsset.Item{{ID: triggerI D}}); err != nil { | |
150 return err | |
151 } | |
152 | |
153 // Run a task that examines the pending set and makes decisions. | |
154 return kickTriageTask(c, jobID) | |
155 } | |
156 | |
157 func kickTriageTask(c context.Context, jobID string) error { | |
158 // Throttle to once per 2 sec (and make sure it is always in the future) . | |
159 eta := clock.Now(c).Unix() | |
160 eta = (eta/2 + 1) * 2 | |
161 dedupKey := fmt.Sprintf("triage:%s:%d", jobID, eta) | |
162 | |
163 // Use cheaper but crappier memcache as a first guard. | |
164 itm := memcache.NewItem(c, dedupKey).SetExpiration(time.Minute) | |
165 if memcache.Get(c, itm) == nil { | |
166 logging.Infof(c, "The triage task has already been scheduled") | |
167 return nil // already added! | |
168 } | |
169 | |
170 err := dispatcher.AddTask(c, &tq.Task{ | |
171 DeduplicationKey: dedupKey, | |
172 ETA: time.Unix(eta, 0), | |
173 Payload: &internal.TriageTriggersTask{JobId: jobID}, | |
174 }) | |
175 if err != nil { | |
176 return err | |
177 } | |
178 logging.Infof(c, "Scheduled the triage task") | |
179 | |
180 // Best effort in setting memcache flag. No big deal if it fails. | |
181 memcache.Set(c, itm) | |
182 return nil | |
183 } | |
184 | |
117 func handleTick(c context.Context, task proto.Message, execCount int) error { | 185 func handleTick(c context.Context, task proto.Message, execCount int) error { |
118 msg := task.(*internal.TickLaterTask) | 186 msg := task.(*internal.TickLaterTask) |
119 return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { | 187 return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { |
120 return m.OnTimerTick(msg.TickNonce) | 188 return m.OnTimerTick(msg.TickNonce) |
121 }) | 189 }) |
122 } | 190 } |
123 | 191 |
124 func handleInvocation(c context.Context, task proto.Message, execCount int) erro r { | 192 func handleTrigger(c context.Context, task proto.Message, execCount int) error { |
125 » msg := task.(*internal.StartInvocationTask) | 193 » msg := task.(*internal.TriggerInvocationTask) |
126 » logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId) | 194 » return addTrigger(c, msg.JobId, fmt.Sprintf("cron:%d", msg.TriggerId)) |
127 » return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or { | 195 } |
128 » » m.RewindIfNecessary() | 196 |
129 » » return nil | 197 func handleTriage(c context.Context, task proto.Message, execCount int) error { |
198 » msg := task.(*internal.TriageTriggersTask) | |
199 » logging.Infof(c, "Triaging requests for %q", msg.JobId) | |
200 | |
201 » err := memlock.TryWithLock(c, "triageLock:"+msg.JobId, info.RequestID(c) , func(context.Context) error { | |
202 » » logging.Infof(c, "Got the lock!") | |
203 » » return runTriage(c, msg.JobId) | |
130 }) | 204 }) |
205 return transient.Tag.Apply(err) | |
206 } | |
207 | |
208 func runTriage(c context.Context, jobID string) error { | |
209 wg := sync.WaitGroup{} | |
210 wg.Add(2) | |
211 | |
212 var triggersList *dsset.Listing | |
213 var triggersErr error | |
214 | |
215 var finishedList *dsset.Listing | |
216 var finishedErr error | |
217 | |
218 // Grab all pending requests (and stuff to cleanup). | |
219 triggersSet := pendingTriggersSet(c, jobID) | |
220 go func() { | |
221 defer wg.Done() | |
222 triggersList, triggersErr = triggersSet.List(c) | |
223 if triggersErr == nil { | |
224 logging.Infof(c, "Triggers: %d items, %d tombs to cleanu p", | |
225 len(triggersList.Items), len(triggersList.Tombst ones)) | |
226 } | |
227 }() | |
228 | |
229 // Same for recently finished invocations. | |
230 finishedSet := recentlyFinishedSet(c, jobID) | |
231 go func() { | |
232 defer wg.Done() | |
233 finishedList, finishedErr = finishedSet.List(c) | |
234 if finishedErr == nil { | |
235 logging.Infof(c, "Finished: %d items, %d tombs to cleanu p", | |
236 len(finishedList.Items), len(finishedList.Tombst ones)) | |
237 } | |
238 }() | |
239 | |
240 wg.Wait() | |
241 switch { | |
242 case triggersErr != nil: | |
243 return triggersErr | |
244 case finishedErr != nil: | |
245 return finishedErr | |
246 } | |
247 | |
248 // Do cleanups first. | |
249 if err := dsset.CleanupStorage(c, triggersList.Tombstones, finishedList. Tombstones); err != nil { | |
250 return err | |
251 } | |
252 | |
253 var cleanup []*dsset.Tombstone | |
254 err := datastore.RunInTransaction(c, func(c context.Context) error { | |
255 state := &CronState{ID: jobID} | |
256 if err := datastore.Get(c, state); err != nil && err != datastor e.ErrNoSuchEntity { | |
257 return err | |
258 } | |
259 | |
260 popOps := []*dsset.PopOp{} | |
261 | |
262 // Tidy RunningSet by removing all recently finished invocations . | |
263 if !finishedList.Empty() { | |
264 op, err := finishedSet.BeginPop(c, finishedList) | |
265 if err != nil { | |
266 return err | |
267 } | |
268 popOps = append(popOps, op) | |
269 | |
270 reallyFinished := map[int64]struct{}{} | |
271 for _, itm := range finishedList.Items { | |
272 if op.Pop(itm.ID) { | |
273 id, _ := strconv.ParseInt(itm.ID, 10, 64 ) | |
274 reallyFinished[id] = struct{}{} | |
275 } | |
276 } | |
277 | |
278 filtered := []int64{} | |
279 for _, id := range state.RunningSet { | |
280 if _, yep := reallyFinished[id]; yep { | |
281 logging.Infof(c, "Invocation finished-%d is acknowledged as finished", id) | |
282 } else { | |
283 filtered = append(filtered, id) | |
284 } | |
285 } | |
286 state.RunningSet = filtered | |
287 } | |
288 | |
289 // Launch new invocations for each pending trigger. | |
290 if !triggersList.Empty() { | |
291 op, err := triggersSet.BeginPop(c, triggersList) | |
292 if err != nil { | |
293 return err | |
294 } | |
295 popOps = append(popOps, op) | |
296 | |
297 batch := internal.LaunchInvocationsBatchTask{JobId: stat e.ID} | |
298 for _, trigger := range triggersList.Items { | |
299 if op.Pop(trigger.ID) { | |
300 logging.Infof(c, "Launching new launch-% d for trigger %s", state.NextInvID, trigger.ID) | |
301 state.RunningSet = append(state.RunningS et, state.NextInvID) | |
302 batch.InvId = append(batch.InvId, state. NextInvID) | |
303 state.NextInvID++ | |
304 } | |
305 } | |
306 // Transactionally trigger a batch with new invocations. | |
307 if len(batch.InvId) != 0 { | |
308 if err := dispatcher.AddTask(c, &tq.Task{Payload : &batch}); err != nil { | |
309 return err | |
310 } | |
311 } | |
312 } | |
313 | |
314 // Submit set changes. | |
315 var err error | |
316 if cleanup, err = dsset.FinishPop(c, popOps...); err != nil { | |
317 return err | |
318 } | |
319 | |
320 logging.Infof(c, "Running invocations - %v", state.RunningSet) | |
321 | |
322 // If nothing is running, poke the cron machine. Maybe it wants to start | |
323 // something. | |
324 if len(state.RunningSet) == 0 { | |
325 err = pokeMachine(c, state, func(c context.Context, m *c ron.Machine) error { | |
326 m.RewindIfNecessary() | |
327 return nil | |
328 }) | |
329 if err != nil { | |
330 return err | |
331 } | |
332 } | |
333 | |
334 // Done! | |
335 return datastore.Put(c, state) | |
336 }, nil) | |
337 | |
338 if err == nil && len(cleanup) != 0 { | |
339 // Best effort cleanup of storage of consumed items. | |
340 logging.Infof(c, "Cleaning up storage of %d items", len(cleanup) ) | |
341 if err := dsset.CleanupStorage(c, cleanup); err != nil { | |
342 logging.Warningf(c, "Best effort cleanup failed - %s", e rr) | |
343 } | |
344 } | |
345 | |
346 return transient.Tag.Apply(err) | |
347 } | |
348 | |
349 func handleBatchLaunch(c context.Context, task proto.Message, execCount int) err or { | |
350 msg := task.(*internal.LaunchInvocationsBatchTask) | |
351 logging.Infof(c, "Batch launch for %q", msg.JobId) | |
352 | |
353 tasks := []*tq.Task{} | |
354 for _, invId := range msg.InvId { | |
355 logging.Infof(c, "Launching inv-%d", invId) | |
356 tasks = append(tasks, &tq.Task{ | |
357 DeduplicationKey: fmt.Sprintf("inv:%s:%d", msg.JobId, in vId), | |
358 Payload: &internal.LaunchInvocationTask{ | |
359 JobId: msg.JobId, | |
360 InvId: invId, | |
361 }, | |
362 }) | |
363 } | |
364 | |
365 return dispatcher.AddTasks(c, tasks) | |
366 } | |
367 | |
368 func handleLaunchTask(c context.Context, task proto.Message, execCount int) erro r { | |
369 msg := task.(*internal.LaunchInvocationTask) | |
370 logging.Infof(c, "Executing invocation %q: exec-%d", msg.JobId, msg.InvI d) | |
371 | |
372 // There can be more stuff here. But we just finish the invocation right away. | |
373 | |
374 finishedSet := recentlyFinishedSet(c, msg.JobId) | |
375 err := finishedSet.Add(c, []dsset.Item{ | |
376 {ID: fmt.Sprintf("%d", msg.InvId)}, | |
377 }) | |
378 if err != nil { | |
379 return err | |
380 } | |
381 | |
382 // Kick the triage now that the set of running invocations has been modi fied. | |
383 return kickTriageTask(c, msg.JobId) | |
131 } | 384 } |
132 | 385 |
133 func init() { | 386 func init() { |
134 r := router.New() | 387 r := router.New() |
135 gaemiddleware.InstallHandlers(r) | 388 gaemiddleware.InstallHandlers(r) |
136 | 389 |
137 » tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil ) | 390 » dispatcher.RegisterTask(&internal.TickLaterTask{}, handleTick, "timers", nil) |
138 » tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "d efault", nil) | 391 » dispatcher.RegisterTask(&internal.TriggerInvocationTask{}, handleTrigger , "triggers", nil) |
139 » tasks.InstallRoutes(r, gaemiddleware.BaseProd()) | 392 » dispatcher.RegisterTask(&internal.TriageTriggersTask{}, handleTriage, "t riages", nil) |
393 » dispatcher.RegisterTask(&internal.LaunchInvocationsBatchTask{}, handleBa tchLaunch, "launches", nil) | |
394 » dispatcher.RegisterTask(&internal.LaunchInvocationTask{}, handleLaunchTa sk, "invocations", nil) | |
395 » dispatcher.InstallRoutes(r, gaemiddleware.BaseProd()) | |
140 | 396 |
141 // Kick-start a bunch of jobs by visiting: | 397 // Kick-start a bunch of jobs by visiting: |
142 // | 398 // |
143 // http://localhost:8080/start/with 10s interval | 399 // http://localhost:8080/start/with 10s interval |
144 // http://localhost:8080/start/with 5s interval | 400 // http://localhost:8080/start/with 5s interval |
145 // http://localhost:8080/start/0 * * * * * * * | 401 // http://localhost:8080/start/0 * * * * * * * |
146 // | 402 // |
147 // And the look at the logs. | 403 // And the look at the logs. |
148 | 404 |
149 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { | 405 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { |
150 jobID := c.Params.ByName("JobID") | 406 jobID := c.Params.ByName("JobID") |
151 if err := startJob(c.Context, jobID); err != nil { | 407 if err := startJob(c.Context, jobID); err != nil { |
152 panic(err) | 408 panic(err) |
153 } | 409 } |
154 }) | 410 }) |
155 | 411 |
412 r.GET("/trigger/:JobID", gaemiddleware.BaseProd(), func(c *router.Contex t) { | |
413 jobID := c.Params.ByName("JobID") | |
414 triggerID := fmt.Sprintf("manual:%d", clock.Now(c.Context).UnixN ano()) | |
415 if err := addTrigger(c.Context, jobID, triggerID); err != nil { | |
416 panic(err) | |
417 } | |
418 }) | |
419 | |
156 http.DefaultServeMux.Handle("/", r) | 420 http.DefaultServeMux.Handle("/", r) |
157 } | 421 } |
OLD | NEW |