| Index: go/src/infra/gae/libs/gae/memory/taskqueue.go
|
| diff --git a/go/src/infra/gae/libs/gae/memory/taskqueue.go b/go/src/infra/gae/libs/gae/memory/taskqueue.go
|
| index 98225ebea82f0f9b47430ebfe42eb266acbfdca6..1a037ce6cb4926479ca7cb81d4d506df2b9d5117 100644
|
| --- a/go/src/infra/gae/libs/gae/memory/taskqueue.go
|
| +++ b/go/src/infra/gae/libs/gae/memory/taskqueue.go
|
| @@ -82,14 +82,10 @@ func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa
|
| return dupTask(toSched), nil
|
| }
|
|
|
| -func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) {
|
| - err = t.RunIfNotBroken(func() (err error) {
|
| - t.Lock()
|
| - defer t.Unlock()
|
| - retTask, err = t.addLocked(task, queueName)
|
| - return
|
| - })
|
| - return
|
| +func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, error) {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| + return t.addLocked(task, queueName)
|
| }
|
|
|
| func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error {
|
| @@ -113,34 +109,26 @@ func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error {
|
| }
|
|
|
| func (t *taskqueueImpl) Delete(task *gae.TQTask, queueName string) error {
|
| - return t.RunIfNotBroken(func() error {
|
| - t.Lock()
|
| - defer t.Unlock()
|
| - return t.deleteLocked(task, queueName)
|
| - })
|
| + t.Lock()
|
| + defer t.Unlock()
|
| + return t.deleteLocked(task, queueName)
|
| }
|
|
|
| -func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) {
|
| - err = t.RunIfNotBroken(func() (err error) {
|
| - t.Lock()
|
| - defer t.Unlock()
|
| - retTasks, err = multi(tasks, queueName, t.addLocked)
|
| - return
|
| - })
|
| - return
|
| +func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.TQTask, error) {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| + return multi(tasks, queueName, t.addLocked)
|
| }
|
|
|
| func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error {
|
| - return t.RunIfNotBroken(func() error {
|
| - t.Lock()
|
| - defer t.Unlock()
|
| -
|
| - _, err := multi(tasks, queueName,
|
| - func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) {
|
| - return nil, t.deleteLocked(tsk, qn)
|
| - })
|
| - return err
|
| - })
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + _, err := multi(tasks, queueName,
|
| + func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) {
|
| + return nil, t.deleteLocked(tsk, qn)
|
| + })
|
| + return err
|
| }
|
|
|
| /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
|
| @@ -153,10 +141,10 @@ type taskqueueTxnImpl struct {
|
| ns string
|
| }
|
|
|
| -var (
|
| - _ = gae.TaskQueue((*taskqueueTxnImpl)(nil))
|
| - _ = gae.TQTestable((*taskqueueTxnImpl)(nil))
|
| -)
|
| +var _ interface {
|
| + gae.TaskQueue
|
| + gae.TQTestable
|
| +} = (*taskqueueTxnImpl)(nil)
|
|
|
| func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTask, error) {
|
| toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueName)
|
| @@ -190,24 +178,22 @@ func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T
|
| return toRet, nil
|
| }
|
|
|
| -func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) {
|
| - err = t.RunIfNotBroken(func() (err error) {
|
| - t.Lock()
|
| - defer t.Unlock()
|
| - retTask, err = t.addLocked(task, queueName)
|
| - return
|
| - })
|
| - return
|
| +func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, error) {
|
| + if err := t.isBroken(); err != nil {
|
| + return nil, err
|
| + }
|
| + t.Lock()
|
| + defer t.Unlock()
|
| + return t.addLocked(task, queueName)
|
| }
|
|
|
| -func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) {
|
| - err = t.RunIfNotBroken(func() (err error) {
|
| - t.Lock()
|
| - defer t.Unlock()
|
| - retTasks, err = multi(tasks, queueName, t.addLocked)
|
| - return
|
| - })
|
| - return
|
| +func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.TQTask, error) {
|
| + if err := t.isBroken(); err != nil {
|
| + return nil, err
|
| + }
|
| + t.Lock()
|
| + defer t.Unlock()
|
| + return multi(tasks, queueName, t.addLocked)
|
| }
|
|
|
| ////////////////////////////// private functions ///////////////////////////////
|
|
|