Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(538)

Side by Side Diff: appengine/cmd/dm/mutate/timeout_execution.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: self review Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package mutate
6
7 import (
8 "fmt"
9 "time"
10
11 "golang.org/x/net/context"
12
13 "github.com/luci/gae/service/datastore"
14 "github.com/luci/luci-go/appengine/cmd/dm/distributor"
15 "github.com/luci/luci-go/appengine/cmd/dm/model"
16 "github.com/luci/luci-go/appengine/tumble"
17 dm "github.com/luci/luci-go/common/api/dm/service/v1"
18 "github.com/luci/luci-go/common/clock"
19 "github.com/luci/luci-go/common/logging"
20 )
21
22 // TimeoutExecution is a named mutation which triggers on a delay. If the
23 // execution is in the noted state when the trigger hits, this sets the
24 // Execution to have an AbnormalFinish status of TIMED_OUT.
25 type TimeoutExecution struct {
26 For *dm.Execution_ID
27 State dm.Execution_State
28 // TimeoutAttempt is the number of attempts to stop a STOPPING execution ,
29 // since this potentially requires an RPC to the distributor to enact.
30 TimeoutAttempt uint
31 Deadline time.Time
32 }
33
34 const maxTimeoutAttempts = 3
35
36 var _ tumble.DelayedMutation = (*TimeoutExecution)(nil)
37
38 // Root implements tumble.Mutation
39 func (t *TimeoutExecution) Root(c context.Context) *datastore.Key {
40 return model.AttemptKeyFromID(c, t.For.AttemptID())
41 }
42
43 // RollForward implements tumble.Mutation
44 func (t *TimeoutExecution) RollForward(c context.Context) (muts []tumble.Mutatio n, err error) {
45 e := model.ExecutionFromID(c, t.For)
46
47 ds := datastore.Get(c)
48 if err = ds.Get(e); err != nil {
49 return
50 }
51 if e.State != t.State {
52 return
53 }
54
55 // will be overwritten if this execution is STOPPING and the timeout is not
56 // abnormal
57 rslt := &distributor.TaskResult{AbnormalFinish: &dm.AbnormalFinish{
58 Reason: fmt.Sprintf("DM timeout (%s)", e.State),
59 Status: dm.AbnormalFinish_TIMED_OUT}}
60
61 if e.State == dm.Execution_STOPPING {
62 // if it's supposed to be STOPPING, maybe we just missed a notif ication from
63 // the distributor (or the distributor is not using pubsub).
64 reg := distributor.GetRegistry(c)
65 var dist distributor.D
66 var vers string
67 dist, vers, err = reg.MakeDistributor(c, e.DistributorConfigName )
68
69 if vers != "" && vers != e.DistributorConfigVersion {
70 logging.Fields{
71 "cfg_name": e.DistributorConfigName,
72 "orig_cfg_vers": e.DistributorConfigVersion,
73 "cur_cfg_vers": vers,
74 }.Warningf(c, "mismatched distributor config versions")
75 }
76
77 if err != nil {
78 logging.Fields{
79 logging.ErrorKey: err,
80 "cfgName": e.DistributorConfigName,
81 }.Errorf(c, "Could not MakeDistributor")
dnj (Google) 2016/06/09 18:00:57 Does this mean that if we ever unregister a Distri
iannucci 2016/06/15 00:46:01 Added todo: need to distinguish luci-config flake
82 return
83 }
84 var realRslt *distributor.TaskResult
85 realRslt, err = dist.GetStatus(distributor.Token(e.DistributorTo ken))
86 if err != nil || rslt == nil && t.TimeoutAttempt < maxTimeoutAtt empts {
dnj (Google) 2016/06/09 18:00:57 rslt will never be nil here. Do you mean realRslt?
iannucci 2016/06/15 00:46:01 Done.
87 logging.Fields{
88 logging.ErrorKey: err,
89 "task_result": rslt,
dnj (Google) 2016/06/09 18:00:57 (same here)
iannucci 2016/06/15 00:46:01 Done.
90 "timeout_attempt": t.TimeoutAttempt,
91 }.Infof(c, "GetStatus failed/nop'd while timing out STOP PING execution")
92 // TODO(riannucci): do randomized exponential backoff in stead of constant
93 // backoff? Kinda don't really want to spend more than 1 .5m waiting
94 // anyway, and the actual GetStatus call does local retr ies already, so
95 // hopefully this is fine. If this is wrong, the distrib utor should adjust
96 // its timeToStop value to be better.
97 t.Deadline = t.Deadline.Add(time.Second * 30)
98 t.TimeoutAttempt++
99 err = nil
100 muts = append(muts, t)
dnj (Google) 2016/06/09 18:00:56 I think you'll like new Tumble API for this one :)
iannucci 2016/06/15 00:46:01 Acknowledged.
101 return
102 }
103
104 if err != nil {
dnj (Google) 2016/06/09 18:00:57 Can this ever be hit? You handle "err != nil" abov
iannucci 2016/06/15 00:46:01 I was missing some parens above.
105 rslt.AbnormalFinish.Reason = fmt.Sprintf("DM timeout (%s ) w/ error: %s", e.State, err)
106 err = nil
107 } else if realRslt != nil {
108 rslt = realRslt
109 }
110 }
111
112 muts = append(muts, &FinishExecution{t.For, rslt})
113 return
114 }
115
116 // ProcessAfter implements tumble.DelayedMutation
117 func (t *TimeoutExecution) ProcessAfter() time.Time { return t.Deadline }
118
119 // HighPriority implements tumble.DelayedMutation
120 func (t *TimeoutExecution) HighPriority() bool { return false }
121
122 // ResetExecutionTimeout schedules a Timeout for this Execution. It inspects the
123 // Execution's State to determine which timeout should be set, if any. If no
124 // timeout should be active, this will cancel any existing timeouts for this
125 // Execution.
126 func ResetExecutionTimeout(c context.Context, e *model.Execution) error {
127 howLong := time.Duration(0)
128 switch e.State {
129 case dm.Execution_SCHEDULING:
130 howLong = e.TimeToStart
131 case dm.Execution_RUNNING:
132 howLong = e.TimeToRun
133 case dm.Execution_STOPPING:
134 howLong = e.TimeToStop
135 }
136 eid := e.GetEID()
137 key := model.ExecutionKeyFromID(c, eid)
138 if howLong == 0 {
139 return tumble.CancelNamedMutations(c, key, "timeout")
140 }
141 return tumble.PutNamedMutations(c, key, map[string]tumble.Mutation{
142 "timeout": &TimeoutExecution{eid, e.State, 0, clock.Now(c).UTC() .Add(howLong)},
143 })
144 }
145
146 func init() {
147 tumble.Register((*TimeoutExecution)(nil))
148 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698