Chromium Code Reviews| Index: appengine/cmd/dm/mutate/timeout_execution.go |
| diff --git a/appengine/cmd/dm/mutate/timeout_execution.go b/appengine/cmd/dm/mutate/timeout_execution.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..76ee16f8b0406dcfdf0891ecf6ff83b4f162f06e |
| --- /dev/null |
| +++ b/appengine/cmd/dm/mutate/timeout_execution.go |
| @@ -0,0 +1,148 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package mutate |
| + |
| +import ( |
| + "fmt" |
| + "time" |
| + |
| + "golang.org/x/net/context" |
| + |
| + "github.com/luci/gae/service/datastore" |
| + "github.com/luci/luci-go/appengine/cmd/dm/distributor" |
| + "github.com/luci/luci-go/appengine/cmd/dm/model" |
| + "github.com/luci/luci-go/appengine/tumble" |
| + dm "github.com/luci/luci-go/common/api/dm/service/v1" |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/logging" |
| +) |
| + |
| +// TimeoutExecution is a named mutation which triggers on a delay. If the |
| +// execution is in the noted state when the trigger hits, this sets the |
| +// Execution to have an AbnormalFinish status of TIMED_OUT. |
| +type TimeoutExecution struct { |
| + For *dm.Execution_ID |
| + State dm.Execution_State |
| + // TimeoutAttempt is the number of attempts to stop a STOPPING execution, |
| + // since this potentially requires an RPC to the distributor to enact. |
| + TimeoutAttempt uint |
| + Deadline time.Time |
| +} |
| + |
| +const maxTimeoutAttempts = 3 |
| + |
| +var _ tumble.DelayedMutation = (*TimeoutExecution)(nil) |
| + |
| +// Root implements tumble.Mutation |
| +func (t *TimeoutExecution) Root(c context.Context) *datastore.Key { |
| + return model.AttemptKeyFromID(c, t.For.AttemptID()) |
| +} |
| + |
| +// RollForward implements tumble.Mutation |
| +func (t *TimeoutExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error) { |
| + e := model.ExecutionFromID(c, t.For) |
| + |
| + ds := datastore.Get(c) |
| + if err = ds.Get(e); err != nil { |
| + return |
| + } |
| + if e.State != t.State { |
| + return |
| + } |
| + |
| + // will be overwritten if this execution is STOPPING and the timeout is not |
| + // abnormal |
| + rslt := &distributor.TaskResult{AbnormalFinish: &dm.AbnormalFinish{ |
| + Reason: fmt.Sprintf("DM timeout (%s)", e.State), |
| + Status: dm.AbnormalFinish_TIMED_OUT}} |
| + |
| + if e.State == dm.Execution_STOPPING { |
| + // if it's supposed to be STOPPING, maybe we just missed a notification from |
| + // the distributor (or the distributor is not using pubsub). |
| + reg := distributor.GetRegistry(c) |
| + var dist distributor.D |
| + var vers string |
| + dist, vers, err = reg.MakeDistributor(c, e.DistributorConfigName) |
| + |
| + if vers != "" && vers != e.DistributorConfigVersion { |
| + logging.Fields{ |
| + "cfg_name": e.DistributorConfigName, |
| + "orig_cfg_vers": e.DistributorConfigVersion, |
| + "cur_cfg_vers": vers, |
| + }.Warningf(c, "mismatched distributor config versions") |
| + } |
| + |
| + if err != nil { |
| + logging.Fields{ |
| + logging.ErrorKey: err, |
| + "cfgName": e.DistributorConfigName, |
| + }.Errorf(c, "Could not MakeDistributor") |
|
dnj (Google)
2016/06/09 18:00:57
Does this mean that if we ever unregister a Distri
iannucci
2016/06/15 00:46:01
Added todo: need to distinguish luci-config flake
|
| + return |
| + } |
| + var realRslt *distributor.TaskResult |
| + realRslt, err = dist.GetStatus(distributor.Token(e.DistributorToken)) |
| + if err != nil || rslt == nil && t.TimeoutAttempt < maxTimeoutAttempts { |
|
dnj (Google)
2016/06/09 18:00:57
rslt will never be nil here. Do you mean realRslt?
iannucci
2016/06/15 00:46:01
Done.
|
| + logging.Fields{ |
| + logging.ErrorKey: err, |
| + "task_result": rslt, |
|
dnj (Google)
2016/06/09 18:00:57
(same here)
iannucci
2016/06/15 00:46:01
Done.
|
| + "timeout_attempt": t.TimeoutAttempt, |
| + }.Infof(c, "GetStatus failed/nop'd while timing out STOPPING execution") |
| + // TODO(riannucci): do randomized exponential backoff instead of constant |
| + // backoff? Kinda don't really want to spend more than 1.5m waiting |
| + // anyway, and the actual GetStatus call does local retries already, so |
| + // hopefully this is fine. If this is wrong, the distributor should adjust |
| + // its timeToStop value to be better. |
| + t.Deadline = t.Deadline.Add(time.Second * 30) |
| + t.TimeoutAttempt++ |
| + err = nil |
| + muts = append(muts, t) |
|
dnj (Google)
2016/06/09 18:00:56
I think you'll like new Tumble API for this one :)
iannucci
2016/06/15 00:46:01
Acknowledged.
|
| + return |
| + } |
| + |
| + if err != nil { |
|
dnj (Google)
2016/06/09 18:00:57
Can this ever be hit? You handle "err != nil" abov
iannucci
2016/06/15 00:46:01
I was missing some parens above.
|
| + rslt.AbnormalFinish.Reason = fmt.Sprintf("DM timeout (%s) w/ error: %s", e.State, err) |
| + err = nil |
| + } else if realRslt != nil { |
| + rslt = realRslt |
| + } |
| + } |
| + |
| + muts = append(muts, &FinishExecution{t.For, rslt}) |
| + return |
| +} |
| + |
| +// ProcessAfter implements tumble.DelayedMutation |
| +func (t *TimeoutExecution) ProcessAfter() time.Time { return t.Deadline } |
| + |
| +// HighPriority implements tumble.DelayedMutation |
| +func (t *TimeoutExecution) HighPriority() bool { return false } |
| + |
| +// ResetExecutionTimeout schedules a Timeout for this Execution. It inspects the |
| +// Execution's State to determine which timeout should be set, if any. If no |
| +// timeout should be active, this will cancel any existing timeouts for this |
| +// Execution. |
| +func ResetExecutionTimeout(c context.Context, e *model.Execution) error { |
| + howLong := time.Duration(0) |
| + switch e.State { |
| + case dm.Execution_SCHEDULING: |
| + howLong = e.TimeToStart |
| + case dm.Execution_RUNNING: |
| + howLong = e.TimeToRun |
| + case dm.Execution_STOPPING: |
| + howLong = e.TimeToStop |
| + } |
| + eid := e.GetEID() |
| + key := model.ExecutionKeyFromID(c, eid) |
| + if howLong == 0 { |
| + return tumble.CancelNamedMutations(c, key, "timeout") |
| + } |
| + return tumble.PutNamedMutations(c, key, map[string]tumble.Mutation{ |
| + "timeout": &TimeoutExecution{eid, e.State, 0, clock.Now(c).UTC().Add(howLong)}, |
| + }) |
| +} |
| + |
| +func init() { |
| + tumble.Register((*TimeoutExecution)(nil)) |
| +} |