Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3062)

Unified Diff: scheduler/appengine/engine/engine.go

Issue 2951393002: [errors] de-specialize Transient in favor of Tags. (Closed)
Patch Set: more refactor Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: scheduler/appengine/engine/engine.go
diff --git a/scheduler/appengine/engine/engine.go b/scheduler/appengine/engine/engine.go
index 59488dc1bc5ef02b1dbc941b654695874ceed098..8a705fd4170935c4842e6c2b632bd6b66c3292b2 100644
--- a/scheduler/appengine/engine/engine.go
+++ b/scheduler/appengine/engine/engine.go
@@ -32,6 +32,7 @@ import (
"github.com/luci/luci-go/common/data/stringset"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/retry/transient"
"github.com/luci/luci-go/server/auth"
"github.com/luci/luci-go/server/auth/identity"
"github.com/luci/luci-go/server/auth/signing"
@@ -561,7 +562,7 @@ func (e *engineImpl) doIfNotDone(c context.Context, key string, cb func() error)
case err == mc.ErrCacheMiss:
break
default:
- return errors.WrapTransient(err)
+ return transient.Tag.Apply(err)
}
// Do it.
@@ -591,7 +592,7 @@ func (e *engineImpl) GetAllProjects(c context.Context) ([]string, error) {
Distinct(true)
entities := []Job{}
if err := ds.GetAll(c, q, &entities); err != nil {
- return nil, errors.WrapTransient(err)
+ return nil, transient.Tag.Apply(err)
}
// Filter out duplicates, sort.
projects := stringset.New(len(entities))
@@ -616,7 +617,7 @@ func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job
func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, error) {
entities := []*Job{}
if err := ds.GetAll(c, q, &entities); err != nil {
- return nil, errors.WrapTransient(err)
+ return nil, transient.Tag.Apply(err)
}
// Non-ancestor query used, need to recheck filters.
filtered := make([]*Job, 0, len(entities))
@@ -636,7 +637,7 @@ func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) {
case err == ds.ErrNoSuchEntity:
return nil, nil
default:
- return nil, errors.WrapTransient(err)
+ return nil, transient.Tag.Apply(err)
}
}
@@ -680,7 +681,7 @@ func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i
return ds.Stop
})
if err != nil {
- return nil, "", errors.WrapTransient(err)
+ return nil, "", transient.Tag.Apply(err)
}
return out, newCursor, nil
}
@@ -696,7 +697,7 @@ func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64)
case err == ds.ErrNoSuchEntity:
return nil, nil
default:
- return nil, errors.WrapTransient(err)
+ return nil, transient.Tag.Apply(err)
}
}
@@ -704,7 +705,7 @@ func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([
q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce)
entities := []*Invocation{}
if err := ds.GetAll(c, q, &entities); err != nil {
- return nil, errors.WrapTransient(err)
+ return nil, transient.Tag.Apply(err)
}
return entities, nil
}
@@ -759,7 +760,7 @@ func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs
if updateErrs.Get() == nil && disableErrs.Get() == nil {
return nil
}
- return errors.WrapTransient(errors.NewMultiError(updateErrs.Get(), disableErrs.Get()))
+ return transient.Tag.Apply(errors.NewMultiError(updateErrs.Get(), disableErrs.Get()))
}
func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error {
@@ -769,7 +770,7 @@ func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error {
q := ds.NewQuery("Job").Eq("Enabled", true)
keys := []*ds.Key{}
if err := ds.GetAll(c, q, &keys); err != nil {
- return errors.WrapTransient(err)
+ return transient.Tag.Apply(err)
}
wg := sync.WaitGroup{}
errs := errors.NewLazyMultiError(len(keys))
@@ -781,7 +782,7 @@ func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error {
}(i, key)
}
wg.Wait()
- return errors.WrapTransient(errs.Get())
+ return transient.Tag.Apply(errs.Get())
}
// getProjectJobs fetches from ds all enabled jobs belonging to a given
@@ -792,7 +793,7 @@ func (e *engineImpl) getProjectJobs(c context.Context, projectID string) (map[st
Eq("ProjectID", projectID)
entities := []*Job{}
if err := ds.GetAll(c, q, &entities); err != nil {
- return nil, errors.WrapTransient(err)
+ return nil, transient.Tag.Apply(err)
}
out := make(map[string]*Job, len(entities))
for _, job := range entities {
@@ -838,7 +839,7 @@ func (e *engineImpl) txn(c context.Context, jobID string, txn txnCallback) error
modified := stored
err = txn(c, &modified, err == ds.ErrNoSuchEntity)
if err != nil && err != errSkipPut {
- fatal = !errors.IsTransient(err)
+ fatal = !transient.Tag.In(err)
return err
}
if err != errSkipPut && !modified.isEqual(&stored) {
@@ -854,7 +855,7 @@ func (e *engineImpl) txn(c context.Context, jobID string, txn txnCallback) error
// By now err is already transient (since 'fatal' is false) or it is commit
// error (i.e. produced by RunInTransaction itself, not by its callback).
// Need to wrap commit errors too.
- return errors.WrapTransient(err)
+ return transient.Tag.Apply(err)
}
if attempt > 1 {
logging.Infof(c, "Committed on %d attempt", attempt)
@@ -884,7 +885,7 @@ func (e *engineImpl) rollSM(c context.Context, job *Job, cb func(*StateMachine)
// Fatal errors (when we have them) should be reflected as a state changing
// into "BROKEN" state.
if err := cb(&sm); err != nil {
- return errors.WrapTransient(err)
+ return transient.Tag.Apply(err)
}
if len(sm.Actions) != 0 {
if err := e.enqueueJobActions(c, job.JobID, sm.Actions); err != nil {
@@ -974,7 +975,7 @@ func (e *engineImpl) enqueueJobActions(c context.Context, jobID string, actions
i++
}
wg.Wait()
- return errors.WrapTransient(errs.Get())
+ return transient.Tag.Apply(errs.Get())
}
// enqueueInvTimers submits all timers emitted by an invocation manager by
@@ -997,7 +998,7 @@ func (e *engineImpl) enqueueInvTimers(c context.Context, inv *Invocation, timers
Payload: payload,
}
}
- return errors.WrapTransient(tq.Add(c, e.TimersQueueName, tasks...))
+ return transient.Tag.Apply(tq.Add(c, e.TimersQueueName, tasks...))
}
func (e *engineImpl) ExecuteSerializedAction(c context.Context, action []byte, retryCount int) error {
@@ -1290,7 +1291,7 @@ func (e *engineImpl) recordOverrun(c context.Context, jobID string, overruns int
inv.debugLog(c, "New invocation should be starting now, but previous one is still running: %d", runningInvID)
}
inv.debugLog(c, "Total overruns thus far: %d", overruns)
- return errors.WrapTransient(ds.Put(c, &inv))
+ return transient.Tag.Apply(ds.Put(c, &inv))
}
// invocationTimerTick is called via Task Queue to handle AddTimer callbacks.
@@ -1327,7 +1328,7 @@ func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID
err = ctl.manager.HandleTimer(c, ctl, timer.Name, timer.Payload)
if err != nil {
logging.Errorf(c, "Error when handling the timer - %s", err)
- if !errors.IsTransient(err) && ctl.State().Status != task.StatusFailed {
+ if !transient.Tag.In(err) && ctl.State().Status != task.StatusFailed {
ctl.DebugLog("Fatal error when handling timer, aborting invocation - %s", err)
ctl.State().Status = task.StatusFailed
}
@@ -1344,7 +1345,7 @@ func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID
switch {
case err == nil && saveErr == nil:
return nil
- case errors.IsTransient(saveErr):
+ case transient.Tag.In(err):
return saveErr
default:
return err // transient or fatal
@@ -1494,15 +1495,14 @@ func (e *engineImpl) startInvocation(c context.Context, jobID string, invocation
}
// Give up retrying on transient errors after some number of attempts.
- if errors.IsTransient(err) && retryCount+1 >= invocationRetryLimit {
+ if transient.Tag.In(err) && retryCount+1 >= invocationRetryLimit {
err = fmt.Errorf("Too many retries, giving up (original error - %s)", err)
}
// If asked to retry the invocation (by returning a transient error), do not
// touch Job entity when saving the current (failed) invocation. That way Job
// stays in "QUEUED" state (indicating it's queued for a new invocation).
- retryInvocation := errors.IsTransient(err)
- if saveErr := ctl.saveImpl(c, !retryInvocation); saveErr != nil {
+ if saveErr := ctl.saveImpl(c, !transient.Tag.In(err)); saveErr != nil {
logging.Errorf(c, "Failed to save invocation state - %s", saveErr)
if err == nil {
err = saveErr
@@ -1662,7 +1662,7 @@ func (e *engineImpl) PullPubSubOnDevServer(c context.Context, taskManagerName, p
return nil
}
err = e.handlePubSubMessage(c, msg)
- if err == nil || !errors.IsTransient(err) {
+ if err == nil || !transient.Tag.In(err) {
ack() // ack only on success of fatal errors (to stop redelivery)
}
return err
@@ -1713,7 +1713,7 @@ func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe
err = ctl.manager.HandleNotification(c, ctl, msg)
if err != nil {
logging.Errorf(c, "Error when handling the message - %s", err)
- if !errors.IsTransient(err) && ctl.State().Status != task.StatusFailed {
+ if !transient.Tag.In(err) && ctl.State().Status != task.StatusFailed {
ctl.DebugLog("Fatal error when handling PubSub notification, aborting invocation - %s", err)
ctl.State().Status = task.StatusFailed
}
@@ -1730,7 +1730,7 @@ func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe
switch {
case err == nil && saveErr == nil:
return nil
- case errors.IsTransient(saveErr):
+ case transient.Tag.In(saveErr):
return saveErr
default:
return err // transient or fatal
@@ -1831,7 +1831,7 @@ func (ctl *taskController) Save(ctx context.Context) error {
// errUpdateConflict means Invocation is being modified by two TaskController's
// concurrently. It should not be happening often. If it happens, task queue
// call is retried to rerun the two-part transaction from scratch.
-var errUpdateConflict = errors.WrapTransient(errors.New("concurrent modifications of single Invocation"))
+var errUpdateConflict = errors.New("concurrent modifications of single Invocation", transient.Tag)
// saveImpl uploads updated Invocation to the datastore. If updateJob is true,
// it will also roll corresponding state machine forward.
@@ -1887,7 +1887,7 @@ func (ctl *taskController) saveImpl(ctx context.Context, updateJob bool) (err er
logging.Errorf(c, "Invocation is suddenly gone")
return errors.New("invocation is suddenly gone")
case err != nil:
- return errors.WrapTransient(err)
+ return transient.Tag.Apply(err)
}
// Make sure no one touched it while we were handling the invocation.

Powered by Google App Engine
This is Rietveld 408576698