| Index: impl/memory/taskqueue_data.go
|
| diff --git a/memory/taskqueue_data.go b/impl/memory/taskqueue_data.go
|
| similarity index 78%
|
| rename from memory/taskqueue_data.go
|
| rename to impl/memory/taskqueue_data.go
|
| index 064a181f8c55f65c1d7de17d6bb26933cef1eab0..c8420a86d79108513b034438fcfcf1c110d77e42 100644
|
| --- a/memory/taskqueue_data.go
|
| +++ b/impl/memory/taskqueue_data.go
|
| @@ -13,7 +13,8 @@ import (
|
|
|
| "golang.org/x/net/context"
|
|
|
| - "github.com/luci/gae"
|
| + rds "github.com/luci/gae/service/rawdatastore"
|
| + tq "github.com/luci/gae/service/taskqueue"
|
| "github.com/luci/luci-go/common/clock"
|
| )
|
|
|
| @@ -27,19 +28,19 @@ var (
|
| type taskQueueData struct {
|
| sync.Mutex
|
|
|
| - named gae.QueueData
|
| - archived gae.QueueData
|
| + named tq.QueueData
|
| + archived tq.QueueData
|
| }
|
|
|
| var (
|
| _ = memContextObj((*taskQueueData)(nil))
|
| - _ = gae.TQTestable((*taskQueueData)(nil))
|
| + _ = tq.Testable((*taskQueueData)(nil))
|
| )
|
|
|
| func newTaskQueueData() memContextObj {
|
| return &taskQueueData{
|
| - named: gae.QueueData{"default": {}},
|
| - archived: gae.QueueData{"default": {}},
|
| + named: tq.QueueData{"default": {}},
|
| + archived: tq.QueueData{"default": {}},
|
| }
|
| }
|
|
|
| @@ -55,14 +56,14 @@ func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) {
|
| }
|
| txn.anony = nil
|
| }
|
| -func (t *taskQueueData) mkTxn(*gae.DSTransactionOptions) memContextObj {
|
| +func (t *taskQueueData) mkTxn(*rds.TransactionOptions) memContextObj {
|
| return &txnTaskQueueData{
|
| parent: t,
|
| - anony: gae.AnonymousQueueData{},
|
| + anony: tq.AnonymousQueueData{},
|
| }
|
| }
|
|
|
| -func (t *taskQueueData) GetTransactionTasks() gae.AnonymousQueueData {
|
| +func (t *taskQueueData) GetTransactionTasks() tq.AnonymousQueueData {
|
| return nil
|
| }
|
|
|
| @@ -73,18 +74,18 @@ func (t *taskQueueData) CreateQueue(queueName string) {
|
| if _, ok := t.named[queueName]; ok {
|
| panic(fmt.Errorf("memory/taskqueue: cannot add the same queue twice! %q", queueName))
|
| }
|
| - t.named[queueName] = map[string]*gae.TQTask{}
|
| - t.archived[queueName] = map[string]*gae.TQTask{}
|
| + t.named[queueName] = map[string]*tq.Task{}
|
| + t.archived[queueName] = map[string]*tq.Task{}
|
| }
|
|
|
| -func (t *taskQueueData) GetScheduledTasks() gae.QueueData {
|
| +func (t *taskQueueData) GetScheduledTasks() tq.QueueData {
|
| t.Lock()
|
| defer t.Unlock()
|
|
|
| return dupQueue(t.named)
|
| }
|
|
|
| -func (t *taskQueueData) GetTombstonedTasks() gae.QueueData {
|
| +func (t *taskQueueData) GetTombstonedTasks() tq.QueueData {
|
| t.Lock()
|
| defer t.Unlock()
|
|
|
| @@ -93,8 +94,8 @@ func (t *taskQueueData) GetTombstonedTasks() gae.QueueData {
|
|
|
| func (t *taskQueueData) resetTasksWithLock() {
|
| for queueName := range t.named {
|
| - t.named[queueName] = map[string]*gae.TQTask{}
|
| - t.archived[queueName] = map[string]*gae.TQTask{}
|
| + t.named[queueName] = map[string]*tq.Task{}
|
| + t.archived[queueName] = map[string]*tq.Task{}
|
| }
|
| }
|
|
|
| @@ -123,7 +124,7 @@ var tqOkMethods = map[string]struct{}{
|
| "DELETE": {},
|
| }
|
|
|
| -func (t *taskQueueData) prepTask(c context.Context, ns string, task *gae.TQTask, queueName string) (*gae.TQTask, string, error) {
|
| +func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, queueName string) (*tq.Task, string, error) {
|
| queueName, err := t.getQueueName(queueName)
|
| if err != nil {
|
| return nil, "", err
|
| @@ -180,18 +181,18 @@ type txnTaskQueueData struct {
|
|
|
| // boolean 0 or 1, use atomic.*Int32 to access.
|
| closed int32
|
| - anony gae.AnonymousQueueData
|
| + anony tq.AnonymousQueueData
|
| parent *taskQueueData
|
| }
|
|
|
| var (
|
| _ = memContextObj((*txnTaskQueueData)(nil))
|
| - _ = gae.TQTestable((*txnTaskQueueData)(nil))
|
| + _ = tq.Testable((*txnTaskQueueData)(nil))
|
| )
|
|
|
| -func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
|
| -func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { panic("impossible") }
|
| -func (t *txnTaskQueueData) mkTxn(*gae.DSTransactionOptions) memContextObj { panic("impossible") }
|
| +func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
|
| +func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { panic("impossible") }
|
| +func (t *txnTaskQueueData) mkTxn(*rds.TransactionOptions) memContextObj { panic("impossible") }
|
|
|
| func (t *txnTaskQueueData) endTxn() {
|
| if atomic.LoadInt32(&t.closed) == 1 {
|
| @@ -228,13 +229,13 @@ func (t *txnTaskQueueData) Unlock() {
|
| t.lock.Unlock()
|
| }
|
|
|
| -func (t *txnTaskQueueData) GetTransactionTasks() gae.AnonymousQueueData {
|
| +func (t *txnTaskQueueData) GetTransactionTasks() tq.AnonymousQueueData {
|
| t.Lock()
|
| defer t.Unlock()
|
|
|
| - ret := make(gae.AnonymousQueueData, len(t.anony))
|
| + ret := make(tq.AnonymousQueueData, len(t.anony))
|
| for k, vs := range t.anony {
|
| - ret[k] = make([]*gae.TQTask, len(vs))
|
| + ret[k] = make([]*tq.Task, len(vs))
|
| for i, v := range vs {
|
| tsk := dupTask(v)
|
| tsk.Name = ""
|
| @@ -245,11 +246,11 @@ func (t *txnTaskQueueData) GetTransactionTasks() gae.AnonymousQueueData {
|
| return ret
|
| }
|
|
|
| -func (t *txnTaskQueueData) GetTombstonedTasks() gae.QueueData {
|
| +func (t *txnTaskQueueData) GetTombstonedTasks() tq.QueueData {
|
| return t.parent.GetTombstonedTasks()
|
| }
|
|
|
| -func (t *txnTaskQueueData) GetScheduledTasks() gae.QueueData {
|
| +func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData {
|
| return t.parent.GetScheduledTasks()
|
| }
|
|
|
|
|