Chromium Code Reviews| Index: go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go |
| diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go b/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..fd21ad97c7b2940ee805e27976d032f8948d2dce |
| --- /dev/null |
| +++ b/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go |
| @@ -0,0 +1,269 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package memory |
| + |
| +import ( |
| + "errors" |
| + "fmt" |
| + "math/rand" |
| + "net/http" |
| + "sync" |
| + "sync/atomic" |
| + "time" |
| + |
| + "appengine/datastore" |
| + "appengine/taskqueue" |
| + pb "appengine_internal/taskqueue" |
| + |
| + "infra/gae/libs/wrapper" |
| +) |
| + |
| +var ( |
| + currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace") |
| + defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespace") |
| +) |
| + |
| +//////////////////////////////// taskQueueData ///////////////////////////////// |
| + |
| +type taskQueueData struct { |
| + sync.Mutex |
| + *wrapper.BrokenFeatures |
| + |
| + named wrapper.QueueData |
| + archived wrapper.QueueData |
| +} |
| + |
| +////////////////////////////// New(taskQueueData) ////////////////////////////// |
| + |
| +func newTaskQueueData() memContextObj { |
| + return &taskQueueData{ |
| + BrokenFeatures: wrapper.NewBrokenFeatures( |
| + newTQError(pb.TaskQueueServiceError_TRANSIENT_ERROR)), |
| + named: wrapper.QueueData{"default": {}}, |
| + archived: wrapper.QueueData{"default": {}}, |
| + } |
| +} |
| + |
| +///////////////////////// memContextObj(taskQueueData) ///////////////////////// |
| + |
| +func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true } |
| +func (t *taskQueueData) endTxn() {} |
| +func (t *taskQueueData) applyTxn(rnd *rand.Rand, obj memContextObj) { |
| + txn := obj.(*txnTaskQueueData) |
| + for qn, tasks := range txn.anony { |
| + for _, tsk := range tasks { |
| + tsk.Name = mkName(rnd, tsk.Name, t.named[qn]) |
| + t.named[qn][tsk.Name] = tsk |
| + } |
| + } |
| + txn.anony = nil |
| +} |
| +func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) { |
| + return &txnTaskQueueData{ |
| + BrokenFeatures: t.BrokenFeatures, |
| + parent: t, |
| + anony: wrapper.AnonymousQueueData{}, |
| + }, nil |
| +} |
| + |
| +////////////////////// wrapper.TQTestable(taskQueueData) /////////////////////// |
| + |
| +func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData { |
| + return nil |
| +} |
| + |
| +func (t *taskQueueData) AddQueue(queueName string) { |
| + t.Lock() |
| + defer t.Unlock() |
| + |
| + if _, ok := t.named[queueName]; ok { |
| + panic(fmt.Errorf("memory/taskqueue: cannot add the same queue twice! %q", queueName)) |
| + } |
| + t.named[queueName] = map[string]*taskqueue.Task{} |
| + t.archived[queueName] = map[string]*taskqueue.Task{} |
| +} |
| + |
| +func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData { |
| + t.Lock() |
| + defer t.Unlock() |
| + |
| + return dupQueue(t.named) |
| +} |
| + |
| +func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData { |
| + t.Lock() |
| + defer t.Unlock() |
| + |
| + return dupQueue(t.archived) |
| +} |
| + |
| +func (t *taskQueueData) resetTasksWithLock() { |
| + for queuename := range t.named { |
| + t.named[queuename] = map[string]*taskqueue.Task{} |
| + t.archived[queuename] = map[string]*taskqueue.Task{} |
| + } |
| +} |
| + |
| +func (t *taskQueueData) ResetTasks() { |
| + t.Lock() |
| + defer t.Unlock() |
| + |
| + t.resetTasksWithLock() |
| +} |
| + |
| +/////////////////////////// taskQueueData (private) //////////////////////////// |
| + |
| +func (t *taskQueueData) getQueueName(queueName string) (string, error) { |
| + if queueName == "" { |
| + queueName = "default" |
| + } |
| + if _, ok := t.named[queueName]; !ok { |
| + return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE) |
| + } |
| + return queueName, nil |
| +} |
| + |
| +func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName string, now time.Time, rnd *rand.Rand) (*taskqueue.Task, string, error) { |
| + queueName, err := t.getQueueName(queueName) |
| + if err != nil { |
| + return nil, "", err |
| + } |
| + |
| + toSched := dupTask(task) |
| + |
| + if toSched.Path == "" { |
| + return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL) |
| + } |
| + |
| + if toSched.ETA.IsZero() { |
| + toSched.ETA = now.Add(toSched.Delay) |
| + } else if toSched.Delay != 0 { |
| + panic("taskqueue: both Delay and ETA are set") |
| + } |
| + toSched.Delay = 0 |
| + |
| + if toSched.Method == "" { |
| + toSched.Method = "POST" |
| + } |
| + if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok { |
| + return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.Method) |
| + } |
| + if toSched.Method != "POST" && toSched.Method != "PUT" { |
| + toSched.Payload = nil |
| + } |
| + |
| + if _, ok := toSched.Header[currentNamespace]; !ok { |
| + if ns != "" { |
| + if toSched.Header == nil { |
| + toSched.Header = http.Header{} |
| + } |
| + toSched.Header[currentNamespace] = []string{ns} |
| + } |
| + } |
| + // TODO(riannucci): implement DefaultNamespace |
| + |
| + if toSched.Name == "" { |
| + toSched.Name = mkName(rnd, "", t.named[queueName]) |
| + } else { |
| + if !validTaskName.MatchString(toSched.Name) { |
| + return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_TASK_NAME) |
| + } |
| + } |
| + |
| + return toSched, queueName, nil |
| +} |
| + |
| +/////////////////////////////// txnTaskQueueData /////////////////////////////// |
| + |
| +type txnTaskQueueData struct { |
| + *wrapper.BrokenFeatures |
| + |
| + lock sync.Mutex |
| + |
| + // boolean 0 or 1, use atomic.*Int32 to access. |
| + closed int32 |
| + anony wrapper.AnonymousQueueData |
| + parent *taskQueueData |
| +} |
| + |
| +/////////////////////// memContextObj(txnTaskQueueData) //////////////////////// |
| + |
| +func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false } |
| + |
| +func (t *txnTaskQueueData) applyTxn(*rand.Rand, memContextObj) { |
| + panic(errors.New("txnTaskQueueData.applyTxn is not implemented")) |
| +} |
| + |
| +func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) { |
| + return nil, errors.New("txnTaskQueueData.mkTxn is not implemented") |
| +} |
| + |
| +func (t *txnTaskQueueData) endTxn() { |
| + if atomic.LoadInt32(&t.closed) == 1 { |
| + panic("cannot end transaction twice") |
| + } |
| + atomic.StoreInt32(&t.closed, 1) |
| +} |
| + |
| +/////////////////// wrapper.BrokenFeatures(txnTaskQueueData) /////////////////// |
| + |
| +func (t *txnTaskQueueData) IsBroken() error { |
| + // Slightly different from the SDK... datastore and taskqueue each implement |
| + // this here, where in the SDK only datastore.transaction.Call does. |
| + if atomic.LoadInt32(&t.closed) == 1 { |
| + return fmt.Errorf("taskqueue: transaction context has expired") |
| + } |
| + return t.parent.IsBroken() |
| +} |
| + |
| +///////////////////// wrapper.TQTestable(txnTaskQueueData) ///////////////////// |
| + |
| +func (t *txnTaskQueueData) ResetTasks() { |
| + t.Lock() |
| + defer t.Unlock() |
| + |
| + for queuename := range t.anony { |
| + t.anony[queuename] = []*taskqueue.Task{} |
|
M-A Ruel
2015/05/25 18:21:10
you could set it to nil and it'd be fine
iannucci
2015/05/27 19:33:32
done
|
| + } |
| + t.parent.resetTasksWithLock() |
| +} |
| + |
| +func (t *txnTaskQueueData) Lock() { |
| + t.lock.Lock() |
| + t.parent.Lock() |
| +} |
| +func (t *txnTaskQueueData) Unlock() { |
| + t.parent.Unlock() |
| + t.lock.Unlock() |
| +} |
| + |
| +func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData { |
| + t.Lock() |
| + defer t.Unlock() |
| + |
| + ret := make(wrapper.AnonymousQueueData, len(t.anony)) |
| + for k, vs := range t.anony { |
| + ret[k] = make([]*taskqueue.Task, len(vs)) |
| + for i, v := range vs { |
| + tsk := dupTask(v) |
| + tsk.Name = "" |
| + ret[k][i] = tsk |
| + } |
| + } |
| + |
| + return ret |
| +} |
| + |
| +func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData { |
| + return t.parent.GetTombstonedTasks() |
| +} |
| + |
| +func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData { |
| + return t.parent.GetScheduledTasks() |
| +} |
| + |
| +func (t *txnTaskQueueData) AddQueue(queueName string) { |
| + t.parent.AddQueue(queueName) |
| +} |