Index: impl/memory/taskqueue.go |
diff --git a/memory/taskqueue.go b/impl/memory/taskqueue.go |
similarity index 71% |
rename from memory/taskqueue.go |
rename to impl/memory/taskqueue.go |
index 4fcbaa30f857cb20ac631966b7fbed50d816552d..e80872b5fee84d0c843467d907cee05eb7035162 100644 |
--- a/memory/taskqueue.go |
+++ b/impl/memory/taskqueue.go |
@@ -12,24 +12,26 @@ import ( |
"golang.org/x/net/context" |
"github.com/luci/gae" |
- "github.com/luci/gae/dummy" |
+ "github.com/luci/gae/impl/dummy" |
+ tq "github.com/luci/gae/service/taskqueue" |
+ "github.com/luci/luci-go/common/mathrand" |
) |
/////////////////////////////// public functions /////////////////////////////// |
func useTQ(c context.Context) context.Context { |
- return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue { |
+ return tq.SetFactory(c, func(ic context.Context) tq.Interface { |
tqd := cur(ic).Get(memContextTQIdx) |
if x, ok := tqd.(*taskQueueData); ok { |
return &taskqueueImpl{ |
- dummy.TQ(), |
+ dummy.TaskQueue(), |
x, |
ic, |
curGID(ic).namespace, |
} |
} |
return &taskqueueTxnImpl{ |
- dummy.TQ(), |
+ dummy.TaskQueue(), |
tqd.(*txnTaskQueueData), |
ic, |
curGID(ic).namespace, |
@@ -40,7 +42,7 @@ func useTQ(c context.Context) context.Context { |
//////////////////////////////// taskqueueImpl ///////////////////////////////// |
type taskqueueImpl struct { |
- gae.TaskQueue |
+ tq.Interface |
*taskQueueData |
ctx context.Context |
@@ -48,11 +50,11 @@ type taskqueueImpl struct { |
} |
var ( |
- _ = gae.TaskQueue((*taskqueueImpl)(nil)) |
- _ = gae.TQTestable((*taskqueueImpl)(nil)) |
+ _ = tq.Interface((*taskqueueImpl)(nil)) |
+ _ = tq.Testable((*taskqueueImpl)(nil)) |
) |
-func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTask, error) { |
+func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) { |
toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) |
if err != nil { |
return nil, err |
@@ -60,9 +62,9 @@ func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa |
if _, ok := t.archived[queueName][toSched.Name]; ok { |
// SDK converts TOMBSTONE -> already added too |
- return nil, gae.ErrTQTaskAlreadyAdded |
+ return nil, tq.ErrTaskAlreadyAdded |
} else if _, ok := t.named[queueName][toSched.Name]; ok { |
- return nil, gae.ErrTQTaskAlreadyAdded |
+ return nil, tq.ErrTaskAlreadyAdded |
} else { |
t.named[queueName][toSched.Name] = toSched |
} |
@@ -70,13 +72,13 @@ func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa |
return dupTask(toSched), nil |
} |
-func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, error) { |
+func (t *taskqueueImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) { |
t.Lock() |
defer t.Unlock() |
return t.addLocked(task, queueName) |
} |
-func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error { |
+func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error { |
queueName, err := t.getQueueName(queueName) |
if err != nil { |
return err |
@@ -96,24 +98,24 @@ func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error { |
return nil |
} |
-func (t *taskqueueImpl) Delete(task *gae.TQTask, queueName string) error { |
+func (t *taskqueueImpl) Delete(task *tq.Task, queueName string) error { |
t.Lock() |
defer t.Unlock() |
return t.deleteLocked(task, queueName) |
} |
-func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.TQTask, error) { |
+func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) { |
t.Lock() |
defer t.Unlock() |
return multi(tasks, queueName, t.addLocked) |
} |
-func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error { |
+func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string) error { |
t.Lock() |
defer t.Unlock() |
_, err := multi(tasks, queueName, |
- func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) { |
+ func(tsk *tq.Task, qn string) (*tq.Task, error) { |
return nil, t.deleteLocked(tsk, qn) |
}) |
return err |
@@ -122,7 +124,7 @@ func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error |
/////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
type taskqueueTxnImpl struct { |
- gae.TaskQueue |
+ tq.Interface |
*txnTaskQueueData |
ctx context.Context |
@@ -130,11 +132,11 @@ type taskqueueTxnImpl struct { |
} |
var _ interface { |
- gae.TaskQueue |
- gae.TQTestable |
+ tq.Interface |
+ tq.Testable |
} = (*taskqueueTxnImpl)(nil) |
-func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTask, error) { |
+func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) { |
toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueName) |
if err != nil { |
return nil, err |
@@ -166,7 +168,7 @@ func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T |
return toRet, nil |
} |
-func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) { |
+func (t *taskqueueTxnImpl) Add(task *tq.Task, queueName string) (retTask *tq.Task, err error) { |
err = t.run(func() (err error) { |
t.Lock() |
defer t.Unlock() |
@@ -176,7 +178,7 @@ func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae |
return |
} |
-func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) { |
+func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string) (retTasks []*tq.Task, err error) { |
err = t.run(func() (err error) { |
t.Lock() |
defer t.Unlock() |
@@ -192,12 +194,12 @@ var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") |
const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_" |
-func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string { |
+func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { |
_, ok := queue[cur] |
for !ok && cur == "" { |
name := [500]byte{} |
for i := 0; i < 500; i++ { |
- name[i] = validTaskChars[gae.GetMathRand(c).Intn(len(validTaskChars))] |
+ name[i] = validTaskChars[mathrand.Get(c).Intn(len(validTaskChars))] |
} |
cur = string(name[:]) |
_, ok = queue[cur] |
@@ -205,8 +207,8 @@ func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string |
return cur |
} |
-func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (*gae.TQTask, error)) ([]*gae.TQTask, error) { |
- ret := []*gae.TQTask(nil) |
+func multi(tasks []*tq.Task, queueName string, f func(*tq.Task, string) (*tq.Task, error)) ([]*tq.Task, error) { |
+ ret := []*tq.Task(nil) |
lme := gae.LazyMultiError{Size: len(tasks)} |
for i, task := range tasks { |
rt, err := f(task, queueName) |
@@ -216,8 +218,8 @@ func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (* |
return ret, lme.Get() |
} |
-func dupTask(t *gae.TQTask) *gae.TQTask { |
- ret := &gae.TQTask{} |
+func dupTask(t *tq.Task) *tq.Task { |
+ ret := &tq.Task{} |
*ret = *t |
if t.Header != nil { |
@@ -235,17 +237,17 @@ func dupTask(t *gae.TQTask) *gae.TQTask { |
} |
if t.RetryOptions != nil { |
- ret.RetryOptions = &gae.TQRetryOptions{} |
+ ret.RetryOptions = &tq.RetryOptions{} |
*ret.RetryOptions = *t.RetryOptions |
} |
return ret |
} |
-func dupQueue(q gae.QueueData) gae.QueueData { |
- r := make(gae.QueueData, len(q)) |
+func dupQueue(q tq.QueueData) tq.QueueData { |
+ r := make(tq.QueueData, len(q)) |
for k, q := range q { |
- r[k] = make(map[string]*gae.TQTask, len(q)) |
+ r[k] = make(map[string]*tq.Task, len(q)) |
for tn, t := range q { |
r[k][tn] = dupTask(t) |
} |