Index: impl/prod/taskqueue.go |
diff --git a/prod/taskqueue.go b/impl/prod/taskqueue.go |
similarity index 66% |
rename from prod/taskqueue.go |
rename to impl/prod/taskqueue.go |
index d0fdd8a9af5462b97ea74dd26f13ef8cae291cc5..9a9fb3882d7c060174942f79663acef2579cf7b8 100644 |
--- a/prod/taskqueue.go |
+++ b/impl/prod/taskqueue.go |
@@ -9,6 +9,7 @@ import ( |
"reflect" |
"github.com/luci/gae" |
+ tq "github.com/luci/gae/service/taskqueue" |
"golang.org/x/net/context" |
"google.golang.org/appengine/taskqueue" |
) |
@@ -16,7 +17,7 @@ import ( |
// useTQ adds a gae.TaskQueue implementation to context, accessible |
// by gae.GetTQ(c) |
func useTQ(c context.Context) context.Context { |
- return gae.SetTQFactory(c, func(ci context.Context) gae.TaskQueue { |
+ return tq.SetFactory(c, func(ci context.Context) tq.Interface { |
return tqImpl{ci} |
}) |
} |
@@ -26,16 +27,16 @@ type tqImpl struct { |
} |
func init() { |
- const TASK_EXPECTED_FIELDS = 9 |
+ const taskExpectedFields = 9 |
// Runtime-assert that the number of fields in the Task structs is 9, to |
// avoid missing additional fields if they're added later. |
// all other type assertions are statically enforced by o2n() and tqF2R() |
oldType := reflect.TypeOf((*taskqueue.Task)(nil)) |
- newType := reflect.TypeOf((*gae.TQTask)(nil)) |
+ newType := reflect.TypeOf((*tq.Task)(nil)) |
if oldType.NumField() != newType.NumField() || |
- oldType.NumField() != TASK_EXPECTED_FIELDS { |
+ oldType.NumField() != taskExpectedFields { |
panic(fmt.Errorf( |
"prod/taskqueue:init() field count differs: %v, %v", |
oldType, newType)) |
@@ -43,12 +44,12 @@ func init() { |
} |
// tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a |
-// *gae.TQTask, and passes through an error. |
-func tqR2FErr(o *taskqueue.Task, err error) (*gae.TQTask, error) { |
+// *tq.Task, and passes through an error. |
+func tqR2FErr(o *taskqueue.Task, err error) (*tq.Task, error) { |
if err != nil { |
return nil, err |
} |
- n := gae.TQTask{} |
+ n := tq.Task{} |
n.Path = o.Path |
n.Payload = o.Payload |
n.Header = o.Header |
@@ -57,12 +58,12 @@ func tqR2FErr(o *taskqueue.Task, err error) (*gae.TQTask, error) { |
n.Delay = o.Delay |
n.ETA = o.ETA |
n.RetryCount = o.RetryCount |
- n.RetryOptions = (*gae.TQRetryOptions)(o.RetryOptions) |
+ n.RetryOptions = (*tq.RetryOptions)(o.RetryOptions) |
return &n, nil |
} |
-// tqF2R (TQ fake-to-real) converts a *gae.TQTask to a *taskqueue.Task. |
-func tqF2R(n *gae.TQTask) *taskqueue.Task { |
+// tqF2R (TQ fake-to-real) converts a *tq.Task to a *taskqueue.Task. |
+func tqF2R(n *tq.Task) *taskqueue.Task { |
o := taskqueue.Task{} |
o.Path = n.Path |
o.Payload = n.Payload |
@@ -77,20 +78,20 @@ func tqF2R(n *gae.TQTask) *taskqueue.Task { |
} |
// tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of |
-// *taskqueue.Task to a slice of *gae.TQTask |
-func tqMR2FErr(os []*taskqueue.Task, err error) ([]*gae.TQTask, error) { |
+// *taskqueue.Task to a slice of *tq.Task |
+func tqMR2FErr(os []*taskqueue.Task, err error) ([]*tq.Task, error) { |
if err != nil { |
return nil, gae.FixError(err) |
} |
- ret := make([]*gae.TQTask, len(os)) |
+ ret := make([]*tq.Task, len(os)) |
for i, t := range os { |
ret[i], _ = tqR2FErr(t, nil) |
} |
return ret, nil |
} |
-// tqMF2R (TQ multi-fake-to-real) converts []*gae.TQTask to []*taskqueue.Task. |
-func tqMF2R(ns []*gae.TQTask) []*taskqueue.Task { |
+// tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task. |
+func tqMF2R(ns []*tq.Task) []*taskqueue.Task { |
ret := make([]*taskqueue.Task, len(ns)) |
for i, t := range ns { |
ret[i] = tqF2R(t) |
@@ -100,31 +101,31 @@ func tqMF2R(ns []*gae.TQTask) []*taskqueue.Task { |
//////// TQSingleReadWriter |
-func (t tqImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, error) { |
+func (t tqImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) { |
return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName)) |
} |
-func (t tqImpl) Delete(task *gae.TQTask, queueName string) error { |
+func (t tqImpl) Delete(task *tq.Task, queueName string) error { |
return taskqueue.Delete(t.Context, tqF2R(task), queueName) |
} |
//////// TQMultiReadWriter |
-func (t tqImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.TQTask, error) { |
+func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) { |
return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName)) |
} |
-func (t tqImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error { |
+func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string) error { |
return gae.FixError(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueName)) |
} |
//////// TQLeaser |
-func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*gae.TQTask, error) { |
+func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*tq.Task, error) { |
return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTime)) |
} |
-func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag string) ([]*gae.TQTask, error) { |
+func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag string) ([]*tq.Task, error) { |
return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, leaseTime, tag)) |
} |
-func (t tqImpl) ModifyLease(task *gae.TQTask, queueName string, leaseTime int) error { |
+func (t tqImpl) ModifyLease(task *tq.Task, queueName string, leaseTime int) error { |
return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTime) |
} |
@@ -136,14 +137,14 @@ func (t tqImpl) Purge(queueName string) error { |
//////// TQStatter |
-func (t tqImpl) QueueStats(queueNames []string) ([]gae.TQStatistics, error) { |
+func (t tqImpl) QueueStats(queueNames []string) ([]tq.Statistics, error) { |
stats, err := taskqueue.QueueStats(t.Context, queueNames) |
if err != nil { |
return nil, err |
} |
- ret := make([]gae.TQStatistics, len(stats)) |
+ ret := make([]tq.Statistics, len(stats)) |
for i, s := range stats { |
- ret[i] = gae.TQStatistics(s) |
+ ret[i] = tq.Statistics(s) |
} |
return ret, nil |
} |