| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package mutate |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "time" |
| 10 |
| 11 "golang.org/x/net/context" |
| 12 |
| 13 "github.com/luci/gae/service/datastore" |
| 14 "github.com/luci/luci-go/appengine/cmd/dm/distributor" |
| 15 "github.com/luci/luci-go/appengine/cmd/dm/model" |
| 16 "github.com/luci/luci-go/appengine/tumble" |
| 17 dm "github.com/luci/luci-go/common/api/dm/service/v1" |
| 18 "github.com/luci/luci-go/common/clock" |
| 19 "github.com/luci/luci-go/common/logging" |
| 20 ) |
| 21 |
| 22 // TimeoutExecution is a named mutation which triggers on a delay. If the |
| 23 // execution is in the noted state when the trigger hits, this sets the |
| 24 // Execution to have an AbnormalFinish status of TIMED_OUT. |
| 25 type TimeoutExecution struct { |
| 26 For *dm.Execution_ID |
| 27 State dm.Execution_State |
| 28 // TimeoutAttempt is the number of attempts to stop a STOPPING execution
, |
| 29 // since this potentially requires an RPC to the distributor to enact. |
| 30 TimeoutAttempt uint |
| 31 Deadline time.Time |
| 32 } |
| 33 |
| 34 const maxTimeoutAttempts = 3 |
| 35 |
| 36 var _ tumble.DelayedMutation = (*TimeoutExecution)(nil) |
| 37 |
| 38 // Root implements tumble.Mutation |
| 39 func (t *TimeoutExecution) Root(c context.Context) *datastore.Key { |
| 40 return model.AttemptKeyFromID(c, t.For.AttemptID()) |
| 41 } |
| 42 |
| 43 // RollForward implements tumble.Mutation |
| 44 func (t *TimeoutExecution) RollForward(c context.Context) (muts []tumble.Mutatio
n, err error) { |
| 45 e := model.ExecutionFromID(c, t.For) |
| 46 |
| 47 ds := datastore.Get(c) |
| 48 if err = ds.Get(e); err != nil { |
| 49 return |
| 50 } |
| 51 if e.State != t.State { |
| 52 return |
| 53 } |
| 54 |
| 55 // will be overwritten if this execution is STOPPING and the timeout is
not |
| 56 // abnormal |
| 57 rslt := &distributor.TaskResult{AbnormalFinish: &dm.AbnormalFinish{ |
| 58 Reason: fmt.Sprintf("DM timeout (%s)", e.State), |
| 59 Status: dm.AbnormalFinish_TIMED_OUT}} |
| 60 |
| 61 if e.State == dm.Execution_STOPPING { |
| 62 // if it's supposed to be STOPPING, maybe we just missed a notif
ication from |
| 63 // the distributor (or the distributor is not using pubsub). |
| 64 reg := distributor.GetRegistry(c) |
| 65 var dist distributor.D |
| 66 var vers string |
| 67 dist, vers, err = reg.MakeDistributor(c, e.DistributorConfigName
) |
| 68 |
| 69 if vers != "" && vers != e.DistributorConfigVersion { |
| 70 logging.Fields{ |
| 71 "cfg_name": e.DistributorConfigName, |
| 72 "orig_cfg_vers": e.DistributorConfigVersion, |
| 73 "cur_cfg_vers": vers, |
| 74 }.Warningf(c, "mismatched distributor config versions") |
| 75 } |
| 76 |
| 77 if err != nil { |
| 78 logging.Fields{ |
| 79 logging.ErrorKey: err, |
| 80 "cfgName": e.DistributorConfigName, |
| 81 }.Errorf(c, "Could not MakeDistributor") |
| 82 return |
| 83 } |
| 84 var realRslt *distributor.TaskResult |
| 85 realRslt, err = dist.GetStatus(distributor.Token(e.DistributorTo
ken)) |
| 86 if err != nil || rslt == nil && t.TimeoutAttempt < maxTimeoutAtt
empts { |
| 87 logging.Fields{ |
| 88 logging.ErrorKey: err, |
| 89 "task_result": rslt, |
| 90 "timeout_attempt": t.TimeoutAttempt, |
| 91 }.Infof(c, "GetStatus failed/nop'd while timing out STOP
PING execution") |
| 92 // TODO(riannucci): do randomized exponential backoff in
stead of constant |
| 93 // backoff? Kinda don't really want to spend more than 1
.5m waiting |
| 94 // anyway, and the actual GetStatus call does local retr
ies already, so |
| 95 // hopefully this is fine. If this is wrong, the distrib
utor should adjust |
| 96 // its timeToStop value to be better. |
| 97 t.Deadline = t.Deadline.Add(time.Second * 30) |
| 98 t.TimeoutAttempt++ |
| 99 err = nil |
| 100 muts = append(muts, t) |
| 101 return |
| 102 } |
| 103 |
| 104 if err != nil { |
| 105 rslt.AbnormalFinish.Reason = fmt.Sprintf("DM timeout (%s
) w/ error: %s", e.State, err) |
| 106 err = nil |
| 107 } else if realRslt != nil { |
| 108 rslt = realRslt |
| 109 } |
| 110 } |
| 111 |
| 112 muts = append(muts, &FinishExecution{t.For, rslt}) |
| 113 return |
| 114 } |
| 115 |
| 116 // ProcessAfter implements tumble.DelayedMutation |
| 117 func (t *TimeoutExecution) ProcessAfter() time.Time { return t.Deadline } |
| 118 |
| 119 // HighPriority implements tumble.DelayedMutation |
| 120 func (t *TimeoutExecution) HighPriority() bool { return false } |
| 121 |
| 122 // ResetExecutionTimeout schedules a Timeout for this Execution. It inspects the |
| 123 // Execution's State to determine which timeout should be set, if any. If no |
| 124 // timeout should be active, this will cancel any existing timeouts for this |
| 125 // Execution. |
| 126 func ResetExecutionTimeout(c context.Context, e *model.Execution) error { |
| 127 howLong := time.Duration(0) |
| 128 switch e.State { |
| 129 case dm.Execution_SCHEDULING: |
| 130 howLong = e.TimeToStart |
| 131 case dm.Execution_RUNNING: |
| 132 howLong = e.TimeToRun |
| 133 case dm.Execution_STOPPING: |
| 134 howLong = e.TimeToStop |
| 135 } |
| 136 eid := e.GetEID() |
| 137 key := model.ExecutionKeyFromID(c, eid) |
| 138 if howLong == 0 { |
| 139 return tumble.CancelNamedMutations(c, key, "timeout") |
| 140 } |
| 141 return tumble.PutNamedMutations(c, key, map[string]tumble.Mutation{ |
| 142 "timeout": &TimeoutExecution{eid, e.State, 0, clock.Now(c).UTC()
.Add(howLong)}, |
| 143 }) |
| 144 } |
| 145 |
| 146 func init() { |
| 147 tumble.Register((*TimeoutExecution)(nil)) |
| 148 } |
| OLD | NEW |