Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Unified Diff: impl/prod/taskqueue.go

Issue 1270063003: Make the rest of the services have a similar raw/user interface structure. (Closed) Base URL: https://github.com/luci/gae.git@add_datastore
Patch Set: address comments Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « impl/prod/memcache.go ('k') | service/datastore/context.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: impl/prod/taskqueue.go
diff --git a/impl/prod/taskqueue.go b/impl/prod/taskqueue.go
index 815db3a734145680ba995038c483442aa57f604a..ce0f443dfb528cef53fa566de815775f899f5b65 100644
--- a/impl/prod/taskqueue.go
+++ b/impl/prod/taskqueue.go
@@ -9,15 +9,15 @@ import (
"reflect"
tq "github.com/luci/gae/service/taskqueue"
- "github.com/luci/luci-go/common/errors"
"golang.org/x/net/context"
+ "google.golang.org/appengine"
"google.golang.org/appengine/taskqueue"
)
// useTQ adds a gae.TaskQueue implementation to context, accessible
// by gae.GetTQ(c)
func useTQ(c context.Context) context.Context {
- return tq.SetFactory(c, func(ci context.Context) tq.Interface {
+ return tq.SetRawFactory(c, func(ci context.Context) tq.RawInterface {
return tqImpl{ci}
})
}
@@ -43,11 +43,10 @@ func init() {
}
}
-// tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a
-// *tq.Task, and passes through an error.
-func tqR2FErr(o *taskqueue.Task, err error) (*tq.Task, error) {
- if err != nil {
- return nil, err
+// tqR2F (TQ real-to-fake) converts a *taskqueue.Task to a *tq.Task.
+func tqR2F(o *taskqueue.Task) *tq.Task {
+ if o == nil {
+ return nil
}
n := tq.Task{}
n.Path = o.Path
@@ -59,7 +58,7 @@ func tqR2FErr(o *taskqueue.Task, err error) (*tq.Task, error) {
n.ETA = o.ETA
n.RetryCount = o.RetryCount
n.RetryOptions = (*tq.RetryOptions)(o.RetryOptions)
- return &n, nil
+ return &n
}
// tqF2R (TQ fake-to-real) converts a *tq.Task to a *taskqueue.Task.
@@ -77,19 +76,6 @@ func tqF2R(n *tq.Task) *taskqueue.Task {
return &o
}
-// tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of
-// *taskqueue.Task to a slice of *tq.Task
-func tqMR2FErr(os []*taskqueue.Task, err error) ([]*tq.Task, error) {
- if err != nil {
- return nil, errors.Fix(err)
- }
- ret := make([]*tq.Task, len(os))
- for i, t := range os {
- ret[i], _ = tqR2FErr(t, nil)
- }
- return ret, nil
-}
-
// tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task.
func tqMF2R(ns []*tq.Task) []*taskqueue.Task {
ret := make([]*taskqueue.Task, len(ns))
@@ -99,52 +85,49 @@ func tqMF2R(ns []*tq.Task) []*taskqueue.Task {
return ret
}
-//////// TQSingleReadWriter
-
-func (t tqImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) {
- return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName))
-}
-func (t tqImpl) Delete(task *tq.Task, queueName string) error {
- return taskqueue.Delete(t.Context, tqF2R(task), queueName)
-}
-
-//////// TQMultiReadWriter
-
-func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) {
- return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName))
-}
-func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string) error {
- return errors.Fix(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueName))
+func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error {
+ realTasks, err := taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName)
+ if err != nil {
+ if me, ok := err.(appengine.MultiError); ok {
+ for i, err := range me {
+ tsk := (*taskqueue.Task)(nil)
+ if realTasks != nil {
+ tsk = realTasks[i]
+ }
+ cb(tqR2F(tsk), err)
+ }
+ err = nil
+ }
+ } else {
+ for _, tsk := range realTasks {
+ cb(tqR2F(tsk), nil)
+ }
+ }
+ return err
}
-//////// TQLeaser
-
-func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*tq.Task, error) {
- return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTime))
-}
-func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag string) ([]*tq.Task, error) {
- return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, leaseTime, tag))
-}
-func (t tqImpl) ModifyLease(task *tq.Task, queueName string, leaseTime int) error {
- return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTime)
+func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) error {
+ err := taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueName)
+ if me, ok := err.(appengine.MultiError); ok {
+ for _, err := range me {
+ cb(err)
+ }
+ err = nil
+ }
+ return err
}
-//////// TQPurger
-
func (t tqImpl) Purge(queueName string) error {
return taskqueue.Purge(t.Context, queueName)
}
-//////// TQStatter
-
-func (t tqImpl) QueueStats(queueNames []string) ([]tq.Statistics, error) {
+func (t tqImpl) Stats(queueNames []string, cb tq.RawStatsCB) error {
stats, err := taskqueue.QueueStats(t.Context, queueNames)
if err != nil {
- return nil, err
+ return err
}
- ret := make([]tq.Statistics, len(stats))
- for i, s := range stats {
- ret[i] = tq.Statistics(s)
+ for _, s := range stats {
+ cb((*tq.Statistics)(&s), nil)
}
- return ret, nil
+ return nil
}
« no previous file with comments | « impl/prod/memcache.go ('k') | service/datastore/context.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698