Index: impl/memory/taskqueue_data.go |
diff --git a/memory/taskqueue_data.go b/impl/memory/taskqueue_data.go |
similarity index 78% |
rename from memory/taskqueue_data.go |
rename to impl/memory/taskqueue_data.go |
index 064a181f8c55f65c1d7de17d6bb26933cef1eab0..c8420a86d79108513b034438fcfcf1c110d77e42 100644 |
--- a/memory/taskqueue_data.go |
+++ b/impl/memory/taskqueue_data.go |
@@ -13,7 +13,8 @@ import ( |
"golang.org/x/net/context" |
- "github.com/luci/gae" |
+ rds "github.com/luci/gae/service/rawdatastore" |
+ tq "github.com/luci/gae/service/taskqueue" |
"github.com/luci/luci-go/common/clock" |
) |
@@ -27,19 +28,19 @@ var ( |
type taskQueueData struct { |
sync.Mutex |
- named gae.QueueData |
- archived gae.QueueData |
+ named tq.QueueData |
+ archived tq.QueueData |
} |
var ( |
_ = memContextObj((*taskQueueData)(nil)) |
- _ = gae.TQTestable((*taskQueueData)(nil)) |
+ _ = tq.Testable((*taskQueueData)(nil)) |
) |
func newTaskQueueData() memContextObj { |
return &taskQueueData{ |
- named: gae.QueueData{"default": {}}, |
- archived: gae.QueueData{"default": {}}, |
+ named: tq.QueueData{"default": {}}, |
+ archived: tq.QueueData{"default": {}}, |
} |
} |
@@ -55,14 +56,14 @@ func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) { |
} |
txn.anony = nil |
} |
-func (t *taskQueueData) mkTxn(*gae.DSTransactionOptions) memContextObj { |
+func (t *taskQueueData) mkTxn(*rds.TransactionOptions) memContextObj { |
return &txnTaskQueueData{ |
parent: t, |
- anony: gae.AnonymousQueueData{}, |
+ anony: tq.AnonymousQueueData{}, |
} |
} |
-func (t *taskQueueData) GetTransactionTasks() gae.AnonymousQueueData { |
+func (t *taskQueueData) GetTransactionTasks() tq.AnonymousQueueData { |
return nil |
} |
@@ -73,18 +74,18 @@ func (t *taskQueueData) CreateQueue(queueName string) { |
if _, ok := t.named[queueName]; ok { |
panic(fmt.Errorf("memory/taskqueue: cannot add the same queue twice! %q", queueName)) |
} |
- t.named[queueName] = map[string]*gae.TQTask{} |
- t.archived[queueName] = map[string]*gae.TQTask{} |
+ t.named[queueName] = map[string]*tq.Task{} |
+ t.archived[queueName] = map[string]*tq.Task{} |
} |
-func (t *taskQueueData) GetScheduledTasks() gae.QueueData { |
+func (t *taskQueueData) GetScheduledTasks() tq.QueueData { |
t.Lock() |
defer t.Unlock() |
return dupQueue(t.named) |
} |
-func (t *taskQueueData) GetTombstonedTasks() gae.QueueData { |
+func (t *taskQueueData) GetTombstonedTasks() tq.QueueData { |
t.Lock() |
defer t.Unlock() |
@@ -93,8 +94,8 @@ func (t *taskQueueData) GetTombstonedTasks() gae.QueueData { |
func (t *taskQueueData) resetTasksWithLock() { |
for queueName := range t.named { |
- t.named[queueName] = map[string]*gae.TQTask{} |
- t.archived[queueName] = map[string]*gae.TQTask{} |
+ t.named[queueName] = map[string]*tq.Task{} |
+ t.archived[queueName] = map[string]*tq.Task{} |
} |
} |
@@ -123,7 +124,7 @@ var tqOkMethods = map[string]struct{}{ |
"DELETE": {}, |
} |
-func (t *taskQueueData) prepTask(c context.Context, ns string, task *gae.TQTask, queueName string) (*gae.TQTask, string, error) { |
+func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, queueName string) (*tq.Task, string, error) { |
queueName, err := t.getQueueName(queueName) |
if err != nil { |
return nil, "", err |
@@ -180,18 +181,18 @@ type txnTaskQueueData struct { |
// boolean 0 or 1, use atomic.*Int32 to access. |
closed int32 |
- anony gae.AnonymousQueueData |
+ anony tq.AnonymousQueueData |
parent *taskQueueData |
} |
var ( |
_ = memContextObj((*txnTaskQueueData)(nil)) |
- _ = gae.TQTestable((*txnTaskQueueData)(nil)) |
+ _ = tq.Testable((*txnTaskQueueData)(nil)) |
) |
-func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false } |
-func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { panic("impossible") } |
-func (t *txnTaskQueueData) mkTxn(*gae.DSTransactionOptions) memContextObj { panic("impossible") } |
+func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false } |
+func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { panic("impossible") } |
+func (t *txnTaskQueueData) mkTxn(*rds.TransactionOptions) memContextObj { panic("impossible") } |
func (t *txnTaskQueueData) endTxn() { |
if atomic.LoadInt32(&t.closed) == 1 { |
@@ -228,13 +229,13 @@ func (t *txnTaskQueueData) Unlock() { |
t.lock.Unlock() |
} |
-func (t *txnTaskQueueData) GetTransactionTasks() gae.AnonymousQueueData { |
+func (t *txnTaskQueueData) GetTransactionTasks() tq.AnonymousQueueData { |
t.Lock() |
defer t.Unlock() |
- ret := make(gae.AnonymousQueueData, len(t.anony)) |
+ ret := make(tq.AnonymousQueueData, len(t.anony)) |
for k, vs := range t.anony { |
- ret[k] = make([]*gae.TQTask, len(vs)) |
+ ret[k] = make([]*tq.Task, len(vs)) |
for i, v := range vs { |
tsk := dupTask(v) |
tsk.Name = "" |
@@ -245,11 +246,11 @@ func (t *txnTaskQueueData) GetTransactionTasks() gae.AnonymousQueueData { |
return ret |
} |
-func (t *txnTaskQueueData) GetTombstonedTasks() gae.QueueData { |
+func (t *txnTaskQueueData) GetTombstonedTasks() tq.QueueData { |
return t.parent.GetTombstonedTasks() |
} |
-func (t *txnTaskQueueData) GetScheduledTasks() gae.QueueData { |
+func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData { |
return t.parent.GetScheduledTasks() |
} |