Chromium Code Reviews| Index: appengine/cmd/dm/model/execution.go |
| diff --git a/appengine/cmd/dm/model/execution.go b/appengine/cmd/dm/model/execution.go |
| index c0b233512f494ee54cec0701c2f6a981ba739703..99413e1987fdf93871dcd21da9d0628d50c2a846 100644 |
| --- a/appengine/cmd/dm/model/execution.go |
| +++ b/appengine/cmd/dm/model/execution.go |
| @@ -16,7 +16,9 @@ import ( |
| "golang.org/x/net/context" |
| "github.com/luci/gae/service/datastore" |
| - "github.com/luci/luci-go/common/api/dm/service/v1" |
| + dm "github.com/luci/luci-go/common/api/dm/service/v1" |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/cryptorand" |
| "github.com/luci/luci-go/common/grpcutil" |
| "github.com/luci/luci-go/common/logging" |
| google_pb "github.com/luci/luci-go/common/proto/google" |
| @@ -30,12 +32,31 @@ type Execution struct { |
| ID uint32 `gae:"$id"` |
| Attempt *datastore.Key `gae:"$parent"` |
| - State dm.Execution_State |
| - StateReason string `gae:",noindex"` |
| + Created time.Time |
| + Modified time.Time |
| - Created time.Time |
| - DistributorToken string |
| - DistributorURL string `gae:",noindex"` |
| + DistributorConfigName string |
| + DistributorConfigVersion string |
| + DistributorToken string |
| + |
| + State dm.Execution_State |
| + |
| + // Only valid in the ABNORMAL_FINISHED state. |
| + AbnormalFinish dm.AbnormalFinish |
| + |
| + // Only valid in the FINISHED state. |
| + ResultPersistentState string |
| + |
| + // These are DM's internal mechanism for performing timeout actions on |
| + // Executions. |
| + // |
| + // The TimeTo* variables are filled in by the distributor when this Execution |
| + // is created. |
| + // |
| + // The Timeout is only active when the Execution is in a non-terminal state. |
| + TimeToStart time.Duration `gae:",noindex"` // SCHEDULING -> RUNNING |
| + TimeToRun time.Duration `gae:",noindex"` // RUNNING -> STOPPING |
| + TimeToStop time.Duration `gae:",noindex"` // STOPPING -> FINISHED |
|
iannucci
2016/06/08 02:54:24
timeouts!
|
| // Token is a randomized nonce that's used to verify that RPCs verify from the |
| // expected client (the client that's currently running the Execution). The |
| @@ -43,21 +64,77 @@ type Execution struct { |
| // |
| // When the Execution is handed to the distributor, the Token is randomly |
| // generated by DM and passed to the distributor. The State of the Execution |
| - // starts as Scheduled. This token may be used by the client to "activate" the |
| + // starts as SCHEDULED. This token may be used by the client to "activate" the |
| // Execution with the ActivateExecution rpc. At that point, the client |
| - // provides a new random token, the Execution State moves from Scheduled to |
| - // Running, and Token assumes the new value. As long as the Execution State is |
| - // running, the client may continue to use that new Token value to |
| + // provides a new random token, the Execution State moves from SCHEDULED to |
| + // RUNNING, and Token assumes the new value. As long as the Execution State is |
| + // RUNNING, the client may continue to use that new Token value to |
| // authenticate other rpc's like AddDeps and FinishAttempt. |
| // |
| - // As soon as the Execution is no longer supposed to have access, Token will |
| - // be nil'd out. |
| + // As soon as the Execution is in the STOPPING, ABNORMAL_FINISHED or FINISHED |
| + // state, this will be nil'd out. |
| Token []byte `gae:",noindex"` |
| } |
| -// Revoke will null-out the Token and Put this Execution to the datastore. |
| +// MakeExecution makes a new Execution in the SCHEDULING state, with a new |
| +// random Token. |
| +func MakeExecution(c context.Context, e *dm.Execution_ID, cfgName, cfgVers string) *Execution { |
| + now := clock.Now(c).UTC() |
| + ret := &Execution{ |
| + ID: e.Id, |
| + Attempt: datastore.Get(c).MakeKey("Attempt", e.AttemptID().DMEncoded()), |
|
dnj (Google)
2016/06/09 18:00:56
AttemptKeyFromID?
iannucci
2016/06/15 00:46:01
good catch, I tried to find these all but apparent
|
| + |
| + Created: now, |
| + Modified: now, |
| + |
| + DistributorConfigName: cfgName, |
| + DistributorConfigVersion: cfgVers, |
| + |
| + Token: MakeRandomToken(c, dm.MinimumActivationTokenLength), |
| + } |
| + return ret |
| +} |
| + |
| +// ModifyState changes the current state of this Execution and updates its |
| +// Modified timestamp. |
| +func (e *Execution) ModifyState(c context.Context, newState dm.Execution_State) error { |
| + if e.State == newState { |
| + return nil |
| + } |
| + if err := e.State.Evolve(newState); err != nil { |
| + return err |
| + } |
| + now := clock.Now(c).UTC() |
| + if now.After(e.Modified) { |
| + e.Modified = now |
| + } else { |
| + // Microsecond is the smallest granularity that datastore can store |
| + // timestamps, so use that to disambiguate: the goal here is that any |
| + // modification always increments the modified time, and never decrements |
| + // it. |
| + e.Modified = e.Modified.Add(time.Microsecond) |
| + } |
| + return nil |
| +} |
| + |
| +// MakeRandomToken creates a cryptographically random byte slice of the |
| +// specified length. It panics if the specified length cannot be read in full. |
| +func MakeRandomToken(c context.Context, l uint32) []byte { |
| + rtok := make([]byte, l) |
| + if _, err := cryptorand.Read(c, rtok); err != nil { |
| + panic(err) |
| + } |
| + return rtok |
| +} |
| + |
| +// Revoke will null-out the Token and Put this Execution to the datastore. This |
|
dnj (Google)
2016/06/09 18:00:56
nit: "nil-out" in above documentation, "null-out"
iannucci
2016/06/15 00:46:01
Done.
|
| +// action requires the Execution to be in the RUNNING state, and causes it to |
| +// enter the STOPPING state. |
| func (e *Execution) Revoke(c context.Context) error { |
| e.Token = nil |
| + if err := e.ModifyState(c, dm.Execution_STOPPING); err != nil { |
| + return err |
| + } |
| return datastore.Get(c).Put(e) |
| } |
| @@ -69,7 +146,8 @@ func loadExecution(c context.Context, eid *dm.Execution_ID) (a *Attempt, e *Exec |
| err = datastore.Get(c).GetMulti([]interface{}{a, e}) |
| if err != nil { |
| - err = fmt.Errorf("couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err) |
| + err = grpcutil.Errf(codes.Internal, |
| + "couldn't get attempt %v or its execution %d: %s", a.ID, e.ID, err) |
| return |
| } |
| @@ -102,6 +180,14 @@ func verifyExecutionAndCheckExTok(c context.Context, auth *dm.Execution_Auth) (a |
| return |
| } |
| +func makeError(err error, msg string) error { |
| + code := grpcutil.Code(err) |
| + if code == codes.Unknown { |
| + code = codes.Unauthenticated |
|
dnj (Google)
2016/06/09 18:00:56
Does this make sense? Unauthenticated specifically
iannucci
2016/06/15 00:46:01
Yeah, basically: unless we specifically tagged thi
|
| + } |
| + return grpcutil.Errf(code, msg) |
| +} |
| + |
| // AuthenticateExecution verifies that the Attempt is executing, and that evkey |
| // matches the execution key of the current Execution for this Attempt. |
| // |
| @@ -110,7 +196,7 @@ func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem |
| a, e, err = verifyExecutionAndCheckExTok(c, auth) |
| if err != nil { |
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution") |
| - err = grpcutil.Errf(codes.Unauthenticated, "requires execution Auth") |
| + err = makeError(err, "requires execution Auth") |
| } |
| return a, e, err |
| } |
| @@ -122,14 +208,14 @@ func AuthenticateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attem |
| func InvalidateExecution(c context.Context, auth *dm.Execution_Auth) (a *Attempt, e *Execution, err error) { |
| if a, e, err = verifyExecutionAndCheckExTok(c, auth); err != nil { |
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to verify execution") |
| - err = grpcutil.Errf(codes.Unauthenticated, "requires execution Auth") |
| + err = makeError(err, "requires execution Auth") |
| return |
| } |
| err = e.Revoke(c) |
| if err != nil { |
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to revoke execution") |
| - err = grpcutil.Errf(codes.Internal, "unable to invalidate Auth") |
| + err = makeError(err, "unable to invalidate Auth") |
| } |
| return |
| } |
| @@ -146,7 +232,7 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT |
| } |
| switch e.State { |
| - case dm.Execution_SCHEDULED: |
| + case dm.Execution_SCHEDULING: |
| if subtle.ConstantTimeCompare(e.Token, auth.Token) != 1 { |
| err = errors.New("incorrect ActivationToken") |
| return |
| @@ -155,6 +241,7 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT |
| e.State.MustEvolve(dm.Execution_RUNNING) |
| e.Token = actTok |
| err = datastore.Get(c).Put(e) |
| + logging.Infof(c, "activated execution %s: was SCHEDULING now RUNNING", auth.Id) |
| case dm.Execution_RUNNING: |
| if subtle.ConstantTimeCompare(e.Token, actTok) != 1 { |
| @@ -164,6 +251,8 @@ func verifyExecutionAndActivate(c context.Context, auth *dm.Execution_Auth, actT |
| // by the same client, so there's no error, or it's wrong which means it's |
| // a retry by a different client. |
| + logging.Infof(c, "already activated execution %s", auth.Id) |
| + |
| default: |
| err = fmt.Errorf("Execution is in wrong state") |
| } |
| @@ -180,28 +269,58 @@ func ActivateExecution(c context.Context, auth *dm.Execution_Auth, actToken []by |
| a, e, err = verifyExecutionAndActivate(c, auth, actToken) |
| if err != nil { |
| logging.Fields{ek: err, "eid": auth.Id}.Errorf(c, "failed to activate execution") |
| - err = grpcutil.Errf(codes.Unauthenticated, "failed to activate execution Auth") |
| + err = makeError(err, "failed to activate execution Auth") |
| } |
| return a, e, err |
| } |
| +// GetEID gets an Execution_ID for this Execution. It panics if the Execution |
| +// is in an invalid state. |
| +func (e *Execution) GetEID() *dm.Execution_ID { |
| + aid := &dm.Attempt_ID{} |
| + if e.ID == 0 { |
| + panic("cannot create valid Execution_ID with 0-value ID field") |
| + } |
| + if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil { |
| + panic(err) |
| + } |
| + return dm.NewExecutionID(aid.Quest, aid.Id, e.ID) |
| +} |
| + |
| // ToProto returns a dm proto version of this Execution. |
| func (e *Execution) ToProto(includeID bool) *dm.Execution { |
| - ret := &dm.Execution{ |
| - Data: &dm.Execution_Data{ |
| - State: e.State, |
| - StateReason: e.StateReason, |
| - Created: google_pb.NewTimestamp(e.Created), |
| - DistributorToken: e.DistributorToken, |
| - DistributorInfoUrl: e.DistributorURL, |
| - }, |
| - } |
| + ret := &dm.Execution{Data: e.DataProto()} |
| if includeID { |
| - aid := &dm.Attempt_ID{} |
| - if err := aid.SetDMEncoded(e.Attempt.StringID()); err != nil { |
| - panic(err) |
| - } |
| - ret.Id = dm.NewExecutionID(aid.Quest, aid.Id, e.ID) |
| + ret.Id = e.GetEID() |
| + } |
| + return ret |
| +} |
| + |
| +// DataProto returns an Execution.Data message for this Execution. |
| +// |
| +// This omits the DistributorInfo.Url portion, which must be filled in elsewhere for |
| +// package cyclical import reasons. |
| +func (e *Execution) DataProto() (ret *dm.Execution_Data) { |
| + switch e.State { |
| + case dm.Execution_SCHEDULING: |
| + ret = dm.NewExecutionScheduling().Data |
| + case dm.Execution_RUNNING: |
| + ret = dm.NewExecutionRunning().Data |
| + case dm.Execution_STOPPING: |
| + ret = dm.NewExecutionStopping().Data |
| + case dm.Execution_FINISHED: |
| + ret = dm.NewExecutionFinished(string(e.ResultPersistentState)).Data |
| + case dm.Execution_ABNORMAL_FINISHED: |
| + ret = dm.NewExecutionAbnormalFinish(&e.AbnormalFinish).Data |
| + default: |
| + panic(fmt.Errorf("unknown Execution_State: %s", e.State)) |
| + } |
| + ret.Created = google_pb.NewTimestamp(e.Created) |
| + ret.Modified = google_pb.NewTimestamp(e.Modified) |
| + ret.DistributorInfo = &dm.Execution_Data_DistributorInfo{ |
| + ConfigName: e.DistributorConfigName, |
| + ConfigVersion: e.DistributorConfigVersion, |
| + Token: e.DistributorToken, |
| } |
| return ret |
| } |