OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. |
| 4 |
| 5 package mutate |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "time" |
| 10 |
| 11 "golang.org/x/net/context" |
| 12 |
| 13 "github.com/luci/gae/service/datastore" |
| 14 "github.com/luci/luci-go/appengine/cmd/dm/distributor" |
| 15 "github.com/luci/luci-go/appengine/cmd/dm/model" |
| 16 "github.com/luci/luci-go/appengine/tumble" |
| 17 dm "github.com/luci/luci-go/common/api/dm/service/v1" |
| 18 "github.com/luci/luci-go/common/clock" |
| 19 "github.com/luci/luci-go/common/logging" |
| 20 ) |
| 21 |
| 22 // TimeoutExecution is a named mutation which triggers on a delay. If the |
| 23 // execution is in the noted state when the trigger hits, this sets the |
| 24 // Execution to have an AbnormalFinish status of TIMED_OUT. |
| 25 type TimeoutExecution struct { |
| 26 For *dm.Execution_ID |
| 27 State dm.Execution_State |
| 28 // TimeoutAttempt is the number of attempts to stop a STOPPING execution
, |
| 29 // since this potentially requires an RPC to the distributor to enact. |
| 30 TimeoutAttempt uint |
| 31 Deadline time.Time |
| 32 } |
| 33 |
| 34 const maxTimeoutAttempts = 3 |
| 35 |
| 36 var _ tumble.DelayedMutation = (*TimeoutExecution)(nil) |
| 37 |
| 38 // Root implements tumble.Mutation |
| 39 func (t *TimeoutExecution) Root(c context.Context) *datastore.Key { |
| 40 return model.AttemptKeyFromID(c, t.For.AttemptID()) |
| 41 } |
| 42 |
| 43 // RollForward implements tumble.Mutation |
| 44 func (t *TimeoutExecution) RollForward(c context.Context) (muts []tumble.Mutatio
n, err error) { |
| 45 e := model.ExecutionFromID(c, t.For) |
| 46 |
| 47 ds := datastore.Get(c) |
| 48 if err = ds.Get(e); err != nil { |
| 49 return |
| 50 } |
| 51 if e.State != t.State { |
| 52 return |
| 53 } |
| 54 |
| 55 // will be overwritten if this execution is STOPPING and the timeout is
not |
| 56 // abnormal |
| 57 rslt := &distributor.TaskResult{AbnormalFinish: &dm.AbnormalFinish{ |
| 58 Reason: fmt.Sprintf("DM timeout (%s)", e.State), |
| 59 Status: dm.AbnormalFinish_TIMED_OUT}} |
| 60 |
| 61 if e.State == dm.Execution_STOPPING { |
| 62 // if it's supposed to be STOPPING, maybe we just missed a notif
ication from |
| 63 // the distributor (or the distributor is not using pubsub). |
| 64 reg := distributor.GetRegistry(c) |
| 65 var dist distributor.D |
| 66 var vers string |
| 67 dist, vers, err = reg.MakeDistributor(c, e.DistributorConfigName
) |
| 68 |
| 69 if vers != "" && vers != e.DistributorConfigVersion { |
| 70 logging.Fields{ |
| 71 "cfg_name": e.DistributorConfigName, |
| 72 "orig_cfg_vers": e.DistributorConfigVersion, |
| 73 "cur_cfg_vers": vers, |
| 74 }.Warningf(c, "mismatched distributor config versions") |
| 75 } |
| 76 |
| 77 // TODO(iannucci): make this set the REJECTED state if we loaded
the config, |
| 78 // but the distributor no longer exists. |
| 79 if err != nil { |
| 80 logging.Fields{ |
| 81 logging.ErrorKey: err, |
| 82 "cfgName": e.DistributorConfigName, |
| 83 }.Errorf(c, "Could not MakeDistributor") |
| 84 return |
| 85 } |
| 86 var realRslt *distributor.TaskResult |
| 87 realRslt, err = dist.GetStatus(distributor.Token(e.DistributorTo
ken)) |
| 88 if (err != nil || realRslt == nil) && t.TimeoutAttempt < maxTime
outAttempts { |
| 89 logging.Fields{ |
| 90 logging.ErrorKey: err, |
| 91 "task_result": realRslt, |
| 92 "timeout_attempt": t.TimeoutAttempt, |
| 93 }.Infof(c, "GetStatus failed/nop'd while timing out STOP
PING execution") |
| 94 // TODO(riannucci): do randomized exponential backoff in
stead of constant |
| 95 // backoff? Kinda don't really want to spend more than 1
.5m waiting |
| 96 // anyway, and the actual GetStatus call does local retr
ies already, so |
| 97 // hopefully this is fine. If this is wrong, the distrib
utor should adjust |
| 98 // its timeToStop value to be better. |
| 99 t.Deadline = t.Deadline.Add(time.Second * 30) |
| 100 t.TimeoutAttempt++ |
| 101 err = nil |
| 102 muts = append(muts, t) |
| 103 return |
| 104 } |
| 105 |
| 106 if err != nil { |
| 107 rslt.AbnormalFinish.Reason = fmt.Sprintf("DM timeout (%s
) w/ error: %s", e.State, err) |
| 108 err = nil |
| 109 } else if realRslt != nil { |
| 110 rslt = realRslt |
| 111 } |
| 112 } |
| 113 |
| 114 muts = append(muts, &FinishExecution{t.For, rslt}) |
| 115 return |
| 116 } |
| 117 |
| 118 // ProcessAfter implements tumble.DelayedMutation |
| 119 func (t *TimeoutExecution) ProcessAfter() time.Time { return t.Deadline } |
| 120 |
| 121 // HighPriority implements tumble.DelayedMutation |
| 122 func (t *TimeoutExecution) HighPriority() bool { return false } |
| 123 |
| 124 // ResetExecutionTimeout schedules a Timeout for this Execution. It inspects the |
| 125 // Execution's State to determine which timeout should be set, if any. If no |
| 126 // timeout should be active, this will cancel any existing timeouts for this |
| 127 // Execution. |
| 128 func ResetExecutionTimeout(c context.Context, e *model.Execution) error { |
| 129 howLong := time.Duration(0) |
| 130 switch e.State { |
| 131 case dm.Execution_SCHEDULING: |
| 132 howLong = e.TimeToStart |
| 133 case dm.Execution_RUNNING: |
| 134 howLong = e.TimeToRun |
| 135 case dm.Execution_STOPPING: |
| 136 howLong = e.TimeToStop |
| 137 } |
| 138 eid := e.GetEID() |
| 139 key := model.ExecutionKeyFromID(c, eid) |
| 140 if howLong == 0 { |
| 141 return tumble.CancelNamedMutations(c, key, "timeout") |
| 142 } |
| 143 return tumble.PutNamedMutations(c, key, map[string]tumble.Mutation{ |
| 144 "timeout": &TimeoutExecution{eid, e.State, 0, clock.Now(c).UTC()
.Add(howLong)}, |
| 145 }) |
| 146 } |
| 147 |
| 148 func init() { |
| 149 tumble.Register((*TimeoutExecution)(nil)) |
| 150 } |
OLD | NEW |