| Index: impl/prod/taskqueue.go
|
| diff --git a/impl/prod/taskqueue.go b/impl/prod/taskqueue.go
|
| index 815db3a734145680ba995038c483442aa57f604a..ce0f443dfb528cef53fa566de815775f899f5b65 100644
|
| --- a/impl/prod/taskqueue.go
|
| +++ b/impl/prod/taskqueue.go
|
| @@ -9,15 +9,15 @@ import (
|
| "reflect"
|
|
|
| tq "github.com/luci/gae/service/taskqueue"
|
| - "github.com/luci/luci-go/common/errors"
|
| "golang.org/x/net/context"
|
| + "google.golang.org/appengine"
|
| "google.golang.org/appengine/taskqueue"
|
| )
|
|
|
| // useTQ adds a gae.TaskQueue implementation to context, accessible
|
| // by gae.GetTQ(c)
|
| func useTQ(c context.Context) context.Context {
|
| - return tq.SetFactory(c, func(ci context.Context) tq.Interface {
|
| + return tq.SetRawFactory(c, func(ci context.Context) tq.RawInterface {
|
| return tqImpl{ci}
|
| })
|
| }
|
| @@ -43,11 +43,10 @@ func init() {
|
| }
|
| }
|
|
|
| -// tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a
|
| -// *tq.Task, and passes through an error.
|
| -func tqR2FErr(o *taskqueue.Task, err error) (*tq.Task, error) {
|
| - if err != nil {
|
| - return nil, err
|
| +// tqR2F (TQ real-to-fake) converts a *taskqueue.Task to a *tq.Task.
|
| +func tqR2F(o *taskqueue.Task) *tq.Task {
|
| + if o == nil {
|
| + return nil
|
| }
|
| n := tq.Task{}
|
| n.Path = o.Path
|
| @@ -59,7 +58,7 @@ func tqR2FErr(o *taskqueue.Task, err error) (*tq.Task, error) {
|
| n.ETA = o.ETA
|
| n.RetryCount = o.RetryCount
|
| n.RetryOptions = (*tq.RetryOptions)(o.RetryOptions)
|
| - return &n, nil
|
| + return &n
|
| }
|
|
|
| // tqF2R (TQ fake-to-real) converts a *tq.Task to a *taskqueue.Task.
|
| @@ -77,19 +76,6 @@ func tqF2R(n *tq.Task) *taskqueue.Task {
|
| return &o
|
| }
|
|
|
| -// tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of
|
| -// *taskqueue.Task to a slice of *tq.Task
|
| -func tqMR2FErr(os []*taskqueue.Task, err error) ([]*tq.Task, error) {
|
| - if err != nil {
|
| - return nil, errors.Fix(err)
|
| - }
|
| - ret := make([]*tq.Task, len(os))
|
| - for i, t := range os {
|
| - ret[i], _ = tqR2FErr(t, nil)
|
| - }
|
| - return ret, nil
|
| -}
|
| -
|
| // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task.
|
| func tqMF2R(ns []*tq.Task) []*taskqueue.Task {
|
| ret := make([]*taskqueue.Task, len(ns))
|
| @@ -99,52 +85,49 @@ func tqMF2R(ns []*tq.Task) []*taskqueue.Task {
|
| return ret
|
| }
|
|
|
| -//////// TQSingleReadWriter
|
| -
|
| -func (t tqImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) {
|
| - return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName))
|
| -}
|
| -func (t tqImpl) Delete(task *tq.Task, queueName string) error {
|
| - return taskqueue.Delete(t.Context, tqF2R(task), queueName)
|
| -}
|
| -
|
| -//////// TQMultiReadWriter
|
| -
|
| -func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) {
|
| - return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName))
|
| -}
|
| -func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string) error {
|
| - return errors.Fix(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueName))
|
| +func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error {
|
| + realTasks, err := taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName)
|
| + if err != nil {
|
| + if me, ok := err.(appengine.MultiError); ok {
|
| + for i, err := range me {
|
| + tsk := (*taskqueue.Task)(nil)
|
| + if realTasks != nil {
|
| + tsk = realTasks[i]
|
| + }
|
| + cb(tqR2F(tsk), err)
|
| + }
|
| + err = nil
|
| + }
|
| + } else {
|
| + for _, tsk := range realTasks {
|
| + cb(tqR2F(tsk), nil)
|
| + }
|
| + }
|
| + return err
|
| }
|
|
|
| -//////// TQLeaser
|
| -
|
| -func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*tq.Task, error) {
|
| - return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTime))
|
| -}
|
| -func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag string) ([]*tq.Task, error) {
|
| - return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, leaseTime, tag))
|
| -}
|
| -func (t tqImpl) ModifyLease(task *tq.Task, queueName string, leaseTime int) error {
|
| - return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTime)
|
| +func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) error {
|
| + err := taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueName)
|
| + if me, ok := err.(appengine.MultiError); ok {
|
| + for _, err := range me {
|
| + cb(err)
|
| + }
|
| + err = nil
|
| + }
|
| + return err
|
| }
|
|
|
| -//////// TQPurger
|
| -
|
| func (t tqImpl) Purge(queueName string) error {
|
| return taskqueue.Purge(t.Context, queueName)
|
| }
|
|
|
| -//////// TQStatter
|
| -
|
| -func (t tqImpl) QueueStats(queueNames []string) ([]tq.Statistics, error) {
|
| +func (t tqImpl) Stats(queueNames []string, cb tq.RawStatsCB) error {
|
| stats, err := taskqueue.QueueStats(t.Context, queueNames)
|
| if err != nil {
|
| - return nil, err
|
| + return err
|
| }
|
| - ret := make([]tq.Statistics, len(stats))
|
| - for i, s := range stats {
|
| - ret[i] = tq.Statistics(s)
|
| + for _, s := range stats {
|
| + cb((*tq.Statistics)(&s), nil)
|
| }
|
| - return ret, nil
|
| + return nil
|
| }
|
|
|