| Index: appengine/cmd/dm/mutate/ack_fwd_dep.go
|
| diff --git a/appengine/cmd/dm/mutate/ack_fwd_dep.go b/appengine/cmd/dm/mutate/ack_fwd_dep.go
|
| index 1ca9507ae5aafd1f02bb4f017eba6d91744e7e41..0e52a68c3b4af57889c4f899f4069eabf873308c 100644
|
| --- a/appengine/cmd/dm/mutate/ack_fwd_dep.go
|
| +++ b/appengine/cmd/dm/mutate/ack_fwd_dep.go
|
| @@ -12,21 +12,14 @@ import (
|
| "golang.org/x/net/context"
|
| )
|
|
|
| -// AckFwdDep records the fact that a BackDep was successfully created for this
|
| -// FwdDep. It may also propagate Finished information (e.g. that the depended-on
|
| -// Attempt was actually already completed at the time that the dependency was
|
| -// taken on it).
|
| -//
|
| -// AckFwdDep is also used to propagate the fact that an Attempt (A) completed
|
| -// back to another Attempt (B) that's blocked on A.
|
| +// AckFwdDep records the fact that a dependency was completed.
|
| type AckFwdDep struct {
|
| - Dep *model.FwdEdge
|
| - DepIsFinished bool
|
| + Dep *model.FwdEdge
|
| }
|
|
|
| // Root implements tumble.Mutation.
|
| func (f *AckFwdDep) Root(c context.Context) *datastore.Key {
|
| - return datastore.Get(c).MakeKey("Attempt", f.Dep.From.DMEncoded())
|
| + return model.AttemptKeyFromID(c, f.Dep.From)
|
| }
|
|
|
| // RollForward implements tumble.Mutation.
|
| @@ -39,39 +32,23 @@ func (f *AckFwdDep) RollForward(c context.Context) (muts []tumble.Mutation, err
|
| return
|
| }
|
|
|
| - // if the attempt and fdep aren't on the same execution, then bail
|
| - if atmpt.CurExecution != fdep.ForExecution {
|
| + if atmpt.State != dm.Attempt_WAITING || atmpt.CurExecution != fdep.ForExecution {
|
| return
|
| }
|
|
|
| - needPut := false
|
| -
|
| idx := uint32(fdep.BitIndex)
|
|
|
| - if !atmpt.AddingDepsBitmap.IsSet(idx) {
|
| - atmpt.AddingDepsBitmap.Set(idx)
|
| -
|
| - if atmpt.AddingDepsBitmap.All(true) {
|
| - atmpt.MustModifyState(c, dm.Attempt_BLOCKED)
|
| - }
|
| -
|
| - needPut = true
|
| - }
|
| -
|
| - if f.DepIsFinished {
|
| - if !atmpt.WaitingDepBitmap.IsSet(idx) {
|
| - atmpt.WaitingDepBitmap.Set(idx)
|
| + if !atmpt.DepMap.IsSet(idx) {
|
| + atmpt.DepMap.Set(idx)
|
|
|
| - if atmpt.WaitingDepBitmap.All(true) {
|
| - atmpt.MustModifyState(c, dm.Attempt_NEEDS_EXECUTION)
|
| - muts = append(muts, &ScheduleExecution{For: f.Dep.From})
|
| + if atmpt.DepMap.All(true) {
|
| + if err = atmpt.ModifyState(c, dm.Attempt_SCHEDULING); err != nil {
|
| + return
|
| }
|
| -
|
| - needPut = true
|
| + atmpt.DepMap.Reset()
|
| + muts = append(muts, &ScheduleExecution{For: f.Dep.From})
|
| }
|
| - }
|
|
|
| - if needPut {
|
| err = ds.Put(atmpt)
|
| }
|
|
|
|
|