| Index: impl/memory/taskqueue_data.go
|
| diff --git a/impl/memory/taskqueue_data.go b/impl/memory/taskqueue_data.go
|
| index 906da1c8fe7a9a6ef687c9a384c01436106ffecd..72375f435e447eca7e9ad6156270213e448c54c5 100644
|
| --- a/impl/memory/taskqueue_data.go
|
| +++ b/impl/memory/taskqueue_data.go
|
| @@ -106,7 +106,7 @@ func (t *taskQueueData) ResetTasks() {
|
| t.resetTasksWithLock()
|
| }
|
|
|
| -func (t *taskQueueData) getQueueName(queueName string) (string, error) {
|
| +func (t *taskQueueData) getQueueNameLocked(queueName string) (string, error) {
|
| if queueName == "" {
|
| queueName = "default"
|
| }
|
| @@ -116,6 +116,17 @@ func (t *taskQueueData) getQueueName(queueName string) (string, error) {
|
| return queueName, nil
|
| }
|
|
|
| +func (t *taskQueueData) purgeLocked(queueName string) error {
|
| + queueName, err := t.getQueueNameLocked(queueName)
|
| + if err != nil {
|
| + return err
|
| + }
|
| +
|
| + t.named[queueName] = map[string]*tq.Task{}
|
| + t.archived[queueName] = map[string]*tq.Task{}
|
| + return nil
|
| +}
|
| +
|
| var tqOkMethods = map[string]struct{}{
|
| "GET": {},
|
| "POST": {},
|
| @@ -124,13 +135,8 @@ var tqOkMethods = map[string]struct{}{
|
| "DELETE": {},
|
| }
|
|
|
| -func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, queueName string) (*tq.Task, string, error) {
|
| - queueName, err := t.getQueueName(queueName)
|
| - if err != nil {
|
| - return nil, "", err
|
| - }
|
| -
|
| - toSched := dupTask(task)
|
| +func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, queueName string) (*tq.Task, error) {
|
| + toSched := task.Duplicate()
|
|
|
| if toSched.Path == "" {
|
| toSched.Path = "/_ah/queue/" + queueName
|
| @@ -147,7 +153,7 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, qu
|
| toSched.Method = "POST"
|
| }
|
| if _, ok := tqOkMethods[toSched.Method]; !ok {
|
| - return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.Method)
|
| + return nil, fmt.Errorf("taskqueue: bad method %q", toSched.Method)
|
| }
|
| if toSched.Method != "POST" && toSched.Method != "PUT" {
|
| toSched.Payload = nil
|
| @@ -167,11 +173,11 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, qu
|
| toSched.Name = mkName(c, "", t.named[queueName])
|
| } else {
|
| if !validTaskName.MatchString(toSched.Name) {
|
| - return nil, "", errors.New("INVALID_TASK_NAME")
|
| + return nil, errors.New("INVALID_TASK_NAME")
|
| }
|
| }
|
|
|
| - return toSched, queueName, nil
|
| + return toSched, nil
|
| }
|
|
|
| /////////////////////////////// txnTaskQueueData ///////////////////////////////
|
| @@ -201,15 +207,6 @@ func (t *txnTaskQueueData) endTxn() {
|
| atomic.StoreInt32(&t.closed, 1)
|
| }
|
|
|
| -func (t *txnTaskQueueData) run(f func() error) error {
|
| - // Slightly different from the SDK... datastore and taskqueue each implement
|
| - // this here, where in the SDK only datastore.transaction.Call does.
|
| - if atomic.LoadInt32(&t.closed) == 1 {
|
| - return fmt.Errorf("taskqueue: transaction context has expired")
|
| - }
|
| - return f()
|
| -}
|
| -
|
| func (t *txnTaskQueueData) ResetTasks() {
|
| t.Lock()
|
| defer t.Unlock()
|
| @@ -237,7 +234,7 @@ func (t *txnTaskQueueData) GetTransactionTasks() tq.AnonymousQueueData {
|
| for k, vs := range t.anony {
|
| ret[k] = make([]*tq.Task, len(vs))
|
| for i, v := range vs {
|
| - tsk := dupTask(v)
|
| + tsk := v.Duplicate()
|
| tsk.Name = ""
|
| ret[k][i] = tsk
|
| }
|
|
|