Index: scheduler/appengine/engine/engine.go |
diff --git a/scheduler/appengine/engine/engine.go b/scheduler/appengine/engine/engine.go |
index 59488dc1bc5ef02b1dbc941b654695874ceed098..8a705fd4170935c4842e6c2b632bd6b66c3292b2 100644 |
--- a/scheduler/appengine/engine/engine.go |
+++ b/scheduler/appengine/engine/engine.go |
@@ -32,6 +32,7 @@ import ( |
"github.com/luci/luci-go/common/data/stringset" |
"github.com/luci/luci-go/common/errors" |
"github.com/luci/luci-go/common/logging" |
+ "github.com/luci/luci-go/common/retry/transient" |
"github.com/luci/luci-go/server/auth" |
"github.com/luci/luci-go/server/auth/identity" |
"github.com/luci/luci-go/server/auth/signing" |
@@ -561,7 +562,7 @@ func (e *engineImpl) doIfNotDone(c context.Context, key string, cb func() error) |
case err == mc.ErrCacheMiss: |
break |
default: |
- return errors.WrapTransient(err) |
+ return transient.Tag.Apply(err) |
} |
// Do it. |
@@ -591,7 +592,7 @@ func (e *engineImpl) GetAllProjects(c context.Context) ([]string, error) { |
Distinct(true) |
entities := []Job{} |
if err := ds.GetAll(c, q, &entities); err != nil { |
- return nil, errors.WrapTransient(err) |
+ return nil, transient.Tag.Apply(err) |
} |
// Filter out duplicates, sort. |
projects := stringset.New(len(entities)) |
@@ -616,7 +617,7 @@ func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job |
func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, error) { |
entities := []*Job{} |
if err := ds.GetAll(c, q, &entities); err != nil { |
- return nil, errors.WrapTransient(err) |
+ return nil, transient.Tag.Apply(err) |
} |
// Non-ancestor query used, need to recheck filters. |
filtered := make([]*Job, 0, len(entities)) |
@@ -636,7 +637,7 @@ func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) { |
case err == ds.ErrNoSuchEntity: |
return nil, nil |
default: |
- return nil, errors.WrapTransient(err) |
+ return nil, transient.Tag.Apply(err) |
} |
} |
@@ -680,7 +681,7 @@ func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i |
return ds.Stop |
}) |
if err != nil { |
- return nil, "", errors.WrapTransient(err) |
+ return nil, "", transient.Tag.Apply(err) |
} |
return out, newCursor, nil |
} |
@@ -696,7 +697,7 @@ func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64) |
case err == ds.ErrNoSuchEntity: |
return nil, nil |
default: |
- return nil, errors.WrapTransient(err) |
+ return nil, transient.Tag.Apply(err) |
} |
} |
@@ -704,7 +705,7 @@ func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([ |
q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce) |
entities := []*Invocation{} |
if err := ds.GetAll(c, q, &entities); err != nil { |
- return nil, errors.WrapTransient(err) |
+ return nil, transient.Tag.Apply(err) |
} |
return entities, nil |
} |
@@ -759,7 +760,7 @@ func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs |
if updateErrs.Get() == nil && disableErrs.Get() == nil { |
return nil |
} |
- return errors.WrapTransient(errors.NewMultiError(updateErrs.Get(), disableErrs.Get())) |
+ return transient.Tag.Apply(errors.NewMultiError(updateErrs.Get(), disableErrs.Get())) |
} |
func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error { |
@@ -769,7 +770,7 @@ func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error { |
q := ds.NewQuery("Job").Eq("Enabled", true) |
keys := []*ds.Key{} |
if err := ds.GetAll(c, q, &keys); err != nil { |
- return errors.WrapTransient(err) |
+ return transient.Tag.Apply(err) |
} |
wg := sync.WaitGroup{} |
errs := errors.NewLazyMultiError(len(keys)) |
@@ -781,7 +782,7 @@ func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error { |
}(i, key) |
} |
wg.Wait() |
- return errors.WrapTransient(errs.Get()) |
+ return transient.Tag.Apply(errs.Get()) |
} |
// getProjectJobs fetches from ds all enabled jobs belonging to a given |
@@ -792,7 +793,7 @@ func (e *engineImpl) getProjectJobs(c context.Context, projectID string) (map[st |
Eq("ProjectID", projectID) |
entities := []*Job{} |
if err := ds.GetAll(c, q, &entities); err != nil { |
- return nil, errors.WrapTransient(err) |
+ return nil, transient.Tag.Apply(err) |
} |
out := make(map[string]*Job, len(entities)) |
for _, job := range entities { |
@@ -838,7 +839,7 @@ func (e *engineImpl) txn(c context.Context, jobID string, txn txnCallback) error |
modified := stored |
err = txn(c, &modified, err == ds.ErrNoSuchEntity) |
if err != nil && err != errSkipPut { |
- fatal = !errors.IsTransient(err) |
+ fatal = !transient.Tag.In(err) |
return err |
} |
if err != errSkipPut && !modified.isEqual(&stored) { |
@@ -854,7 +855,7 @@ func (e *engineImpl) txn(c context.Context, jobID string, txn txnCallback) error |
// By now err is already transient (since 'fatal' is false) or it is commit |
// error (i.e. produced by RunInTransaction itself, not by its callback). |
// Need to wrap commit errors too. |
- return errors.WrapTransient(err) |
+ return transient.Tag.Apply(err) |
} |
if attempt > 1 { |
logging.Infof(c, "Committed on %d attempt", attempt) |
@@ -884,7 +885,7 @@ func (e *engineImpl) rollSM(c context.Context, job *Job, cb func(*StateMachine) |
// Fatal errors (when we have them) should be reflected as a state changing |
// into "BROKEN" state. |
if err := cb(&sm); err != nil { |
- return errors.WrapTransient(err) |
+ return transient.Tag.Apply(err) |
} |
if len(sm.Actions) != 0 { |
if err := e.enqueueJobActions(c, job.JobID, sm.Actions); err != nil { |
@@ -974,7 +975,7 @@ func (e *engineImpl) enqueueJobActions(c context.Context, jobID string, actions |
i++ |
} |
wg.Wait() |
- return errors.WrapTransient(errs.Get()) |
+ return transient.Tag.Apply(errs.Get()) |
} |
// enqueueInvTimers submits all timers emitted by an invocation manager by |
@@ -997,7 +998,7 @@ func (e *engineImpl) enqueueInvTimers(c context.Context, inv *Invocation, timers |
Payload: payload, |
} |
} |
- return errors.WrapTransient(tq.Add(c, e.TimersQueueName, tasks...)) |
+ return transient.Tag.Apply(tq.Add(c, e.TimersQueueName, tasks...)) |
} |
func (e *engineImpl) ExecuteSerializedAction(c context.Context, action []byte, retryCount int) error { |
@@ -1290,7 +1291,7 @@ func (e *engineImpl) recordOverrun(c context.Context, jobID string, overruns int |
inv.debugLog(c, "New invocation should be starting now, but previous one is still running: %d", runningInvID) |
} |
inv.debugLog(c, "Total overruns thus far: %d", overruns) |
- return errors.WrapTransient(ds.Put(c, &inv)) |
+ return transient.Tag.Apply(ds.Put(c, &inv)) |
} |
// invocationTimerTick is called via Task Queue to handle AddTimer callbacks. |
@@ -1327,7 +1328,7 @@ func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID |
err = ctl.manager.HandleTimer(c, ctl, timer.Name, timer.Payload) |
if err != nil { |
logging.Errorf(c, "Error when handling the timer - %s", err) |
- if !errors.IsTransient(err) && ctl.State().Status != task.StatusFailed { |
+ if !transient.Tag.In(err) && ctl.State().Status != task.StatusFailed { |
ctl.DebugLog("Fatal error when handling timer, aborting invocation - %s", err) |
ctl.State().Status = task.StatusFailed |
} |
@@ -1344,7 +1345,7 @@ func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID |
switch { |
case err == nil && saveErr == nil: |
return nil |
- case errors.IsTransient(saveErr): |
+ case transient.Tag.In(err): |
return saveErr |
default: |
return err // transient or fatal |
@@ -1494,15 +1495,14 @@ func (e *engineImpl) startInvocation(c context.Context, jobID string, invocation |
} |
// Give up retrying on transient errors after some number of attempts. |
- if errors.IsTransient(err) && retryCount+1 >= invocationRetryLimit { |
+ if transient.Tag.In(err) && retryCount+1 >= invocationRetryLimit { |
err = fmt.Errorf("Too many retries, giving up (original error - %s)", err) |
} |
// If asked to retry the invocation (by returning a transient error), do not |
// touch Job entity when saving the current (failed) invocation. That way Job |
// stays in "QUEUED" state (indicating it's queued for a new invocation). |
- retryInvocation := errors.IsTransient(err) |
- if saveErr := ctl.saveImpl(c, !retryInvocation); saveErr != nil { |
+ if saveErr := ctl.saveImpl(c, !transient.Tag.In(err)); saveErr != nil { |
logging.Errorf(c, "Failed to save invocation state - %s", saveErr) |
if err == nil { |
err = saveErr |
@@ -1662,7 +1662,7 @@ func (e *engineImpl) PullPubSubOnDevServer(c context.Context, taskManagerName, p |
return nil |
} |
err = e.handlePubSubMessage(c, msg) |
- if err == nil || !errors.IsTransient(err) { |
+ if err == nil || !transient.Tag.In(err) { |
ack() // ack only on success of fatal errors (to stop redelivery) |
} |
return err |
@@ -1713,7 +1713,7 @@ func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe |
err = ctl.manager.HandleNotification(c, ctl, msg) |
if err != nil { |
logging.Errorf(c, "Error when handling the message - %s", err) |
- if !errors.IsTransient(err) && ctl.State().Status != task.StatusFailed { |
+ if !transient.Tag.In(err) && ctl.State().Status != task.StatusFailed { |
ctl.DebugLog("Fatal error when handling PubSub notification, aborting invocation - %s", err) |
ctl.State().Status = task.StatusFailed |
} |
@@ -1730,7 +1730,7 @@ func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe |
switch { |
case err == nil && saveErr == nil: |
return nil |
- case errors.IsTransient(saveErr): |
+ case transient.Tag.In(saveErr): |
return saveErr |
default: |
return err // transient or fatal |
@@ -1831,7 +1831,7 @@ func (ctl *taskController) Save(ctx context.Context) error { |
// errUpdateConflict means Invocation is being modified by two TaskController's |
// concurrently. It should not be happening often. If it happens, task queue |
// call is retried to rerun the two-part transaction from scratch. |
-var errUpdateConflict = errors.WrapTransient(errors.New("concurrent modifications of single Invocation")) |
+var errUpdateConflict = errors.New("concurrent modifications of single Invocation", transient.Tag) |
// saveImpl uploads updated Invocation to the datastore. If updateJob is true, |
// it will also roll corresponding state machine forward. |
@@ -1887,7 +1887,7 @@ func (ctl *taskController) saveImpl(ctx context.Context, updateJob bool) (err er |
logging.Errorf(c, "Invocation is suddenly gone") |
return errors.New("invocation is suddenly gone") |
case err != nil: |
- return errors.WrapTransient(err) |
+ return transient.Tag.Apply(err) |
} |
// Make sure no one touched it while we were handling the invocation. |