Chromium Code Reviews| Index: appengine/cmd/dm/mutate/schedule_execution.go |
| diff --git a/appengine/cmd/dm/mutate/schedule_execution.go b/appengine/cmd/dm/mutate/schedule_execution.go |
| index 8b4e4d3f435bca5fa0c52933af1d2b2c81083f6e..52b3bd549f6e6e446bb2bf2f386c8cb3c681318a 100644 |
| --- a/appengine/cmd/dm/mutate/schedule_execution.go |
| +++ b/appengine/cmd/dm/mutate/schedule_execution.go |
| @@ -5,10 +5,16 @@ |
| package mutate |
| import ( |
| + "fmt" |
| + |
| + "github.com/luci/gae/filter/txnBuf" |
| "github.com/luci/gae/service/datastore" |
| + "github.com/luci/luci-go/appengine/cmd/dm/distributor" |
| "github.com/luci/luci-go/appengine/cmd/dm/model" |
| "github.com/luci/luci-go/appengine/tumble" |
| "github.com/luci/luci-go/common/api/dm/service/v1" |
| + "github.com/luci/luci-go/common/errors" |
| + "github.com/luci/luci-go/common/logging" |
| "golang.org/x/net/context" |
| ) |
| @@ -20,11 +26,71 @@ type ScheduleExecution struct { |
| // Root implements tumble.Mutation |
| func (s *ScheduleExecution) Root(c context.Context) *datastore.Key { |
| - return datastore.Get(c).KeyForObj(&model.Attempt{ID: *s.For}) |
| + return model.AttemptKeyFromID(c, s.For) |
| } |
| // RollForward implements tumble.Mutation |
| func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error) { |
|
iannucci
2016/06/08 02:54:24
this is the other beefish change: now when a Sched
|
| + ds := datastore.Get(c) |
| + a := model.AttemptFromID(s.For) |
| + if err = ds.Get(a); err != nil { |
| + return |
| + } |
| + |
| + if a.State != dm.Attempt_SCHEDULING { |
| + return |
| + } |
| + |
| + q := model.QuestFromID(s.For.Quest) |
| + if err = txnBuf.GetNoTxn(c).Get(q); err != nil { |
| + return |
| + } |
| + |
| + reg := distributor.GetRegistry(c) |
| + dist, ver, err := reg.MakeDistributor(c, q.Desc.DistributorConfigName) |
| + if err != nil { |
| + return |
| + } |
| + |
| + a.CurExecution++ |
| + if err = a.ModifyState(c, dm.Attempt_EXECUTING); err != nil { |
| + return |
| + } |
| + |
| + eid := dm.NewExecutionID(s.For.Quest, s.For.Id, a.CurExecution) |
| + e := model.MakeExecution(c, eid, q.Desc.DistributorConfigName, ver) |
| + |
| + exAuth := &dm.Execution_Auth{Id: eid, Token: e.Token} |
| + |
| + var distTok distributor.Token |
| + distTok, e.TimeToStart, e.TimeToRun, e.TimeToStop, err = dist.Run( |
|
dnj (Google)
2016/06/09 18:00:56
Since it's possible for the transaction to fail, i
iannucci
2016/06/15 00:46:01
This is documented in the distributor package, but
|
| + distributor.NewTaskDescription(c, &q.Desc, exAuth, |
| + distributor.PersistentState(a.PersistentState))) |
| + e.DistributorToken = string(distTok) |
| + if err != nil { |
| + if errors.IsTransient(err) { |
| + // tumble will retry us later |
| + logging.WithError(err).Errorf(c, "got transient error in ScheduleExecution") |
| + return |
| + } |
| + origErr := err |
| + |
| + // put a and e to the transaction buffer, so that |
| + // FinishExecution.RollForward can see them. |
| + if err = ds.PutMulti([]interface{}{a, e}); err != nil { |
| + return |
| + } |
| + return NewFinishExecutionAbnormal( |
| + eid, dm.AbnormalFinish_REJECTED, |
| + fmt.Sprintf("rejected during scheduling with non-transient error: %s", origErr), |
| + ).RollForward(c) |
| + } |
| + |
| + if err = ResetExecutionTimeout(c, e); err != nil { |
| + return |
| + } |
| + |
| + err = ds.PutMulti([]interface{}{a, e}) |
| return |
| } |