Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(402)

Unified Diff: appengine/cmd/dm/mutate/finish_execution.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: self review Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/cmd/dm/mutate/finish_execution.go
diff --git a/appengine/cmd/dm/mutate/finish_execution.go b/appengine/cmd/dm/mutate/finish_execution.go
new file mode 100644
index 0000000000000000000000000000000000000000..ecea9d59962d3a4f50719cf2b801f78c55854edb
--- /dev/null
+++ b/appengine/cmd/dm/mutate/finish_execution.go
@@ -0,0 +1,154 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package mutate
+
+import (
+ "fmt"
+ "golang.org/x/net/context"
+
+ "github.com/luci/gae/filter/txnBuf"
+ "github.com/luci/gae/service/datastore"
+
+ "github.com/luci/luci-go/appengine/tumble"
+ dm "github.com/luci/luci-go/common/api/dm/service/v1"
+
+ "github.com/luci/luci-go/appengine/cmd/dm/distributor"
+ "github.com/luci/luci-go/appengine/cmd/dm/model"
+)
+
+// FinishExecution records the final state of the Execution, and advances the
+// Attempt state machine.
+//
+// It is used as a tumble.Mutation: Root and RollForward are implemented below,
+// and the type is registered with tumble in this file's init function.
+type FinishExecution struct {
iannucci 2016/06/08 02:54:24 this is the major state machine change. Previously
+ // EID is the ID of the Execution being finished. Root derives the mutation's
+ // root key from it, and RollForward uses it to load both the Execution and
+ // its owning Attempt.
+ EID *dm.Execution_ID
+ // Result is the distributor.TaskResult for the execution. RollForward reads
+ // its AbnormalFinish and PersistentState fields, and may fill in
+ // AbnormalFinish itself when the Execution finished from an unexpected state.
+ Result *distributor.TaskResult
+}
+
+// Root implements tumble.Mutation. The returned datastore key is the one that
+// model.ExecutionKeyFromID produces for this mutation's Execution ID.
+func (f *FinishExecution) Root(c context.Context) *datastore.Key {
+	root := model.ExecutionKeyFromID(c, f.EID)
+	return root
+}
+
+// shouldRetry reports whether the attempt a may be retried after finishing
+// with the abnormal status stat.
+//
+// If stat.CouldRetry() is false it returns (false, nil) immediately: nothing is
+// loaded and a is left untouched. Otherwise it loads the attempt's quest using
+// the datastore handle from txnBuf.GetNoTxn and compares the attempt's
+// RetryState counter for stat with the matching limit in the quest's
+// Desc.Meta.Retry. retry is true when the counter, as it was before this call,
+// is below that limit.
+//
+// As a side-effect, the RetryState counter for stat is incremented on a,
+// whether or not a retry is allowed. This function does not write a back to
+// the datastore.
+//
+// It panics only if stat.CouldRetry() is true but stat is not one of the
+// statuses handled here (FAILED, CRASHED, EXPIRED, TIMED_OUT).
+func shouldRetry(c context.Context, a *model.Attempt, stat dm.AbnormalFinish_Status) (retry bool, err error) {
+	if !stat.CouldRetry() {
+		return false, nil
+	}
+
+	q := model.QuestFromID(a.ID.Quest)
+	if err = txnBuf.GetNoTxn(c).Get(q); err != nil {
+		return false, err
+	}
+
+	// counter points at the attempt's retry counter for this status so that the
+	// compare-then-increment step below is written only once.
+	var (
+		counter *uint32
+		limit   uint32
+	)
+	switch stat {
+	case dm.AbnormalFinish_FAILED:
+		counter, limit = &a.RetryState.Failed, q.Desc.Meta.Retry.Failed
+	case dm.AbnormalFinish_CRASHED:
+		counter, limit = &a.RetryState.Crashed, q.Desc.Meta.Retry.Crashed
+	case dm.AbnormalFinish_EXPIRED:
+		counter, limit = &a.RetryState.Expired, q.Desc.Meta.Retry.Expired
+	case dm.AbnormalFinish_TIMED_OUT:
+		counter, limit = &a.RetryState.TimedOut, q.Desc.Meta.Retry.TimedOut
+	default:
+		// CouldRetry said yes but this switch has no case for stat.
+		panic(fmt.Errorf("do not know how to retry %q", stat))
+	}
+
+	// Compare against the value before the increment, then count this failure.
+	retry = *counter < limit
+	*counter++
+	return retry, nil
+}
+
+// RollForward implements tumble.Mutation.
+//
+// It loads the Attempt and Execution identified by f.EID. Nothing is changed,
+// and no mutations are returned, unless the Attempt is EXECUTING, f.EID is its
+// current execution, and the Execution is not already in a terminal state.
+//
+// If f.Result carries no AbnormalFinish but the Execution was not STOPPING, a
+// FAILED AbnormalFinish is synthesized and stored on f.Result. The Execution
+// then moves to FINISHED or ABNORMAL_FINISHED, and the Attempt is advanced:
+//   - abnormal finish, retry allowed: SCHEDULING, DepMap reset, and a
+//     ScheduleExecution mutation is returned;
+//   - abnormal finish, no retry: ABNORMAL_FINISHED;
+//   - normal finish with DepMap.Size() > 0: WAITING;
+//   - normal finish otherwise: FINISHED, and a RecordCompletion mutation is
+//     returned.
+//
+// Finally both entities are written back with a single PutMulti.
+func (f *FinishExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error) {
+	attempt := model.AttemptFromID(f.EID.AttemptID())
+	exec := model.ExecutionFromID(c, f.EID)
+
+	ds := datastore.Get(c)
+	if err = ds.GetMulti([]interface{}{attempt, exec}); err != nil {
+		return nil, err
+	}
+
+	// Guard: only the attempt's current, still-live execution may be finished.
+	if attempt.State != dm.Attempt_EXECUTING || attempt.CurExecution != f.EID.Id || exec.State.Terminal() {
dnj (Google) 2016/06/09 18:00:56 For indentation purposes, maybe invert this and re
iannucci 2016/06/15 00:46:01 Done.
+		return nil, nil
+	}
+
+	// A result without an AbnormalFinish is only accepted from the STOPPING
+	// state; from any other state it is recorded as a failure.
+	abnormal := f.Result.AbnormalFinish
+	if abnormal == nil && exec.State != dm.Execution_STOPPING {
+		abnormal = &dm.AbnormalFinish{
+			Status: dm.AbnormalFinish_FAILED,
+			Reason: fmt.Sprintf("distributor finished execution while it was in the %s state.", exec.State),
+		}
+		f.Result.AbnormalFinish = abnormal
+	}
+
+	if abnormal == nil {
+		// Normal finish.
+		if err = exec.ModifyState(c, dm.Execution_FINISHED); err != nil {
+			return nil, err
+		}
+		persistent := string(f.Result.PersistentState)
+		exec.ResultPersistentState = persistent
+
+		attempt.PersistentState = persistent
+		attempt.RetryState.Reset()
+
+		if attempt.DepMap.Size() > 0 {
+			if err = attempt.ModifyState(c, dm.Attempt_WAITING); err != nil {
+				return nil, err
+			}
+		} else {
+			if err = attempt.ModifyState(c, dm.Attempt_FINISHED); err != nil {
+				return nil, err
+			}
+			muts = append(muts, &RecordCompletion{f.EID.AttemptID()})
+		}
+	} else {
+		// Abnormal finish.
+		if err = exec.ModifyState(c, dm.Execution_ABNORMAL_FINISHED); err != nil {
+			return nil, err
+		}
+		exec.AbnormalFinish = *abnormal
+
+		var retry bool
+		retry, err = shouldRetry(c, attempt, abnormal.Status)
+		switch {
+		case err != nil:
+			return nil, err
+		case retry:
+			if err = attempt.ModifyState(c, dm.Attempt_SCHEDULING); err != nil {
+				return nil, err
+			}
+			attempt.DepMap.Reset()
+			muts = append(muts, &ScheduleExecution{&attempt.ID})
+		default:
+			// Either the retry budget is used up or this status is never retried.
+			if err = attempt.ModifyState(c, dm.Attempt_ABNORMAL_FINISHED); err != nil {
+				return nil, err
+			}
+			attempt.AbnormalFinish = *abnormal
+		}
+	}
+
+	// Resetting the execution timeout is best-effort; its error is deliberately
+	// dropped.
+	_ = ResetExecutionTimeout(c, exec)
+
+	err = ds.PutMulti([]interface{}{attempt, exec})
+	return muts, err
+}
+
+// FinishExecutionFn is the implementation of distributor.FinishExecutionFn.
+// It lives in this package, rather than in distributor, to avoid a circular
+// dependency.
+//
+// It returns a single FinishExecution mutation wrapping eid and rslt; the
+// returned error is always nil.
+func FinishExecutionFn(c context.Context, eid *dm.Execution_ID, rslt *distributor.TaskResult) ([]tumble.Mutation, error) {
+	finish := &FinishExecution{EID: eid, Result: rslt}
+	return []tumble.Mutation{finish}, nil
+}
+
+// NewFinishExecutionAbnormal is a shorthand to make a FinishExecution mutation
+// whose Result holds only an AbnormalFinish with the given status and reason.
+func NewFinishExecutionAbnormal(eid *dm.Execution_ID, status dm.AbnormalFinish_Status, reason string) *FinishExecution {
+	abnormal := &dm.AbnormalFinish{Status: status, Reason: reason}
+	result := &distributor.TaskResult{AbnormalFinish: abnormal}
+	return &FinishExecution{EID: eid, Result: result}
+}
+
+func init() {
+ tumble.Register((*FinishExecution)(nil))
+}

Powered by Google App Engine
This is Rietveld 408576698