Index: impl/memory/taskqueue_data.go |
diff --git a/impl/memory/taskqueue_data.go b/impl/memory/taskqueue_data.go |
index 906da1c8fe7a9a6ef687c9a384c01436106ffecd..72375f435e447eca7e9ad6156270213e448c54c5 100644 |
--- a/impl/memory/taskqueue_data.go |
+++ b/impl/memory/taskqueue_data.go |
@@ -106,7 +106,7 @@ func (t *taskQueueData) ResetTasks() { |
t.resetTasksWithLock() |
} |
-func (t *taskQueueData) getQueueName(queueName string) (string, error) { |
+func (t *taskQueueData) getQueueNameLocked(queueName string) (string, error) { |
if queueName == "" { |
queueName = "default" |
} |
@@ -116,6 +116,17 @@ func (t *taskQueueData) getQueueName(queueName string) (string, error) { |
return queueName, nil |
} |
+func (t *taskQueueData) purgeLocked(queueName string) error { |
+ queueName, err := t.getQueueNameLocked(queueName) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ t.named[queueName] = map[string]*tq.Task{} |
+ t.archived[queueName] = map[string]*tq.Task{} |
+ return nil |
+} |
+ |
var tqOkMethods = map[string]struct{}{ |
"GET": {}, |
"POST": {}, |
@@ -124,13 +135,8 @@ var tqOkMethods = map[string]struct{}{ |
"DELETE": {}, |
} |
-func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, queueName string) (*tq.Task, string, error) { |
- queueName, err := t.getQueueName(queueName) |
- if err != nil { |
- return nil, "", err |
- } |
- |
- toSched := dupTask(task) |
+func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, queueName string) (*tq.Task, error) { |
+ toSched := task.Duplicate() |
if toSched.Path == "" { |
toSched.Path = "/_ah/queue/" + queueName |
@@ -147,7 +153,7 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, qu |
toSched.Method = "POST" |
} |
if _, ok := tqOkMethods[toSched.Method]; !ok { |
- return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.Method) |
+ return nil, fmt.Errorf("taskqueue: bad method %q", toSched.Method) |
} |
if toSched.Method != "POST" && toSched.Method != "PUT" { |
toSched.Payload = nil |
@@ -167,11 +173,11 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, qu |
toSched.Name = mkName(c, "", t.named[queueName]) |
} else { |
if !validTaskName.MatchString(toSched.Name) { |
- return nil, "", errors.New("INVALID_TASK_NAME") |
+ return nil, errors.New("INVALID_TASK_NAME") |
} |
} |
- return toSched, queueName, nil |
+ return toSched, nil |
} |
/////////////////////////////// txnTaskQueueData /////////////////////////////// |
@@ -201,15 +207,6 @@ func (t *txnTaskQueueData) endTxn() { |
atomic.StoreInt32(&t.closed, 1) |
} |
-func (t *txnTaskQueueData) run(f func() error) error { |
- // Slightly different from the SDK... datastore and taskqueue each implement |
- // this here, where in the SDK only datastore.transaction.Call does. |
- if atomic.LoadInt32(&t.closed) == 1 { |
- return fmt.Errorf("taskqueue: transaction context has expired") |
- } |
- return f() |
-} |
- |
func (t *txnTaskQueueData) ResetTasks() { |
t.Lock() |
defer t.Unlock() |
@@ -237,7 +234,7 @@ func (t *txnTaskQueueData) GetTransactionTasks() tq.AnonymousQueueData { |
for k, vs := range t.anony { |
ret[k] = make([]*tq.Task, len(vs)) |
for i, v := range vs { |
- tsk := dupTask(v) |
+ tsk := v.Duplicate() |
tsk.Name = "" |
ret[k][i] = tsk |
} |