Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(264)

Side by Side Diff: scheduler/appengine/engine/cron/demo/main.go

Issue 2981143002: Add 'dsset' structure. (Closed)
Patch Set: Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2017 The LUCI Authors. 1 // Copyright 2017 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and 12 // See the License for the specific language governing permissions and
13 // limitations under the License. 13 // limitations under the License.
14 14
15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. 15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ.
16 package demo 16 package demo
17 17
18 import ( 18 import (
19 "fmt"
19 "net/http" 20 "net/http"
21 "strconv"
22 "sync"
20 "time" 23 "time"
21 24
22 "golang.org/x/net/context" 25 "golang.org/x/net/context"
23 26
24 "github.com/golang/protobuf/proto" 27 "github.com/golang/protobuf/proto"
25 "github.com/luci/gae/service/datastore" 28 "github.com/luci/gae/service/datastore"
29 "github.com/luci/gae/service/info"
26 30
27 "github.com/luci/luci-go/appengine/gaemiddleware" 31 "github.com/luci/luci-go/appengine/gaemiddleware"
32 "github.com/luci/luci-go/appengine/memlock"
28 "github.com/luci/luci-go/common/clock" 33 "github.com/luci/luci-go/common/clock"
29 "github.com/luci/luci-go/common/data/rand/mathrand" 34 "github.com/luci/luci-go/common/data/rand/mathrand"
30 "github.com/luci/luci-go/common/logging" 35 "github.com/luci/luci-go/common/logging"
36 "github.com/luci/luci-go/common/retry/transient"
31 "github.com/luci/luci-go/server/router" 37 "github.com/luci/luci-go/server/router"
32 38
33 "github.com/luci/luci-go/scheduler/appengine/engine/cron" 39 "github.com/luci/luci-go/scheduler/appengine/engine/cron"
40 "github.com/luci/luci-go/scheduler/appengine/engine/dsset"
34 "github.com/luci/luci-go/scheduler/appengine/engine/internal" 41 "github.com/luci/luci-go/scheduler/appengine/engine/internal"
35 "github.com/luci/luci-go/scheduler/appengine/engine/tq" 42 "github.com/luci/luci-go/scheduler/appengine/engine/tq"
36 "github.com/luci/luci-go/scheduler/appengine/schedule" 43 "github.com/luci/luci-go/scheduler/appengine/schedule"
37 ) 44 )
38 45
39 var tasks = tq.Dispatcher{} 46 var dispatcher = tq.Dispatcher{}
40 47
41 type CronState struct { 48 type CronState struct {
42 _extra datastore.PropertyMap `gae:"-,extra"` 49 _extra datastore.PropertyMap `gae:"-,extra"`
43 50
44 ID string `gae:"$id"` 51 ID string `gae:"$id"`
45 State cron.State `gae:",noindex"` 52 State cron.State `gae:",noindex"`
53
54 NextInvID int64 `gae:",noindex"`
55 RunningSet []int64 `gae:",noindex"`
46 } 56 }
47 57
48 func (s *CronState) schedule() *schedule.Schedule { 58 func (s *CronState) schedule() *schedule.Schedule {
49 parsed, err := schedule.Parse(s.ID, 0) 59 parsed, err := schedule.Parse(s.ID, 0)
50 if err != nil { 60 if err != nil {
51 panic(err) 61 panic(err)
52 } 62 }
53 return parsed 63 return parsed
54 } 64 }
55 65
56 // evolve instantiates cron.Machine, calls the callback and submits emitted 66 func pendingTriggersSet(c context.Context, jobID string) *dsset.Set {
57 // actions. 67 » return &dsset.Set{
68 » » ID: "triggers:" + jobID,
69 » » ShardCount: 8,
70 » » TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}),
71 » » TombstonesDelay: 3 * time.Minute,
72 » }
73 }
74
75 func recentlyFinishedSet(c context.Context, jobID string) *dsset.Set {
76 » return &dsset.Set{
77 » » ID: "finished:" + jobID,
78 » » ShardCount: 8,
79 » » TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}),
80 » » TombstonesDelay: 3 * time.Minute,
81 » }
82 }
83
84 func pokeMachine(c context.Context, entity *CronState, cb func(context.Context, *cron.Machine) error) error {
85 » machine := &cron.Machine{
86 » » Now: clock.Now(c),
87 » » Schedule: entity.schedule(),
88 » » Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 },
89 » » State: entity.State,
90 » }
91
92 » if err := cb(c, machine); err != nil {
93 » » return err
94 » }
95
96 » tasks := []*tq.Task{}
97 » for _, action := range machine.Actions {
98 » » switch a := action.(type) {
99 » » case cron.TickLaterAction:
100 » » » logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now()))
101 » » » tasks = append(tasks, &tq.Task{
102 » » » » Payload: &internal.TickLaterTask{JobId: entity.ID, TickNonce: a.TickNonce},
103 » » » » ETA: a.When,
104 » » » })
105 » » case cron.StartInvocationAction:
106 » » » tasks = append(tasks, &tq.Task{
107 » » » » Payload: &internal.TriggerInvocationTask{JobId: entity.ID, TriggerId: mathrand.Get(c).Int63()},
108 » » » })
109 » » default:
110 » » » panic("unknown action type")
111 » » }
112 » }
113 » if err := dispatcher.AddTasks(c, tasks); err != nil {
114 » » return err
115 » }
116
117 » entity.State = machine.State
118 » return nil
119 }
120
58 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine) error) error { 121 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine) error) error {
59 err := datastore.RunInTransaction(c, func(c context.Context) error { 122 err := datastore.RunInTransaction(c, func(c context.Context) error {
60 » » entity := CronState{ID: id} 123 » » entity := &CronState{ID: id}
61 » » if err := datastore.Get(c, &entity); err != nil && err != datastore.ErrNoSuchEntity { 124 » » if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNoSuchEntity {
62 return err 125 return err
63 } 126 }
64 127 » » if err := pokeMachine(c, entity, cb); err != nil {
65 » » machine := &cron.Machine{
66 » » » Now: clock.Now(c),
67 » » » Schedule: entity.schedule(),
68 » » » Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 },
69 » » » State: entity.State,
70 » » }
71
72 » » if err := cb(c, machine); err != nil {
73 return err 128 return err
74 } 129 }
75 130 » » return datastore.Put(c, entity)
76 » » for _, action := range machine.Actions {
77 » » » var task tq.Task
78 » » » switch a := action.(type) {
79 » » » case cron.TickLaterAction:
80 » » » » logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now()))
81 » » » » task = tq.Task{
82 » » » » » Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce},
83 » » » » » ETA: a.When,
84 » » » » }
85 » » » case cron.StartInvocationAction:
86 » » » » task = tq.Task{
87 » » » » » Payload: &internal.StartInvocationTask{JobId: id},
88 » » » » » Delay: time.Second, // give the transaction time to land
89 » » » » }
90 » » » default:
91 » » » » panic("unknown action type")
92 » » » }
93 » » » if err := tasks.AddTask(c, &task); err != nil {
94 » » » » return err
95 » » » }
96 » » }
97
98 » » entity.State = machine.State
99 » » return datastore.Put(c, &entity)
100 }, nil) 131 }, nil)
101 132 » return transient.Tag.Apply(err)
102 » if err != nil {
103 » » logging.Errorf(c, "FAIL - %s", err)
104 » }
105 » return err
106 } 133 }
107 134
108 func startJob(c context.Context, id string) error { 135 func startJob(c context.Context, id string) error {
109 return evolve(c, id, func(c context.Context, m *cron.Machine) error { 136 return evolve(c, id, func(c context.Context, m *cron.Machine) error {
110 // Forcefully restart the chain of tasks. 137 // Forcefully restart the chain of tasks.
111 m.Disable() 138 m.Disable()
112 m.Enable() 139 m.Enable()
113 return nil 140 return nil
114 }) 141 })
115 } 142 }
116 143
144 func addTrigger(c context.Context, jobID, triggerID string) error {
145 logging.Infof(c, "Triggering %q - %q", jobID, triggerID)
146
147 // Add the trigger request to the pending set.
148 if err := pendingTriggersSet(c, jobID).Add(c, []dsset.Item{{ID: triggerID}}); err != nil {
149 return err
150 }
151
152 // Run a task that examines the pending set and makes decisions.
153 return kickTriageTask(c, jobID)
154 }
155
156 func kickTriageTask(c context.Context, jobID string) error {
157 // Throttle to once per 2 sec (and make sure it is always in the future).
158 eta := clock.Now(c).Unix()
159 eta = (eta/2 + 1) * 2
160 return dispatcher.AddTask(c, &tq.Task{
161 DeduplicationKey: fmt.Sprintf("triage:%s:%d", jobID, eta),
162 ETA: time.Unix(eta, 0),
163 Payload: &internal.TriageTriggersTask{JobId: jobID},
164 })
165 }
166
117 func handleTick(c context.Context, task proto.Message, execCount int) error { 167 func handleTick(c context.Context, task proto.Message, execCount int) error {
118 msg := task.(*internal.TickLaterTask) 168 msg := task.(*internal.TickLaterTask)
119 return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error { 169 return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error {
120 return m.OnTimerTick(msg.TickNonce) 170 return m.OnTimerTick(msg.TickNonce)
121 }) 171 })
122 } 172 }
123 173
124 func handleInvocation(c context.Context, task proto.Message, execCount int) error { 174 func handleTrigger(c context.Context, task proto.Message, execCount int) error {
125 » msg := task.(*internal.StartInvocationTask) 175 » msg := task.(*internal.TriggerInvocationTask)
126 » logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId) 176 » return addTrigger(c, msg.JobId, fmt.Sprintf("cron:%d", msg.TriggerId))
127 » return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error { 177 }
128 » » m.RewindIfNecessary() 178
129 » » return nil 179 func handleTriage(c context.Context, task proto.Message, execCount int) error {
180 » msg := task.(*internal.TriageTriggersTask)
181 » logging.Infof(c, "Triaging requests for %q", msg.JobId)
182
183 » err := memlock.TryWithLock(c, "triageLock:"+msg.JobId, info.RequestID(c), func(context.Context) error {
184 » » logging.Infof(c, "Got the lock!")
185 » » return runTriage(c, msg.JobId)
130 }) 186 })
187 return transient.Tag.Apply(err)
188 }
189
190 func runTriage(c context.Context, jobID string) error {
tandrii(chromium) 2017/07/21 13:42:52 Do IIC that this is each trigger job could also ch
Vadim Sh. 2017/07/24 00:30:15 Only if it's fine with using stale data. It can't
tandrii(chromium) 2017/07/31 18:58:45 Ah, so the policy is part of the triggerED job, no
191 wg := sync.WaitGroup{}
192 wg.Add(2)
193
194 var triggers []dsset.Item
195 var triggerTombs []dsset.Tombstone
196 var triggerErr error
197
198 // Grab all pending requests (and stuff to cleanup).
199 triggersSet := pendingTriggersSet(c, jobID)
200 go func() {
201 defer wg.Done()
202 triggers, triggerTombs, triggerErr = triggersSet.List(c)
203 if triggerErr == nil {
204 logging.Infof(c, "Triggers: %d items, %d tombs to cleanup", len(triggers), len(triggerTombs))
205 triggerErr = triggersSet.CleanupStorage(c, triggerTombs)
206 }
207 }()
208
209 var finished []dsset.Item
210 var finishedTombs []dsset.Tombstone
211 var finishedErr error
212
213 // Same for recently finished invocations.
214 finishedSet := recentlyFinishedSet(c, jobID)
215 go func() {
216 wg.Done()
217 finished, finishedTombs, finishedErr = finishedSet.List(c)
218 if finishedErr == nil {
219 logging.Infof(c, "Finished: %d items, %d tombs to cleanup", len(finished), len(finishedTombs))
220 finishedErr = finishedSet.CleanupStorage(c, finishedTombs)
221 }
222 }()
223
224 // Do the cleanups first.
225 wg.Wait()
226 if triggerErr != nil {
227 return triggerErr
228 }
229 if finishedErr != nil {
230 return finishedErr
231 }
232
233 err := datastore.RunInTransaction(c, func(c context.Context) error {
234 state := &CronState{ID: jobID}
235 if err := datastore.Get(c, state); err != nil && err != datastore.ErrNoSuchEntity {
236 return err
237 }
238
239 // Tidy RunningSet by removing all recently finished invocations.
240 if len(finished) != 0 || len(finishedTombs) != 0 {
241 op, err := finishedSet.BeginPop(c)
242 if err != nil {
243 return err
244 }
245 op.CleanupTombstones(finishedTombs)
246
247 reallyFinished := map[int64]struct{}{}
248 for _, itm := range finished {
249 if op.Pop(itm.ID) {
250 id, _ := strconv.ParseInt(itm.ID, 10, 64)
251 reallyFinished[id] = struct{}{}
252 }
253 }
254
255 if err := op.Submit(); err != nil {
256 return err
257 }
258
259 filtered := []int64{}
260 for _, id := range state.RunningSet {
261 if _, yep := reallyFinished[id]; yep {
262 logging.Infof(c, "Invocation finished-%d is acknowledged as finished", id)
263 } else {
264 filtered = append(filtered, id)
265 }
266 }
267 state.RunningSet = filtered
268 }
269
270 // Launch new invocations for each pending trigger.
271
272 op, err := triggersSet.BeginPop(c)
273 if err != nil {
274 return err
275 }
276 op.CleanupTombstones(triggerTombs)
277
278 batch := internal.LaunchInvocationsBatchTask{JobId: state.ID}
279 for _, trigger := range triggers {
280 if op.Pop(trigger.ID) {
281 logging.Infof(c, "Launching new launch-%d for trigger %s", state.NextInvID, trigger.ID)
282 state.RunningSet = append(state.RunningSet, state.NextInvID)
283 batch.InvId = append(batch.InvId, state.NextInvID)
284 state.NextInvID++
285 }
286 }
287
288 if err := op.Submit(); err != nil {
289 return err
290 }
291
292 // Transactionally trigger a batch with new invocations.
293 if len(batch.InvId) != 0 {
294 if err := dispatcher.AddTask(c, &tq.Task{Payload: &batch}); err != nil {
295 return err
296 }
297 }
298
299 logging.Infof(c, "Running invocations - %v", state.RunningSet)
300
301 // If nothing is running, poke the cron machine. Maybe it wants to start
302 // something.
303 if len(state.RunningSet) == 0 {
304 err := pokeMachine(c, state, func(c context.Context, m *cron.Machine) error {
305 m.RewindIfNecessary()
306 return nil
307 })
308 if err != nil {
309 return err
310 }
311 }
312
313 // Done!
314 return datastore.Put(c, state)
315 }, nil)
316 return transient.Tag.Apply(err)
317 }
318
319 func handleBatchLaunch(c context.Context, task proto.Message, execCount int) error {
320 msg := task.(*internal.LaunchInvocationsBatchTask)
321 logging.Infof(c, "Batch launch for %q", msg.JobId)
322
323 tasks := []*tq.Task{}
324 for _, invId := range msg.InvId {
325 logging.Infof(c, "Launching inv-%d", invId)
326 tasks = append(tasks, &tq.Task{
327 DeduplicationKey: fmt.Sprintf("inv:%s:%d", msg.JobId, invId),
328 Payload: &internal.LaunchInvocationTask{
329 JobId: msg.JobId,
330 InvId: invId,
331 },
332 })
333 }
334
335 return dispatcher.AddTasks(c, tasks)
336 }
337
338 func handleLaunchTask(c context.Context, task proto.Message, execCount int) error {
339 msg := task.(*internal.LaunchInvocationTask)
340 logging.Infof(c, "Executing invocation %q: exec-%d", msg.JobId, msg.InvId)
341
342 // There can be more stuff here. But we just finish the invocation right away.
343
344 finishedSet := recentlyFinishedSet(c, msg.JobId)
345 err := finishedSet.Add(c, []dsset.Item{
346 {ID: fmt.Sprintf("%d", msg.InvId)},
347 })
348 if err != nil {
349 return err
350 }
351
352 // Kick the triage now that the set of running invocations has been modified.
353 return kickTriageTask(c, msg.JobId)
131 } 354 }
132 355
133 func init() { 356 func init() {
134 r := router.New() 357 r := router.New()
135 gaemiddleware.InstallHandlers(r) 358 gaemiddleware.InstallHandlers(r)
136 359
137 » tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil) 360 » dispatcher.RegisterTask(&internal.TickLaterTask{}, handleTick, "timers", nil)
138 » tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "default", nil) 361 » dispatcher.RegisterTask(&internal.TriggerInvocationTask{}, handleTrigger, "triggers", nil)
139 » tasks.InstallRoutes(r, gaemiddleware.BaseProd()) 362 » dispatcher.RegisterTask(&internal.TriageTriggersTask{}, handleTriage, "triages", nil)
363 » dispatcher.RegisterTask(&internal.LaunchInvocationsBatchTask{}, handleBatchLaunch, "launches", nil)
364 » dispatcher.RegisterTask(&internal.LaunchInvocationTask{}, handleLaunchTask, "invocations", nil)
365 » dispatcher.InstallRoutes(r, gaemiddleware.BaseProd())
140 366
141 // Kick-start a bunch of jobs by visiting: 367 // Kick-start a bunch of jobs by visiting:
142 // 368 //
143 // http://localhost:8080/start/with 10s interval 369 // http://localhost:8080/start/with 10s interval
144 // http://localhost:8080/start/with 5s interval 370 // http://localhost:8080/start/with 5s interval
145 // http://localhost:8080/start/0 * * * * * * * 371 // http://localhost:8080/start/0 * * * * * * *
146 // 372 //
147 // And the look at the logs. 373 // And the look at the logs.
148 374
149 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { 375 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) {
150 jobID := c.Params.ByName("JobID") 376 jobID := c.Params.ByName("JobID")
151 if err := startJob(c.Context, jobID); err != nil { 377 if err := startJob(c.Context, jobID); err != nil {
152 panic(err) 378 panic(err)
153 } 379 }
154 }) 380 })
155 381
382 r.GET("/trigger/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) {
383 jobID := c.Params.ByName("JobID")
384 triggerID := fmt.Sprintf("manual:%d", clock.Now(c.Context).UnixNano())
385 if err := addTrigger(c.Context, jobID, triggerID); err != nil {
386 panic(err)
387 }
388 })
389
156 http.DefaultServeMux.Handle("/", r) 390 http.DefaultServeMux.Handle("/", r)
157 } 391 }
OLDNEW
« no previous file with comments | « no previous file | scheduler/appengine/engine/cron/demo/queue.yaml » ('j') | scheduler/appengine/engine/dsset/dsset.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698