| Index: go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
 | 
| diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go b/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
 | 
| deleted file mode 100644
 | 
| index 2d23b1d803ee3959a480d823cbd86364a53be40a..0000000000000000000000000000000000000000
 | 
| --- a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
 | 
| +++ /dev/null
 | 
| @@ -1,265 +0,0 @@
 | 
| -// Copyright 2015 The Chromium Authors. All rights reserved.
 | 
| -// Use of this source code is governed by a BSD-style license that can be
 | 
| -// found in the LICENSE file.
 | 
| -
 | 
| -package memory
 | 
| -
 | 
| -import (
 | 
| -	"errors"
 | 
| -	"fmt"
 | 
| -	"infra/gae/libs/wrapper"
 | 
| -	"net/http"
 | 
| -	"sync"
 | 
| -	"sync/atomic"
 | 
| -
 | 
| -	"appengine/datastore"
 | 
| -	"appengine/taskqueue"
 | 
| -	pb "appengine_internal/taskqueue"
 | 
| -	"golang.org/x/net/context"
 | 
| -	"infra/libs/clock"
 | 
| -)
 | 
| -
 | 
| -var (
 | 
| -	currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
 | 
| -	defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespace")
 | 
| -)
 | 
| -
 | 
| -//////////////////////////////// taskQueueData /////////////////////////////////
 | 
| -
 | 
| -type taskQueueData struct {
 | 
| -	sync.Mutex
 | 
| -	wrapper.BrokenFeatures
 | 
| -
 | 
| -	named    wrapper.QueueData
 | 
| -	archived wrapper.QueueData
 | 
| -}
 | 
| -
 | 
| -var (
 | 
| -	_ = memContextObj((*taskQueueData)(nil))
 | 
| -	_ = wrapper.TQTestable((*taskQueueData)(nil))
 | 
| -)
 | 
| -
 | 
| -func newTaskQueueData() memContextObj {
 | 
| -	return &taskQueueData{
 | 
| -		BrokenFeatures: wrapper.BrokenFeatures{
 | 
| -			DefaultError: newTQError(pb.TaskQueueServiceError_TRANSIENT_ERROR)},
 | 
| -		named:    wrapper.QueueData{"default": {}},
 | 
| -		archived: wrapper.QueueData{"default": {}},
 | 
| -	}
 | 
| -}
 | 
| -
 | 
| -func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true }
 | 
| -func (t *taskQueueData) endTxn()                            {}
 | 
| -func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) {
 | 
| -	txn := obj.(*txnTaskQueueData)
 | 
| -	for qn, tasks := range txn.anony {
 | 
| -		for _, tsk := range tasks {
 | 
| -			tsk.Name = mkName(c, tsk.Name, t.named[qn])
 | 
| -			t.named[qn][tsk.Name] = tsk
 | 
| -		}
 | 
| -	}
 | 
| -	txn.anony = nil
 | 
| -}
 | 
| -func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
 | 
| -	return &txnTaskQueueData{
 | 
| -		BrokenFeatures: &t.BrokenFeatures,
 | 
| -		parent:         t,
 | 
| -		anony:          wrapper.AnonymousQueueData{},
 | 
| -	}, nil
 | 
| -}
 | 
| -
 | 
| -func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
 | 
| -	return nil
 | 
| -}
 | 
| -
 | 
| -func (t *taskQueueData) CreateQueue(queueName string) {
 | 
| -	t.Lock()
 | 
| -	defer t.Unlock()
 | 
| -
 | 
| -	if _, ok := t.named[queueName]; ok {
 | 
| -		panic(fmt.Errorf("memory/taskqueue: cannot add the same queue twice! %q", queueName))
 | 
| -	}
 | 
| -	t.named[queueName] = map[string]*taskqueue.Task{}
 | 
| -	t.archived[queueName] = map[string]*taskqueue.Task{}
 | 
| -}
 | 
| -
 | 
| -func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData {
 | 
| -	t.Lock()
 | 
| -	defer t.Unlock()
 | 
| -
 | 
| -	return dupQueue(t.named)
 | 
| -}
 | 
| -
 | 
| -func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData {
 | 
| -	t.Lock()
 | 
| -	defer t.Unlock()
 | 
| -
 | 
| -	return dupQueue(t.archived)
 | 
| -}
 | 
| -
 | 
| -func (t *taskQueueData) resetTasksWithLock() {
 | 
| -	for queuename := range t.named {
 | 
| -		t.named[queuename] = map[string]*taskqueue.Task{}
 | 
| -		t.archived[queuename] = map[string]*taskqueue.Task{}
 | 
| -	}
 | 
| -}
 | 
| -
 | 
| -func (t *taskQueueData) ResetTasks() {
 | 
| -	t.Lock()
 | 
| -	defer t.Unlock()
 | 
| -
 | 
| -	t.resetTasksWithLock()
 | 
| -}
 | 
| -
 | 
| -func (t *taskQueueData) getQueueName(queueName string) (string, error) {
 | 
| -	if queueName == "" {
 | 
| -		queueName = "default"
 | 
| -	}
 | 
| -	if _, ok := t.named[queueName]; !ok {
 | 
| -		return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE)
 | 
| -	}
 | 
| -	return queueName, nil
 | 
| -}
 | 
| -
 | 
| -func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.Task, queueName string) (
 | 
| -	*taskqueue.Task, string, error) {
 | 
| -	queueName, err := t.getQueueName(queueName)
 | 
| -	if err != nil {
 | 
| -		return nil, "", err
 | 
| -	}
 | 
| -
 | 
| -	toSched := dupTask(task)
 | 
| -
 | 
| -	if toSched.Path == "" {
 | 
| -		return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL)
 | 
| -	}
 | 
| -
 | 
| -	if toSched.ETA.IsZero() {
 | 
| -		toSched.ETA = clock.Now(c).Add(toSched.Delay)
 | 
| -	} else if toSched.Delay != 0 {
 | 
| -		panic("taskqueue: both Delay and ETA are set")
 | 
| -	}
 | 
| -	toSched.Delay = 0
 | 
| -
 | 
| -	if toSched.Method == "" {
 | 
| -		toSched.Method = "POST"
 | 
| -	}
 | 
| -	if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok {
 | 
| -		return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.Method)
 | 
| -	}
 | 
| -	if toSched.Method != "POST" && toSched.Method != "PUT" {
 | 
| -		toSched.Payload = nil
 | 
| -	}
 | 
| -
 | 
| -	if _, ok := toSched.Header[currentNamespace]; !ok {
 | 
| -		if ns != "" {
 | 
| -			if toSched.Header == nil {
 | 
| -				toSched.Header = http.Header{}
 | 
| -			}
 | 
| -			toSched.Header[currentNamespace] = []string{ns}
 | 
| -		}
 | 
| -	}
 | 
| -	// TODO(riannucci): implement DefaultNamespace
 | 
| -
 | 
| -	if toSched.Name == "" {
 | 
| -		toSched.Name = mkName(c, "", t.named[queueName])
 | 
| -	} else {
 | 
| -		if !validTaskName.MatchString(toSched.Name) {
 | 
| -			return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_TASK_NAME)
 | 
| -		}
 | 
| -	}
 | 
| -
 | 
| -	return toSched, queueName, nil
 | 
| -}
 | 
| -
 | 
| -/////////////////////////////// txnTaskQueueData ///////////////////////////////
 | 
| -
 | 
| -type txnTaskQueueData struct {
 | 
| -	*wrapper.BrokenFeatures
 | 
| -
 | 
| -	lock sync.Mutex
 | 
| -
 | 
| -	// boolean 0 or 1, use atomic.*Int32 to access.
 | 
| -	closed int32
 | 
| -	anony  wrapper.AnonymousQueueData
 | 
| -	parent *taskQueueData
 | 
| -}
 | 
| -
 | 
| -var (
 | 
| -	_ = memContextObj((*txnTaskQueueData)(nil))
 | 
| -	_ = wrapper.TQTestable((*txnTaskQueueData)(nil))
 | 
| -)
 | 
| -
 | 
| -func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
 | 
| -
 | 
| -func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) {
 | 
| -	panic(errors.New("txnTaskQueueData.applyTxn is not implemented"))
 | 
| -}
 | 
| -
 | 
| -func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
 | 
| -	return nil, errors.New("txnTaskQueueData.mkTxn is not implemented")
 | 
| -}
 | 
| -
 | 
| -func (t *txnTaskQueueData) endTxn() {
 | 
| -	if atomic.LoadInt32(&t.closed) == 1 {
 | 
| -		panic("cannot end transaction twice")
 | 
| -	}
 | 
| -	atomic.StoreInt32(&t.closed, 1)
 | 
| -}
 | 
| -
 | 
| -func (t *txnTaskQueueData) IsBroken() error {
 | 
| -	// Slightly different from the SDK... datastore and taskqueue each implement
 | 
| -	// this here, where in the SDK only datastore.transaction.Call does.
 | 
| -	if atomic.LoadInt32(&t.closed) == 1 {
 | 
| -		return fmt.Errorf("taskqueue: transaction context has expired")
 | 
| -	}
 | 
| -	return t.parent.IsBroken()
 | 
| -}
 | 
| -
 | 
| -func (t *txnTaskQueueData) ResetTasks() {
 | 
| -	t.Lock()
 | 
| -	defer t.Unlock()
 | 
| -
 | 
| -	for queuename := range t.anony {
 | 
| -		t.anony[queuename] = nil
 | 
| -	}
 | 
| -	t.parent.resetTasksWithLock()
 | 
| -}
 | 
| -
 | 
| -func (t *txnTaskQueueData) Lock() {
 | 
| -	t.lock.Lock()
 | 
| -	t.parent.Lock()
 | 
| -}
 | 
| -func (t *txnTaskQueueData) Unlock() {
 | 
| -	t.parent.Unlock()
 | 
| -	t.lock.Unlock()
 | 
| -}
 | 
| -
 | 
| -func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
 | 
| -	t.Lock()
 | 
| -	defer t.Unlock()
 | 
| -
 | 
| -	ret := make(wrapper.AnonymousQueueData, len(t.anony))
 | 
| -	for k, vs := range t.anony {
 | 
| -		ret[k] = make([]*taskqueue.Task, len(vs))
 | 
| -		for i, v := range vs {
 | 
| -			tsk := dupTask(v)
 | 
| -			tsk.Name = ""
 | 
| -			ret[k][i] = tsk
 | 
| -		}
 | 
| -	}
 | 
| -
 | 
| -	return ret
 | 
| -}
 | 
| -
 | 
| -func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData {
 | 
| -	return t.parent.GetTombstonedTasks()
 | 
| -}
 | 
| -
 | 
| -func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData {
 | 
| -	return t.parent.GetScheduledTasks()
 | 
| -}
 | 
| -
 | 
| -func (t *txnTaskQueueData) CreateQueue(queueName string) {
 | 
| -	t.parent.CreateQueue(queueName)
 | 
| -}
 | 
| 
 |