Chromium Code Reviews
| Index: scheduler/appengine/engine/cron/demo/main.go |
| diff --git a/scheduler/appengine/engine/cron/demo/main.go b/scheduler/appengine/engine/cron/demo/main.go |
| index 381f63877e45c67b091bbb6fc92b9ae19331fcf5..5f30311e2fd5ae15ada7a480836cec1b81b0e6df 100644 |
| --- a/scheduler/appengine/engine/cron/demo/main.go |
| +++ b/scheduler/appengine/engine/cron/demo/main.go |
| @@ -16,33 +16,44 @@ |
| package demo |
| import ( |
| + "fmt" |
| "net/http" |
| + "strconv" |
| + "sync" |
| "time" |
| "golang.org/x/net/context" |
| "github.com/golang/protobuf/proto" |
| "github.com/luci/gae/service/datastore" |
| + "github.com/luci/gae/service/info" |
| + "github.com/luci/gae/service/memcache" |
| "github.com/luci/luci-go/appengine/gaemiddleware" |
| + "github.com/luci/luci-go/appengine/memlock" |
| "github.com/luci/luci-go/common/clock" |
| "github.com/luci/luci-go/common/data/rand/mathrand" |
| "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/retry/transient" |
| "github.com/luci/luci-go/server/router" |
| "github.com/luci/luci-go/scheduler/appengine/engine/cron" |
| + "github.com/luci/luci-go/scheduler/appengine/engine/dsset" |
| "github.com/luci/luci-go/scheduler/appengine/engine/internal" |
| "github.com/luci/luci-go/scheduler/appengine/engine/tq" |
| "github.com/luci/luci-go/scheduler/appengine/schedule" |
| ) |
| -var tasks = tq.Dispatcher{} |
| +var dispatcher = tq.Dispatcher{} |
| type CronState struct { |
| _extra datastore.PropertyMap `gae:"-,extra"` |
| ID string `gae:"$id"` |
| State cron.State `gae:",noindex"` |
| + |
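| + // NextInvID is the ID that will be assigned to the next launched invocation. |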
| + NextInvID int64 `gae:",noindex"` |
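| + // RunningSet holds IDs of invocations that haven't been reported as finished yet. |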
| + RunningSet []int64 `gae:",noindex"` |
| } |
| func (s *CronState) schedule() *schedule.Schedule { |
| @@ -53,56 +64,73 @@ func (s *CronState) schedule() *schedule.Schedule { |
| return parsed |
| } |
| -// evolve instantiates cron.Machine, calls the callback and submits emitted |
| -// actions. |
| -func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine) error) error { |
| - err := datastore.RunInTransaction(c, func(c context.Context) error { |
| - entity := CronState{ID: id} |
| - if err := datastore.Get(c, &entity); err != nil && err != datastore.ErrNoSuchEntity { |
| - return err |
| - } |
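| +// pendingTriggersSet is a dsset.Set with not-yet-processed trigger requests for the job. |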
| +func pendingTriggersSet(c context.Context, jobID string) *dsset.Set { |
| + return &dsset.Set{ |
| + ID: "triggers:" + jobID, |
| + ShardCount: 8, |
| + TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}), |
| + TombstonesDelay: 15 * time.Minute, |
| + } |
| +} |
| - machine := &cron.Machine{ |
| - Now: clock.Now(c), |
| - Schedule: entity.schedule(), |
| - Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, |
| - State: entity.State, |
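| +// recentlyFinishedSet is a dsset.Set with IDs of invocations that have recently finished. |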
| +func recentlyFinishedSet(c context.Context, jobID string) *dsset.Set { |
| + return &dsset.Set{ |
| + ID: "finished:" + jobID, |
| + ShardCount: 8, |
| + TombstonesRoot: datastore.KeyForObj(c, &CronState{ID: jobID}), |
| + TombstonesDelay: 15 * time.Minute, |
| + } |
| +} |
| + |
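| +// pokeMachine runs the callback against a cron.Machine built from the entity, submits |
| +// all emitted actions as task queue tasks and updates entity.State in place. |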
| +func pokeMachine(c context.Context, entity *CronState, cb func(context.Context, *cron.Machine) error) error { |
| + machine := &cron.Machine{ |
| + Now: clock.Now(c), |
| + Schedule: entity.schedule(), |
| + Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, |
| + State: entity.State, |
| + } |
| + |
| + if err := cb(c, machine); err != nil { |
| + return err |
| + } |
| + |
| + tasks := []*tq.Task{} |
| + for _, action := range machine.Actions { |
| + switch a := action.(type) { |
| + case cron.TickLaterAction: |
| + logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) |
| + tasks = append(tasks, &tq.Task{ |
| + Payload: &internal.TickLaterTask{JobId: entity.ID, TickNonce: a.TickNonce}, |
| + ETA: a.When, |
| + }) |
| + case cron.StartInvocationAction: |
| + tasks = append(tasks, &tq.Task{ |
| + Payload: &internal.TriggerInvocationTask{JobId: entity.ID, TriggerId: mathrand.Get(c).Int63()}, |
| + }) |
| + default: |
| + panic("unknown action type") |
| } |
| + } |
| + if err := dispatcher.AddTasks(c, tasks); err != nil { |
| + return err |
| + } |
| + |
| + entity.State = machine.State |
| + return nil |
| +} |
| - if err := cb(c, machine); err != nil { |
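| +// evolve transactionally loads the job entity, pokes the cron machine via the callback |
| +// and stores the updated state. |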
| +func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine) error) error { |
| + err := datastore.RunInTransaction(c, func(c context.Context) error { |
| + entity := &CronState{ID: id} |
tandrii(chromium) 2017/07/31 18:58:46: btw, why this change? I think entity object should
| + if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNoSuchEntity { |
| return err |
| } |
| - |
| - for _, action := range machine.Actions { |
| - var task tq.Task |
| - switch a := action.(type) { |
| - case cron.TickLaterAction: |
| - logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) |
| - task = tq.Task{ |
| - Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce}, |
| - ETA: a.When, |
| - } |
| - case cron.StartInvocationAction: |
| - task = tq.Task{ |
| - Payload: &internal.StartInvocationTask{JobId: id}, |
| - Delay: time.Second, // give the transaction time to land |
| - } |
| - default: |
| - panic("unknown action type") |
| - } |
| - if err := tasks.AddTask(c, &task); err != nil { |
| - return err |
| - } |
| + if err := pokeMachine(c, entity, cb); err != nil { |
| + return err |
| } |
| - |
| - entity.State = machine.State |
| - return datastore.Put(c, &entity) |
| + return datastore.Put(c, entity) |
| }, nil) |
| - |
| - if err != nil { |
| - logging.Errorf(c, "FAIL - %s", err) |
| - } |
| - return err |
| + return transient.Tag.Apply(err) |
| } |
| func startJob(c context.Context, id string) error { |
| @@ -114,6 +142,46 @@ func startJob(c context.Context, id string) error { |
| }) |
| } |
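| +// addTrigger records a trigger request in the pending set and kicks the triage task. |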
| +func addTrigger(c context.Context, jobID, triggerID string) error { |
| + logging.Infof(c, "Triggering %q - %q", jobID, triggerID) |
| + |
| + // Add the trigger request to the pending set. |
| + if err := pendingTriggersSet(c, jobID).Add(c, []dsset.Item{{ID: triggerID}}); err != nil { |
| + return err |
| + } |
| + |
| + // Run a task that examines the pending set and makes decisions. |
| + return kickTriageTask(c, jobID) |
| +} |
| + |
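| +// kickTriageTask enqueues a triage task for the job, deduplicated to at most one per 2 sec. |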
| +func kickTriageTask(c context.Context, jobID string) error { |
| + // Throttle to once per 2 sec (and make sure it is always in the future). |
| + eta := clock.Now(c).Unix() |
| + eta = (eta/2 + 1) * 2 |
| + dedupKey := fmt.Sprintf("triage:%s:%d", jobID, eta) |
| + |
| + // Use cheaper but crappier memcache as a first guard. |
| + itm := memcache.NewItem(c, dedupKey).SetExpiration(time.Minute) |
| + if memcache.Get(c, itm) == nil { |
| + logging.Infof(c, "The triage task has already been scheduled") |
| + return nil // already added! |
| + } |
| + |
| + err := dispatcher.AddTask(c, &tq.Task{ |
| + DeduplicationKey: dedupKey, |
| + ETA: time.Unix(eta, 0), |
| + Payload: &internal.TriageTriggersTask{JobId: jobID}, |
| + }) |
| + if err != nil { |
| + return err |
| + } |
| + logging.Infof(c, "Scheduled the triage task") |
| + |
| + // Best effort at setting the memcache flag. No big deal if it fails. |
| + memcache.Set(c, itm) |
| + return nil |
| +} |
| + |
| func handleTick(c context.Context, task proto.Message, execCount int) error { |
| msg := task.(*internal.TickLaterTask) |
| return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error { |
| @@ -121,22 +189,210 @@ func handleTick(c context.Context, task proto.Message, execCount int) error { |
| }) |
| } |
| -func handleInvocation(c context.Context, task proto.Message, execCount int) error { |
| - msg := task.(*internal.StartInvocationTask) |
| - logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId) |
| - return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) error { |
| - m.RewindIfNecessary() |
| - return nil |
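| +// handleTrigger converts TriggerInvocationTask into a trigger request in the pending set. |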
| +func handleTrigger(c context.Context, task proto.Message, execCount int) error { |
| + msg := task.(*internal.TriggerInvocationTask) |
| + return addTrigger(c, msg.JobId, fmt.Sprintf("cron:%d", msg.TriggerId)) |
| +} |
| + |
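| +// handleTriage grabs the per-job triage lock and runs the triage under it. |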
| +func handleTriage(c context.Context, task proto.Message, execCount int) error { |
| + msg := task.(*internal.TriageTriggersTask) |
| + logging.Infof(c, "Triaging requests for %q", msg.JobId) |
| + |
| + err := memlock.TryWithLock(c, "triageLock:"+msg.JobId, info.RequestID(c), func(context.Context) error { |
| + logging.Infof(c, "Got the lock!") |
| + return runTriage(c, msg.JobId) |
| + }) |
| + return transient.Tag.Apply(err) |
| +} |
| + |
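| +// runTriage examines pending triggers and recently finished invocations, updating the job |
| +// state transactionally and launching new invocations when needed. |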
| +func runTriage(c context.Context, jobID string) error { |
| + wg := sync.WaitGroup{} |
| + wg.Add(2) |
| + |
| + var triggersList *dsset.Listing |
| + var triggersErr error |
| + |
| + var finishedList *dsset.Listing |
| + var finishedErr error |
| + |
| + // Grab all pending requests (and stuff to cleanup). |
| + triggersSet := pendingTriggersSet(c, jobID) |
| + go func() { |
| + defer wg.Done() |
| + triggersList, triggersErr = triggersSet.List(c) |
| + if triggersErr == nil { |
| + logging.Infof(c, "Triggers: %d items, %d tombs to cleanup", |
| + len(triggersList.Items), len(triggersList.Tombstones)) |
| + } |
| + }() |
| + |
| + // Same for recently finished invocations. |
| + finishedSet := recentlyFinishedSet(c, jobID) |
| + go func() { |
| + defer wg.Done() |
| + finishedList, finishedErr = finishedSet.List(c) |
| + if finishedErr == nil { |
| + logging.Infof(c, "Finished: %d items, %d tombs to cleanup", |
| + len(finishedList.Items), len(finishedList.Tombstones)) |
| + } |
| + }() |
| + |
| + wg.Wait() |
| + switch { |
| + case triggersErr != nil: |
| + return triggersErr |
| + case finishedErr != nil: |
| + return finishedErr |
| + } |
| + |
| + // Do cleanups first. |
| + if err := dsset.CleanupStorage(c, triggersList.Tombstones, finishedList.Tombstones); err != nil { |
| + return err |
| + } |
| + |
| + var cleanup []*dsset.Tombstone |
| + err := datastore.RunInTransaction(c, func(c context.Context) error { |
| + state := &CronState{ID: jobID} |
| + if err := datastore.Get(c, state); err != nil && err != datastore.ErrNoSuchEntity { |
| + return err |
| + } |
| + |
| + popOps := []*dsset.PopOp{} |
| + |
| + // Tidy RunningSet by removing all recently finished invocations. |
| + if !finishedList.Empty() { |
| + op, err := finishedSet.BeginPop(c, finishedList) |
| + if err != nil { |
| + return err |
| + } |
| + popOps = append(popOps, op) |
| + |
| + reallyFinished := map[int64]struct{}{} |
| + for _, itm := range finishedList.Items { |
| + if op.Pop(itm.ID) { |
| + id, _ := strconv.ParseInt(itm.ID, 10, 64) |
| + reallyFinished[id] = struct{}{} |
| + } |
| + } |
| + |
| + filtered := []int64{} |
| + for _, id := range state.RunningSet { |
| + if _, yep := reallyFinished[id]; yep { |
| + logging.Infof(c, "Invocation finished-%d is acknowledged as finished", id) |
| + } else { |
| + filtered = append(filtered, id) |
| + } |
| + } |
| + state.RunningSet = filtered |
| + } |
| + |
| + // Launch new invocations for each pending trigger. |
| + if !triggersList.Empty() { |
| + op, err := triggersSet.BeginPop(c, triggersList) |
| + if err != nil { |
| + return err |
| + } |
| + popOps = append(popOps, op) |
| + |
| + batch := internal.LaunchInvocationsBatchTask{JobId: state.ID} |
| + for _, trigger := range triggersList.Items { |
| + if op.Pop(trigger.ID) { |
| + logging.Infof(c, "Launching new launch-%d for trigger %s", state.NextInvID, trigger.ID) |
| + state.RunningSet = append(state.RunningSet, state.NextInvID) |
| + batch.InvId = append(batch.InvId, state.NextInvID) |
| + state.NextInvID++ |
| + } |
| + } |
| + // Transactionally trigger a batch with new invocations. |
| + if len(batch.InvId) != 0 { |
| + if err := dispatcher.AddTask(c, &tq.Task{Payload: &batch}); err != nil { |
| + return err |
| + } |
| + } |
| + } |
| + |
| + // Submit set changes. |
| + var err error |
| + if cleanup, err = dsset.FinishPop(c, popOps...); err != nil { |
| + return err |
| + } |
| + |
| + logging.Infof(c, "Running invocations - %v", state.RunningSet) |
| + |
| + // If nothing is running, poke the cron machine. Maybe it wants to start |
| + // something. |
| + if len(state.RunningSet) == 0 { |
| + err = pokeMachine(c, state, func(c context.Context, m *cron.Machine) error { |
| + m.RewindIfNecessary() |
| + return nil |
| + }) |
| + if err != nil { |
| + return err |
| + } |
| + } |
| + |
| + // Done! |
| + return datastore.Put(c, state) |
| + }, nil) |
| + |
| + if err == nil && len(cleanup) != 0 { |
| + // Best effort cleanup of storage of consumed items. |
| + logging.Infof(c, "Cleaning up storage of %d items", len(cleanup)) |
| + if err := dsset.CleanupStorage(c, cleanup); err != nil { |
| + logging.Warningf(c, "Best effort cleanup failed - %s", err) |
| + } |
| + } |
| + |
| + return transient.Tag.Apply(err) |
| +} |
| + |
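| +// handleBatchLaunch fans out LaunchInvocationsBatchTask into individual LaunchInvocationTask tasks. |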
| +func handleBatchLaunch(c context.Context, task proto.Message, execCount int) error { |
| + msg := task.(*internal.LaunchInvocationsBatchTask) |
| + logging.Infof(c, "Batch launch for %q", msg.JobId) |
| + |
| + tasks := []*tq.Task{} |
| + for _, invId := range msg.InvId { |
| + logging.Infof(c, "Launching inv-%d", invId) |
| + tasks = append(tasks, &tq.Task{ |
| + DeduplicationKey: fmt.Sprintf("inv:%s:%d", msg.JobId, invId), |
| + Payload: &internal.LaunchInvocationTask{ |
| + JobId: msg.JobId, |
| + InvId: invId, |
| + }, |
| + }) |
| + } |
| + |
| + return dispatcher.AddTasks(c, tasks) |
| +} |
| + |
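| +// handleLaunchTask "runs" a single invocation: the demo finishes it immediately, adds it to |
| +// the recently finished set and kicks the triage task. |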
| +func handleLaunchTask(c context.Context, task proto.Message, execCount int) error { |
| + msg := task.(*internal.LaunchInvocationTask) |
| + logging.Infof(c, "Executing invocation %q: exec-%d", msg.JobId, msg.InvId) |
| + |
| + // There can be more stuff here. But we just finish the invocation right away. |
| + |
| + finishedSet := recentlyFinishedSet(c, msg.JobId) |
| + err := finishedSet.Add(c, []dsset.Item{ |
| + {ID: fmt.Sprintf("%d", msg.InvId)}, |
| }) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + // Kick the triage now that the set of running invocations has been modified. |
| + return kickTriageTask(c, msg.JobId) |
| } |
| func init() { |
| r := router.New() |
| gaemiddleware.InstallHandlers(r) |
| - tasks.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil) |
| - tasks.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "default", nil) |
| - tasks.InstallRoutes(r, gaemiddleware.BaseProd()) |
| + dispatcher.RegisterTask(&internal.TickLaterTask{}, handleTick, "timers", nil) |
| + dispatcher.RegisterTask(&internal.TriggerInvocationTask{}, handleTrigger, "triggers", nil) |
| + dispatcher.RegisterTask(&internal.TriageTriggersTask{}, handleTriage, "triages", nil) |
| + dispatcher.RegisterTask(&internal.LaunchInvocationsBatchTask{}, handleBatchLaunch, "launches", nil) |
| + dispatcher.RegisterTask(&internal.LaunchInvocationTask{}, handleLaunchTask, "invocations", nil) |
| + dispatcher.InstallRoutes(r, gaemiddleware.BaseProd()) |
| // Kick-start a bunch of jobs by visiting: |
| // |
| @@ -153,5 +409,13 @@ func init() { |
| } |
| }) |
| + r.GET("/trigger/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { |
| + jobID := c.Params.ByName("JobID") |
| + triggerID := fmt.Sprintf("manual:%d", clock.Now(c.Context).UnixNano()) |
| + if err := addTrigger(c.Context, jobID, triggerID); err != nil { |
| + panic(err) |
| + } |
| + }) |
| + |
| http.DefaultServeMux.Handle("/", r) |
| } |