| Index: go/src/infra/gae/libs/wrapper/memory/taskqueueData.go
|
| diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueueData.go b/go/src/infra/gae/libs/wrapper/memory/taskqueueData.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..fd21ad97c7b2940ee805e27976d032f8948d2dce
|
| --- /dev/null
|
| +++ b/go/src/infra/gae/libs/wrapper/memory/taskqueueData.go
|
| @@ -0,0 +1,269 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package memory
|
| +
|
| +import (
|
| + "errors"
|
| + "fmt"
|
| + "math/rand"
|
| + "net/http"
|
| + "sync"
|
| + "sync/atomic"
|
| + "time"
|
| +
|
| + "appengine/datastore"
|
| + "appengine/taskqueue"
|
| + pb "appengine_internal/taskqueue"
|
| +
|
| + "infra/gae/libs/wrapper"
|
| +)
|
| +
|
| +var (
|
| + currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
|
| + defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespace")
|
| +)
|
| +
|
| +//////////////////////////////// taskQueueData /////////////////////////////////
|
| +
|
| +type taskQueueData struct {
|
| + sync.Mutex
|
| + *wrapper.BrokenFeatures
|
| +
|
| + named wrapper.QueueData
|
| + archived wrapper.QueueData
|
| +}
|
| +
|
| +////////////////////////////// New(taskQueueData) //////////////////////////////
|
| +
|
| +func newTaskQueueData() memContextObj {
|
| + return &taskQueueData{
|
| + BrokenFeatures: wrapper.NewBrokenFeatures(
|
| + newTQError(pb.TaskQueueServiceError_TRANSIENT_ERROR)),
|
| + named: wrapper.QueueData{"default": {}},
|
| + archived: wrapper.QueueData{"default": {}},
|
| + }
|
| +}
|
| +
|
| +///////////////////////// memContextObj(taskQueueData) /////////////////////////
|
| +
|
| +func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true }
|
| +func (t *taskQueueData) endTxn() {}
|
| +func (t *taskQueueData) applyTxn(rnd *rand.Rand, obj memContextObj) {
|
| + txn := obj.(*txnTaskQueueData)
|
| + for qn, tasks := range txn.anony {
|
| + for _, tsk := range tasks {
|
| + tsk.Name = mkName(rnd, tsk.Name, t.named[qn])
|
| + t.named[qn][tsk.Name] = tsk
|
| + }
|
| + }
|
| + txn.anony = nil
|
| +}
|
| +func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
|
| + return &txnTaskQueueData{
|
| + BrokenFeatures: t.BrokenFeatures,
|
| + parent: t,
|
| + anony: wrapper.AnonymousQueueData{},
|
| + }, nil
|
| +}
|
| +
|
| +////////////////////// wrapper.TQTestable(taskQueueData) ///////////////////////
|
| +
|
| +func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
|
| + return nil
|
| +}
|
| +
|
| +func (t *taskQueueData) AddQueue(queueName string) {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + if _, ok := t.named[queueName]; ok {
|
| + panic(fmt.Errorf("memory/taskqueue: cannot add the same queue twice! %q", queueName))
|
| + }
|
| + t.named[queueName] = map[string]*taskqueue.Task{}
|
| + t.archived[queueName] = map[string]*taskqueue.Task{}
|
| +}
|
| +
|
| +func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + return dupQueue(t.named)
|
| +}
|
| +
|
| +func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + return dupQueue(t.archived)
|
| +}
|
| +
|
| +func (t *taskQueueData) resetTasksWithLock() {
|
| + for queuename := range t.named {
|
| + t.named[queuename] = map[string]*taskqueue.Task{}
|
| + t.archived[queuename] = map[string]*taskqueue.Task{}
|
| + }
|
| +}
|
| +
|
| +func (t *taskQueueData) ResetTasks() {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + t.resetTasksWithLock()
|
| +}
|
| +
|
| +/////////////////////////// taskQueueData (private) ////////////////////////////
|
| +
|
| +func (t *taskQueueData) getQueueName(queueName string) (string, error) {
|
| + if queueName == "" {
|
| + queueName = "default"
|
| + }
|
| + if _, ok := t.named[queueName]; !ok {
|
| + return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE)
|
| + }
|
| + return queueName, nil
|
| +}
|
| +
|
| +func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName string, now time.Time, rnd *rand.Rand) (*taskqueue.Task, string, error) {
|
| + queueName, err := t.getQueueName(queueName)
|
| + if err != nil {
|
| + return nil, "", err
|
| + }
|
| +
|
| + toSched := dupTask(task)
|
| +
|
| + if toSched.Path == "" {
|
| + return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL)
|
| + }
|
| +
|
| + if toSched.ETA.IsZero() {
|
| + toSched.ETA = now.Add(toSched.Delay)
|
| + } else if toSched.Delay != 0 {
|
| + panic("taskqueue: both Delay and ETA are set")
|
| + }
|
| + toSched.Delay = 0
|
| +
|
| + if toSched.Method == "" {
|
| + toSched.Method = "POST"
|
| + }
|
| + if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok {
|
| + return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.Method)
|
| + }
|
| + if toSched.Method != "POST" && toSched.Method != "PUT" {
|
| + toSched.Payload = nil
|
| + }
|
| +
|
| + if _, ok := toSched.Header[currentNamespace]; !ok {
|
| + if ns != "" {
|
| + if toSched.Header == nil {
|
| + toSched.Header = http.Header{}
|
| + }
|
| + toSched.Header[currentNamespace] = []string{ns}
|
| + }
|
| + }
|
| + // TODO(riannucci): implement DefaultNamespace
|
| +
|
| + if toSched.Name == "" {
|
| + toSched.Name = mkName(rnd, "", t.named[queueName])
|
| + } else {
|
| + if !validTaskName.MatchString(toSched.Name) {
|
| + return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_TASK_NAME)
|
| + }
|
| + }
|
| +
|
| + return toSched, queueName, nil
|
| +}
|
| +
|
| +/////////////////////////////// txnTaskQueueData ///////////////////////////////
|
| +
|
| +type txnTaskQueueData struct {
|
| + *wrapper.BrokenFeatures
|
| +
|
| + lock sync.Mutex
|
| +
|
| + // boolean 0 or 1, use atomic.*Int32 to access.
|
| + closed int32
|
| + anony wrapper.AnonymousQueueData
|
| + parent *taskQueueData
|
| +}
|
| +
|
| +/////////////////////// memContextObj(txnTaskQueueData) ////////////////////////
|
| +
|
| +func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
|
| +
|
| +func (t *txnTaskQueueData) applyTxn(*rand.Rand, memContextObj) {
|
| + panic(errors.New("txnTaskQueueData.applyTxn is not implemented"))
|
| +}
|
| +
|
| +func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
|
| + return nil, errors.New("txnTaskQueueData.mkTxn is not implemented")
|
| +}
|
| +
|
| +func (t *txnTaskQueueData) endTxn() {
|
| + if atomic.LoadInt32(&t.closed) == 1 {
|
| + panic("cannot end transaction twice")
|
| + }
|
| + atomic.StoreInt32(&t.closed, 1)
|
| +}
|
| +
|
| +/////////////////// wrapper.BrokenFeatures(txnTaskQueueData) ///////////////////
|
| +
|
| +func (t *txnTaskQueueData) IsBroken() error {
|
| + // Slightly different from the SDK... datastore and taskqueue each implement
|
| + // this here, where in the SDK only datastore.transaction.Call does.
|
| + if atomic.LoadInt32(&t.closed) == 1 {
|
| + return fmt.Errorf("taskqueue: transaction context has expired")
|
| + }
|
| + return t.parent.IsBroken()
|
| +}
|
| +
|
| +///////////////////// wrapper.TQTestable(txnTaskQueueData) /////////////////////
|
| +
|
| +func (t *txnTaskQueueData) ResetTasks() {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + for queuename := range t.anony {
|
| + t.anony[queuename] = []*taskqueue.Task{}
|
| + }
|
| + t.parent.resetTasksWithLock()
|
| +}
|
| +
|
| +func (t *txnTaskQueueData) Lock() {
|
| + t.lock.Lock()
|
| + t.parent.Lock()
|
| +}
|
| +func (t *txnTaskQueueData) Unlock() {
|
| + t.parent.Unlock()
|
| + t.lock.Unlock()
|
| +}
|
| +
|
| +func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + ret := make(wrapper.AnonymousQueueData, len(t.anony))
|
| + for k, vs := range t.anony {
|
| + ret[k] = make([]*taskqueue.Task, len(vs))
|
| + for i, v := range vs {
|
| + tsk := dupTask(v)
|
| + tsk.Name = ""
|
| + ret[k][i] = tsk
|
| + }
|
| + }
|
| +
|
| + return ret
|
| +}
|
| +
|
| +func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData {
|
| + return t.parent.GetTombstonedTasks()
|
| +}
|
| +
|
| +func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData {
|
| + return t.parent.GetScheduledTasks()
|
| +}
|
| +
|
| +func (t *txnTaskQueueData) AddQueue(queueName string) {
|
| + t.parent.AddQueue(queueName)
|
| +}
|
|
|