Index: impl/prod/taskqueue.go |
diff --git a/impl/prod/taskqueue.go b/impl/prod/taskqueue.go |
index 815db3a734145680ba995038c483442aa57f604a..ce0f443dfb528cef53fa566de815775f899f5b65 100644 |
--- a/impl/prod/taskqueue.go |
+++ b/impl/prod/taskqueue.go |
@@ -9,15 +9,15 @@ import ( |
"reflect" |
tq "github.com/luci/gae/service/taskqueue" |
- "github.com/luci/luci-go/common/errors" |
"golang.org/x/net/context" |
+ "google.golang.org/appengine" |
"google.golang.org/appengine/taskqueue" |
) |
// useTQ adds a gae.TaskQueue implementation to context, accessible |
// by gae.GetTQ(c) |
func useTQ(c context.Context) context.Context { |
- return tq.SetFactory(c, func(ci context.Context) tq.Interface { |
+ return tq.SetRawFactory(c, func(ci context.Context) tq.RawInterface { |
return tqImpl{ci} |
}) |
} |
@@ -43,11 +43,10 @@ func init() { |
} |
} |
-// tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a |
-// *tq.Task, and passes through an error. |
-func tqR2FErr(o *taskqueue.Task, err error) (*tq.Task, error) { |
- if err != nil { |
- return nil, err |
+// tqR2F (TQ real-to-fake) converts a *taskqueue.Task to a *tq.Task. |
+func tqR2F(o *taskqueue.Task) *tq.Task { |
+ if o == nil { |
+ return nil |
} |
n := tq.Task{} |
n.Path = o.Path |
@@ -59,7 +58,7 @@ func tqR2FErr(o *taskqueue.Task, err error) (*tq.Task, error) { |
n.ETA = o.ETA |
n.RetryCount = o.RetryCount |
n.RetryOptions = (*tq.RetryOptions)(o.RetryOptions) |
- return &n, nil |
+ return &n |
} |
// tqF2R (TQ fake-to-real) converts a *tq.Task to a *taskqueue.Task. |
@@ -77,19 +76,6 @@ func tqF2R(n *tq.Task) *taskqueue.Task { |
return &o |
} |
-// tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of |
-// *taskqueue.Task to a slice of *tq.Task |
-func tqMR2FErr(os []*taskqueue.Task, err error) ([]*tq.Task, error) { |
- if err != nil { |
- return nil, errors.Fix(err) |
- } |
- ret := make([]*tq.Task, len(os)) |
- for i, t := range os { |
- ret[i], _ = tqR2FErr(t, nil) |
- } |
- return ret, nil |
-} |
- |
// tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task. |
func tqMF2R(ns []*tq.Task) []*taskqueue.Task { |
ret := make([]*taskqueue.Task, len(ns)) |
@@ -99,52 +85,49 @@ func tqMF2R(ns []*tq.Task) []*taskqueue.Task { |
return ret |
} |
-//////// TQSingleReadWriter |
- |
-func (t tqImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) { |
- return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName)) |
-} |
-func (t tqImpl) Delete(task *tq.Task, queueName string) error { |
- return taskqueue.Delete(t.Context, tqF2R(task), queueName) |
-} |
- |
-//////// TQMultiReadWriter |
- |
-func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) { |
- return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName)) |
-} |
-func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string) error { |
- return errors.Fix(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueName)) |
+func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error { |
+ realTasks, err := taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName) |
+ if err != nil { |
+ if me, ok := err.(appengine.MultiError); ok { |
+ for i, err := range me { |
+ tsk := (*taskqueue.Task)(nil) |
+ if realTasks != nil { |
+ tsk = realTasks[i] |
+ } |
+ cb(tqR2F(tsk), err) |
+ } |
+ err = nil |
+ } |
+ } else { |
+ for _, tsk := range realTasks { |
+ cb(tqR2F(tsk), nil) |
+ } |
+ } |
+ return err |
} |
-//////// TQLeaser |
- |
-func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*tq.Task, error) { |
- return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTime)) |
-} |
-func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag string) ([]*tq.Task, error) { |
- return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, leaseTime, tag)) |
-} |
-func (t tqImpl) ModifyLease(task *tq.Task, queueName string, leaseTime int) error { |
- return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTime) |
+func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) error { |
+ err := taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueName) |
+ if me, ok := err.(appengine.MultiError); ok { |
+ for _, err := range me { |
+ cb(err) |
+ } |
+ err = nil |
+ } |
+ return err |
} |
-//////// TQPurger |
- |
func (t tqImpl) Purge(queueName string) error { |
return taskqueue.Purge(t.Context, queueName) |
} |
-//////// TQStatter |
- |
-func (t tqImpl) QueueStats(queueNames []string) ([]tq.Statistics, error) { |
+func (t tqImpl) Stats(queueNames []string, cb tq.RawStatsCB) error { |
stats, err := taskqueue.QueueStats(t.Context, queueNames) |
if err != nil { |
- return nil, err |
+ return err |
} |
- ret := make([]tq.Statistics, len(stats)) |
- for i, s := range stats { |
- ret[i] = tq.Statistics(s) |
+ for _, s := range stats { |
+ cb((*tq.Statistics)(&s), nil) |
} |
- return ret, nil |
+ return nil |
} |