| Index: impl/memory/taskqueue.go
|
| diff --git a/memory/taskqueue.go b/impl/memory/taskqueue.go
|
| similarity index 71%
|
| rename from memory/taskqueue.go
|
| rename to impl/memory/taskqueue.go
|
| index 4fcbaa30f857cb20ac631966b7fbed50d816552d..e80872b5fee84d0c843467d907cee05eb7035162 100644
|
| --- a/memory/taskqueue.go
|
| +++ b/impl/memory/taskqueue.go
|
| @@ -12,24 +12,26 @@ import (
|
| "golang.org/x/net/context"
|
|
|
| "github.com/luci/gae"
|
| - "github.com/luci/gae/dummy"
|
| + "github.com/luci/gae/impl/dummy"
|
| + tq "github.com/luci/gae/service/taskqueue"
|
| + "github.com/luci/luci-go/common/mathrand"
|
| )
|
|
|
| /////////////////////////////// public functions ///////////////////////////////
|
|
|
| func useTQ(c context.Context) context.Context {
|
| - return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue {
|
| + return tq.SetFactory(c, func(ic context.Context) tq.Interface {
|
| tqd := cur(ic).Get(memContextTQIdx)
|
| if x, ok := tqd.(*taskQueueData); ok {
|
| return &taskqueueImpl{
|
| - dummy.TQ(),
|
| + dummy.TaskQueue(),
|
| x,
|
| ic,
|
| curGID(ic).namespace,
|
| }
|
| }
|
| return &taskqueueTxnImpl{
|
| - dummy.TQ(),
|
| + dummy.TaskQueue(),
|
| tqd.(*txnTaskQueueData),
|
| ic,
|
| curGID(ic).namespace,
|
| @@ -40,7 +42,7 @@ func useTQ(c context.Context) context.Context {
|
| //////////////////////////////// taskqueueImpl /////////////////////////////////
|
|
|
| type taskqueueImpl struct {
|
| - gae.TaskQueue
|
| + tq.Interface
|
| *taskQueueData
|
|
|
| ctx context.Context
|
| @@ -48,11 +50,11 @@ type taskqueueImpl struct {
|
| }
|
|
|
| var (
|
| - _ = gae.TaskQueue((*taskqueueImpl)(nil))
|
| - _ = gae.TQTestable((*taskqueueImpl)(nil))
|
| + _ = tq.Interface((*taskqueueImpl)(nil))
|
| + _ = tq.Testable((*taskqueueImpl)(nil))
|
| )
|
|
|
| -func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTask, error) {
|
| +func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) {
|
| toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName)
|
| if err != nil {
|
| return nil, err
|
| @@ -60,9 +62,9 @@ func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa
|
|
|
| if _, ok := t.archived[queueName][toSched.Name]; ok {
|
| // SDK converts TOMBSTONE -> already added too
|
| - return nil, gae.ErrTQTaskAlreadyAdded
|
| + return nil, tq.ErrTaskAlreadyAdded
|
| } else if _, ok := t.named[queueName][toSched.Name]; ok {
|
| - return nil, gae.ErrTQTaskAlreadyAdded
|
| + return nil, tq.ErrTaskAlreadyAdded
|
| } else {
|
| t.named[queueName][toSched.Name] = toSched
|
| }
|
| @@ -70,13 +72,13 @@ func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa
|
| return dupTask(toSched), nil
|
| }
|
|
|
| -func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, error) {
|
| +func (t *taskqueueImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) {
|
| t.Lock()
|
| defer t.Unlock()
|
| return t.addLocked(task, queueName)
|
| }
|
|
|
| -func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error {
|
| +func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error {
|
| queueName, err := t.getQueueName(queueName)
|
| if err != nil {
|
| return err
|
| @@ -96,24 +98,24 @@ func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error {
|
| return nil
|
| }
|
|
|
| -func (t *taskqueueImpl) Delete(task *gae.TQTask, queueName string) error {
|
| +func (t *taskqueueImpl) Delete(task *tq.Task, queueName string) error {
|
| t.Lock()
|
| defer t.Unlock()
|
| return t.deleteLocked(task, queueName)
|
| }
|
|
|
| -func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.TQTask, error) {
|
| +func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) {
|
| t.Lock()
|
| defer t.Unlock()
|
| return multi(tasks, queueName, t.addLocked)
|
| }
|
|
|
| -func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error {
|
| +func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string) error {
|
| t.Lock()
|
| defer t.Unlock()
|
|
|
| _, err := multi(tasks, queueName,
|
| - func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) {
|
| + func(tsk *tq.Task, qn string) (*tq.Task, error) {
|
| return nil, t.deleteLocked(tsk, qn)
|
| })
|
| return err
|
| @@ -122,7 +124,7 @@ func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error
|
| /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
|
|
|
| type taskqueueTxnImpl struct {
|
| - gae.TaskQueue
|
| + tq.Interface
|
| *txnTaskQueueData
|
|
|
| ctx context.Context
|
| @@ -130,11 +132,11 @@ type taskqueueTxnImpl struct {
|
| }
|
|
|
| var _ interface {
|
| - gae.TaskQueue
|
| - gae.TQTestable
|
| + tq.Interface
|
| + tq.Testable
|
| } = (*taskqueueTxnImpl)(nil)
|
|
|
| -func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTask, error) {
|
| +func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) {
|
| toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueName)
|
| if err != nil {
|
| return nil, err
|
| @@ -166,7 +168,7 @@ func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T
|
| return toRet, nil
|
| }
|
|
|
| -func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) {
|
| +func (t *taskqueueTxnImpl) Add(task *tq.Task, queueName string) (retTask *tq.Task, err error) {
|
| err = t.run(func() (err error) {
|
| t.Lock()
|
| defer t.Unlock()
|
| @@ -176,7 +178,7 @@ func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae
|
| return
|
| }
|
|
|
| -func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) {
|
| +func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string) (retTasks []*tq.Task, err error) {
|
| err = t.run(func() (err error) {
|
| t.Lock()
|
| defer t.Unlock()
|
| @@ -192,12 +194,12 @@ var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
|
|
|
| const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_"
|
|
|
| -func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string {
|
| +func mkName(c context.Context, cur string, queue map[string]*tq.Task) string {
|
| _, ok := queue[cur]
|
| for !ok && cur == "" {
|
| name := [500]byte{}
|
| for i := 0; i < 500; i++ {
|
| - name[i] = validTaskChars[gae.GetMathRand(c).Intn(len(validTaskChars))]
|
| + name[i] = validTaskChars[mathrand.Get(c).Intn(len(validTaskChars))]
|
| }
|
| cur = string(name[:])
|
| _, ok = queue[cur]
|
| @@ -205,8 +207,8 @@ func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string
|
| return cur
|
| }
|
|
|
| -func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (*gae.TQTask, error)) ([]*gae.TQTask, error) {
|
| - ret := []*gae.TQTask(nil)
|
| +func multi(tasks []*tq.Task, queueName string, f func(*tq.Task, string) (*tq.Task, error)) ([]*tq.Task, error) {
|
| + ret := []*tq.Task(nil)
|
| lme := gae.LazyMultiError{Size: len(tasks)}
|
| for i, task := range tasks {
|
| rt, err := f(task, queueName)
|
| @@ -216,8 +218,8 @@ func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (*
|
| return ret, lme.Get()
|
| }
|
|
|
| -func dupTask(t *gae.TQTask) *gae.TQTask {
|
| - ret := &gae.TQTask{}
|
| +func dupTask(t *tq.Task) *tq.Task {
|
| + ret := &tq.Task{}
|
| *ret = *t
|
|
|
| if t.Header != nil {
|
| @@ -235,17 +237,17 @@ func dupTask(t *gae.TQTask) *gae.TQTask {
|
| }
|
|
|
| if t.RetryOptions != nil {
|
| - ret.RetryOptions = &gae.TQRetryOptions{}
|
| + ret.RetryOptions = &tq.RetryOptions{}
|
| *ret.RetryOptions = *t.RetryOptions
|
| }
|
|
|
| return ret
|
| }
|
|
|
| -func dupQueue(q gae.QueueData) gae.QueueData {
|
| - r := make(gae.QueueData, len(q))
|
| +func dupQueue(q tq.QueueData) tq.QueueData {
|
| + r := make(tq.QueueData, len(q))
|
| for k, q := range q {
|
| - r[k] = make(map[string]*gae.TQTask, len(q))
|
| + r[k] = make(map[string]*tq.Task, len(q))
|
| for tn, t := range q {
|
| r[k][tn] = dupTask(t)
|
| }
|
|
|