| Index: scheduler/appengine/engine/engine.go
|
| diff --git a/scheduler/appengine/engine/engine.go b/scheduler/appengine/engine/engine.go
|
| index 59488dc1bc5ef02b1dbc941b654695874ceed098..8a705fd4170935c4842e6c2b632bd6b66c3292b2 100644
|
| --- a/scheduler/appengine/engine/engine.go
|
| +++ b/scheduler/appengine/engine/engine.go
|
| @@ -32,6 +32,7 @@ import (
|
| "github.com/luci/luci-go/common/data/stringset"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/logging"
|
| + "github.com/luci/luci-go/common/retry/transient"
|
| "github.com/luci/luci-go/server/auth"
|
| "github.com/luci/luci-go/server/auth/identity"
|
| "github.com/luci/luci-go/server/auth/signing"
|
| @@ -561,7 +562,7 @@ func (e *engineImpl) doIfNotDone(c context.Context, key string, cb func() error)
|
| case err == mc.ErrCacheMiss:
|
| break
|
| default:
|
| - return errors.WrapTransient(err)
|
| + return transient.Tag.Apply(err)
|
| }
|
|
|
| // Do it.
|
| @@ -591,7 +592,7 @@ func (e *engineImpl) GetAllProjects(c context.Context) ([]string, error) {
|
| Distinct(true)
|
| entities := []Job{}
|
| if err := ds.GetAll(c, q, &entities); err != nil {
|
| - return nil, errors.WrapTransient(err)
|
| + return nil, transient.Tag.Apply(err)
|
| }
|
| // Filter out duplicates, sort.
|
| projects := stringset.New(len(entities))
|
| @@ -616,7 +617,7 @@ func (e *engineImpl) GetProjectJobs(c context.Context, projectID string) ([]*Job
|
| func (e *engineImpl) queryEnabledJobs(c context.Context, q *ds.Query) ([]*Job, error) {
|
| entities := []*Job{}
|
| if err := ds.GetAll(c, q, &entities); err != nil {
|
| - return nil, errors.WrapTransient(err)
|
| + return nil, transient.Tag.Apply(err)
|
| }
|
| // Non-ancestor query used, need to recheck filters.
|
| filtered := make([]*Job, 0, len(entities))
|
| @@ -636,7 +637,7 @@ func (e *engineImpl) GetJob(c context.Context, jobID string) (*Job, error) {
|
| case err == ds.ErrNoSuchEntity:
|
| return nil, nil
|
| default:
|
| - return nil, errors.WrapTransient(err)
|
| + return nil, transient.Tag.Apply(err)
|
| }
|
| }
|
|
|
| @@ -680,7 +681,7 @@ func (e *engineImpl) ListInvocations(c context.Context, jobID string, pageSize i
|
| return ds.Stop
|
| })
|
| if err != nil {
|
| - return nil, "", errors.WrapTransient(err)
|
| + return nil, "", transient.Tag.Apply(err)
|
| }
|
| return out, newCursor, nil
|
| }
|
| @@ -696,7 +697,7 @@ func (e *engineImpl) GetInvocation(c context.Context, jobID string, invID int64)
|
| case err == ds.ErrNoSuchEntity:
|
| return nil, nil
|
| default:
|
| - return nil, errors.WrapTransient(err)
|
| + return nil, transient.Tag.Apply(err)
|
| }
|
| }
|
|
|
| @@ -704,7 +705,7 @@ func (e *engineImpl) GetInvocationsByNonce(c context.Context, invNonce int64) ([
|
| q := ds.NewQuery("Invocation").Eq("InvocationNonce", invNonce)
|
| entities := []*Invocation{}
|
| if err := ds.GetAll(c, q, &entities); err != nil {
|
| - return nil, errors.WrapTransient(err)
|
| + return nil, transient.Tag.Apply(err)
|
| }
|
| return entities, nil
|
| }
|
| @@ -759,7 +760,7 @@ func (e *engineImpl) UpdateProjectJobs(c context.Context, projectID string, defs
|
| if updateErrs.Get() == nil && disableErrs.Get() == nil {
|
| return nil
|
| }
|
| - return errors.WrapTransient(errors.NewMultiError(updateErrs.Get(), disableErrs.Get()))
|
| + return transient.Tag.Apply(errors.NewMultiError(updateErrs.Get(), disableErrs.Get()))
|
| }
|
|
|
| func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error {
|
| @@ -769,7 +770,7 @@ func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error {
|
| q := ds.NewQuery("Job").Eq("Enabled", true)
|
| keys := []*ds.Key{}
|
| if err := ds.GetAll(c, q, &keys); err != nil {
|
| - return errors.WrapTransient(err)
|
| + return transient.Tag.Apply(err)
|
| }
|
| wg := sync.WaitGroup{}
|
| errs := errors.NewLazyMultiError(len(keys))
|
| @@ -781,7 +782,7 @@ func (e *engineImpl) ResetAllJobsOnDevServer(c context.Context) error {
|
| }(i, key)
|
| }
|
| wg.Wait()
|
| - return errors.WrapTransient(errs.Get())
|
| + return transient.Tag.Apply(errs.Get())
|
| }
|
|
|
| // getProjectJobs fetches from ds all enabled jobs belonging to a given
|
| @@ -792,7 +793,7 @@ func (e *engineImpl) getProjectJobs(c context.Context, projectID string) (map[st
|
| Eq("ProjectID", projectID)
|
| entities := []*Job{}
|
| if err := ds.GetAll(c, q, &entities); err != nil {
|
| - return nil, errors.WrapTransient(err)
|
| + return nil, transient.Tag.Apply(err)
|
| }
|
| out := make(map[string]*Job, len(entities))
|
| for _, job := range entities {
|
| @@ -838,7 +839,7 @@ func (e *engineImpl) txn(c context.Context, jobID string, txn txnCallback) error
|
| modified := stored
|
| err = txn(c, &modified, err == ds.ErrNoSuchEntity)
|
| if err != nil && err != errSkipPut {
|
| - fatal = !errors.IsTransient(err)
|
| + fatal = !transient.Tag.In(err)
|
| return err
|
| }
|
| if err != errSkipPut && !modified.isEqual(&stored) {
|
| @@ -854,7 +855,7 @@ func (e *engineImpl) txn(c context.Context, jobID string, txn txnCallback) error
|
| // By now err is already transient (since 'fatal' is false) or it is commit
|
| // error (i.e. produced by RunInTransaction itself, not by its callback).
|
| // Need to wrap commit errors too.
|
| - return errors.WrapTransient(err)
|
| + return transient.Tag.Apply(err)
|
| }
|
| if attempt > 1 {
|
| logging.Infof(c, "Committed on %d attempt", attempt)
|
| @@ -884,7 +885,7 @@ func (e *engineImpl) rollSM(c context.Context, job *Job, cb func(*StateMachine)
|
| // Fatal errors (when we have them) should be reflected as a state changing
|
| // into "BROKEN" state.
|
| if err := cb(&sm); err != nil {
|
| - return errors.WrapTransient(err)
|
| + return transient.Tag.Apply(err)
|
| }
|
| if len(sm.Actions) != 0 {
|
| if err := e.enqueueJobActions(c, job.JobID, sm.Actions); err != nil {
|
| @@ -974,7 +975,7 @@ func (e *engineImpl) enqueueJobActions(c context.Context, jobID string, actions
|
| i++
|
| }
|
| wg.Wait()
|
| - return errors.WrapTransient(errs.Get())
|
| + return transient.Tag.Apply(errs.Get())
|
| }
|
|
|
| // enqueueInvTimers submits all timers emitted by an invocation manager by
|
| @@ -997,7 +998,7 @@ func (e *engineImpl) enqueueInvTimers(c context.Context, inv *Invocation, timers
|
| Payload: payload,
|
| }
|
| }
|
| - return errors.WrapTransient(tq.Add(c, e.TimersQueueName, tasks...))
|
| + return transient.Tag.Apply(tq.Add(c, e.TimersQueueName, tasks...))
|
| }
|
|
|
| func (e *engineImpl) ExecuteSerializedAction(c context.Context, action []byte, retryCount int) error {
|
| @@ -1290,7 +1291,7 @@ func (e *engineImpl) recordOverrun(c context.Context, jobID string, overruns int
|
| inv.debugLog(c, "New invocation should be starting now, but previous one is still running: %d", runningInvID)
|
| }
|
| inv.debugLog(c, "Total overruns thus far: %d", overruns)
|
| - return errors.WrapTransient(ds.Put(c, &inv))
|
| + return transient.Tag.Apply(ds.Put(c, &inv))
|
| }
|
|
|
| // invocationTimerTick is called via Task Queue to handle AddTimer callbacks.
|
| @@ -1327,7 +1328,7 @@ func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID
|
| err = ctl.manager.HandleTimer(c, ctl, timer.Name, timer.Payload)
|
| if err != nil {
|
| logging.Errorf(c, "Error when handling the timer - %s", err)
|
| - if !errors.IsTransient(err) && ctl.State().Status != task.StatusFailed {
|
| + if !transient.Tag.In(err) && ctl.State().Status != task.StatusFailed {
|
| ctl.DebugLog("Fatal error when handling timer, aborting invocation - %s", err)
|
| ctl.State().Status = task.StatusFailed
|
| }
|
| @@ -1344,7 +1345,7 @@ func (e *engineImpl) invocationTimerTick(c context.Context, jobID string, invID
|
| switch {
|
| case err == nil && saveErr == nil:
|
| return nil
|
| - case errors.IsTransient(saveErr):
|
| + case transient.Tag.In(saveErr):
|
| return saveErr
|
| default:
|
| return err // transient or fatal
|
| @@ -1494,15 +1495,14 @@ func (e *engineImpl) startInvocation(c context.Context, jobID string, invocation
|
| }
|
|
|
| // Give up retrying on transient errors after some number of attempts.
|
| - if errors.IsTransient(err) && retryCount+1 >= invocationRetryLimit {
|
| + if transient.Tag.In(err) && retryCount+1 >= invocationRetryLimit {
|
| err = fmt.Errorf("Too many retries, giving up (original error - %s)", err)
|
| }
|
|
|
| // If asked to retry the invocation (by returning a transient error), do not
|
| // touch Job entity when saving the current (failed) invocation. That way Job
|
| // stays in "QUEUED" state (indicating it's queued for a new invocation).
|
| - retryInvocation := errors.IsTransient(err)
|
| - if saveErr := ctl.saveImpl(c, !retryInvocation); saveErr != nil {
|
| + if saveErr := ctl.saveImpl(c, !transient.Tag.In(err)); saveErr != nil {
|
| logging.Errorf(c, "Failed to save invocation state - %s", saveErr)
|
| if err == nil {
|
| err = saveErr
|
| @@ -1662,7 +1662,7 @@ func (e *engineImpl) PullPubSubOnDevServer(c context.Context, taskManagerName, p
|
| return nil
|
| }
|
| err = e.handlePubSubMessage(c, msg)
|
| - if err == nil || !errors.IsTransient(err) {
|
| + if err == nil || !transient.Tag.In(err) {
|
| ack() // ack only on success of fatal errors (to stop redelivery)
|
| }
|
| return err
|
| @@ -1713,7 +1713,7 @@ func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe
|
| err = ctl.manager.HandleNotification(c, ctl, msg)
|
| if err != nil {
|
| logging.Errorf(c, "Error when handling the message - %s", err)
|
| - if !errors.IsTransient(err) && ctl.State().Status != task.StatusFailed {
|
| + if !transient.Tag.In(err) && ctl.State().Status != task.StatusFailed {
|
| ctl.DebugLog("Fatal error when handling PubSub notification, aborting invocation - %s", err)
|
| ctl.State().Status = task.StatusFailed
|
| }
|
| @@ -1730,7 +1730,7 @@ func (e *engineImpl) handlePubSubMessage(c context.Context, msg *pubsub.PubsubMe
|
| switch {
|
| case err == nil && saveErr == nil:
|
| return nil
|
| - case errors.IsTransient(saveErr):
|
| + case transient.Tag.In(saveErr):
|
| return saveErr
|
| default:
|
| return err // transient or fatal
|
| @@ -1831,7 +1831,7 @@ func (ctl *taskController) Save(ctx context.Context) error {
|
| // errUpdateConflict means Invocation is being modified by two TaskController's
|
| // concurrently. It should not be happening often. If it happens, task queue
|
| // call is retried to rerun the two-part transaction from scratch.
|
| -var errUpdateConflict = errors.WrapTransient(errors.New("concurrent modifications of single Invocation"))
|
| +var errUpdateConflict = errors.New("concurrent modifications of single Invocation", transient.Tag)
|
|
|
| // saveImpl uploads updated Invocation to the datastore. If updateJob is true,
|
| // it will also roll corresponding state machine forward.
|
| @@ -1887,7 +1887,7 @@ func (ctl *taskController) saveImpl(ctx context.Context, updateJob bool) (err er
|
| logging.Errorf(c, "Invocation is suddenly gone")
|
| return errors.New("invocation is suddenly gone")
|
| case err != nil:
|
| - return errors.WrapTransient(err)
|
| + return transient.Tag.Apply(err)
|
| }
|
|
|
| // Make sure no one touched it while we were handling the invocation.
|
|
|