| Index: go/src/infra/gae/libs/gae/memory/taskqueue.go
|
| diff --git a/go/src/infra/gae/libs/gae/memory/taskqueue.go b/go/src/infra/gae/libs/gae/memory/taskqueue.go
|
| index 1a037ce6cb4926479ca7cb81d4d506df2b9d5117..e6e11352b3c0449a770e73c6cfe2b109938de1f5 100644
|
| --- a/go/src/infra/gae/libs/gae/memory/taskqueue.go
|
| +++ b/go/src/infra/gae/libs/gae/memory/taskqueue.go
|
| @@ -6,7 +6,6 @@ package memory
|
|
|
| import (
|
| "errors"
|
| - "fmt"
|
| "net/http"
|
| "regexp"
|
|
|
| @@ -21,31 +20,20 @@ import (
|
| func useTQ(c context.Context) context.Context {
|
| return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue {
|
| tqd := cur(ic).Get(memContextTQIdx)
|
| - var ret interface {
|
| - gae.TQTestable
|
| - gae.TaskQueue
|
| - }
|
| - switch x := tqd.(type) {
|
| - case *taskQueueData:
|
| - ret = &taskqueueImpl{
|
| - dummy.TQ(),
|
| - x,
|
| - ic,
|
| - curGID(ic).namespace,
|
| - }
|
| -
|
| - case *txnTaskQueueData:
|
| - ret = &taskqueueTxnImpl{
|
| + if x, ok := tqd.(*taskQueueData); ok {
|
| + return &taskqueueImpl{
|
| dummy.TQ(),
|
| x,
|
| ic,
|
| curGID(ic).namespace,
|
| }
|
| -
|
| - default:
|
| - panic(fmt.Errorf("TQ: bad type: %v", tqd))
|
| }
|
| - return ret
|
| + return &taskqueueTxnImpl{
|
| + dummy.TQ(),
|
| + tqd.(*txnTaskQueueData),
|
| + ic,
|
| + curGID(ic).namespace,
|
| + }
|
| })
|
| }
|
|
|
| @@ -178,22 +166,24 @@ func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T
|
| return toRet, nil
|
| }
|
|
|
| -func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, error) {
|
| - if err := t.isBroken(); err != nil {
|
| - return nil, err
|
| - }
|
| - t.Lock()
|
| - defer t.Unlock()
|
| - return t.addLocked(task, queueName)
|
| +func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) {
|
| + err = t.run(func() (err error) {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| + retTask, err = t.addLocked(task, queueName)
|
| + return
|
| + })
|
| + return
|
| }
|
|
|
| -func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.TQTask, error) {
|
| - if err := t.isBroken(); err != nil {
|
| - return nil, err
|
| - }
|
| - t.Lock()
|
| - defer t.Unlock()
|
| - return multi(tasks, queueName, t.addLocked)
|
| +func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) {
|
| + err = t.run(func() (err error) {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| + retTasks, err = multi(tasks, queueName, t.addLocked)
|
| + return
|
| + })
|
| + return
|
| }
|
|
|
| ////////////////////////////// private functions ///////////////////////////////
|
| @@ -217,20 +207,13 @@ func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string
|
|
|
| func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (*gae.TQTask, error)) ([]*gae.TQTask, error) {
|
| ret := []*gae.TQTask(nil)
|
| - me := gae.MultiError(nil)
|
| - foundErr := false
|
| - for _, task := range tasks {
|
| + lme := gae.LazyMultiError{Size: len(tasks)}
|
| + for i, task := range tasks {
|
| rt, err := f(task, queueName)
|
| ret = append(ret, rt)
|
| - me = append(me, err)
|
| - if err != nil {
|
| - foundErr = true
|
| - }
|
| - }
|
| - if !foundErr {
|
| - me = nil
|
| + lme.Assign(i, err)
|
| }
|
| - return ret, me
|
| + return ret, lme.Get()
|
| }
|
|
|
| func dupTask(t *gae.TQTask) *gae.TQTask {
|
|
|