Index: go/src/infra/gae/libs/gae/memory/taskqueue.go |
diff --git a/go/src/infra/gae/libs/gae/memory/taskqueue.go b/go/src/infra/gae/libs/gae/memory/taskqueue.go |
index 1a037ce6cb4926479ca7cb81d4d506df2b9d5117..e6e11352b3c0449a770e73c6cfe2b109938de1f5 100644 |
--- a/go/src/infra/gae/libs/gae/memory/taskqueue.go |
+++ b/go/src/infra/gae/libs/gae/memory/taskqueue.go |
@@ -6,7 +6,6 @@ package memory |
import ( |
"errors" |
- "fmt" |
"net/http" |
"regexp" |
@@ -21,31 +20,20 @@ import ( |
func useTQ(c context.Context) context.Context { |
return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue { |
tqd := cur(ic).Get(memContextTQIdx) |
- var ret interface { |
- gae.TQTestable |
- gae.TaskQueue |
- } |
- switch x := tqd.(type) { |
- case *taskQueueData: |
- ret = &taskqueueImpl{ |
- dummy.TQ(), |
- x, |
- ic, |
- curGID(ic).namespace, |
- } |
- |
- case *txnTaskQueueData: |
- ret = &taskqueueTxnImpl{ |
+ if x, ok := tqd.(*taskQueueData); ok { |
+ return &taskqueueImpl{ |
dummy.TQ(), |
x, |
ic, |
curGID(ic).namespace, |
} |
- |
- default: |
- panic(fmt.Errorf("TQ: bad type: %v", tqd)) |
} |
- return ret |
+ return &taskqueueTxnImpl{ |
+ dummy.TQ(), |
+ tqd.(*txnTaskQueueData), |
+ ic, |
+ curGID(ic).namespace, |
+ } |
}) |
} |
@@ -178,22 +166,24 @@ func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T |
return toRet, nil |
} |
-func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, error) { |
- if err := t.isBroken(); err != nil { |
- return nil, err |
- } |
- t.Lock() |
- defer t.Unlock() |
- return t.addLocked(task, queueName) |
+func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) { |
+ err = t.run(func() (err error) { |
+ t.Lock() |
+ defer t.Unlock() |
+ retTask, err = t.addLocked(task, queueName) |
+ return |
+ }) |
+ return |
} |
-func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.TQTask, error) { |
- if err := t.isBroken(); err != nil { |
- return nil, err |
- } |
- t.Lock() |
- defer t.Unlock() |
- return multi(tasks, queueName, t.addLocked) |
+func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) { |
+ err = t.run(func() (err error) { |
+ t.Lock() |
+ defer t.Unlock() |
+ retTasks, err = multi(tasks, queueName, t.addLocked) |
+ return |
+ }) |
+ return |
} |
////////////////////////////// private functions /////////////////////////////// |
@@ -217,20 +207,13 @@ func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string |
func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (*gae.TQTask, error)) ([]*gae.TQTask, error) { |
ret := []*gae.TQTask(nil) |
- me := gae.MultiError(nil) |
- foundErr := false |
- for _, task := range tasks { |
+ lme := gae.LazyMultiError{Size: len(tasks)} |
+ for i, task := range tasks { |
rt, err := f(task, queueName) |
ret = append(ret, rt) |
- me = append(me, err) |
- if err != nil { |
- foundErr = true |
- } |
- } |
- if !foundErr { |
- me = nil |
+ lme.Assign(i, err) |
} |
- return ret, me |
+ return ret, lme.Get() |
} |
func dupTask(t *gae.TQTask) *gae.TQTask { |