| Index: go/src/infra/gae/libs/wrapper/memory/taskqueue.go
|
| diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueue.go b/go/src/infra/gae/libs/wrapper/memory/taskqueue.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..3e56c4a3b2decc2a0e2075a33a30b58d3dd0b592
|
| --- /dev/null
|
| +++ b/go/src/infra/gae/libs/wrapper/memory/taskqueue.go
|
| @@ -0,0 +1,299 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package memory
|
| +
|
| +import (
|
| + "fmt"
|
| + "infra/gae/libs/wrapper"
|
| + "math/rand"
|
| + "net/http"
|
| + "regexp"
|
| + "time"
|
| +
|
| + "golang.org/x/net/context"
|
| +
|
| + "appengine"
|
| + "appengine/taskqueue"
|
| + "appengine_internal"
|
| + dbpb "appengine_internal/datastore"
|
| + pb "appengine_internal/taskqueue"
|
| +)
|
| +
|
| +/////////////////////////////// public functions ///////////////////////////////
|
| +
|
| +// UseTQ adds a wrapper.TaskQueue implementation to context, accessible
|
| +// by wrapper.GetTQ(c)
|
| +func UseTQ(c context.Context) context.Context {
|
| + return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueue {
|
| + tqd := cur(ic).Get("TQ")
|
| + var ret interface {
|
| + wrapper.TQTestable
|
| + wrapper.TaskQueue
|
| + }
|
| + switch x := tqd.(type) {
|
| + case *taskQueueData:
|
| + ret = &taskqueueImpl{
|
| + wrapper.DummyTQ(),
|
| + x,
|
| + curGID(ic).namespace,
|
| + func() time.Time { return wrapper.GetTimeNow(ic) },
|
| + wrapper.GetMathRand(ic),
|
| + }
|
| +
|
| + case *txnTaskQueueData:
|
| + ret = &taskqueueTxnImpl{
|
| + wrapper.DummyTQ(),
|
| + x,
|
| + curGID(ic).namespace,
|
| + func() time.Time { return wrapper.GetTimeNow(ic) },
|
| + wrapper.GetMathRand(ic),
|
| + }
|
| +
|
| + default:
|
| + panic(fmt.Errorf("TQ: bad type: %v", tqd))
|
| + }
|
| + return ret
|
| + })
|
| +}
|
| +
|
| +//////////////////////////////// taskqueueImpl /////////////////////////////////
|
| +
|
| +type taskqueueImpl struct {
|
| + wrapper.TaskQueue
|
| + *taskQueueData
|
| +
|
| + ns string
|
| + timeNow func() time.Time
|
| + mathRand *rand.Rand
|
| +}
|
| +
|
| +func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) {
|
| + toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow(), t.mathRand)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + if _, ok := t.archived[queueName][toSched.Name]; ok {
|
| + // SDK converts TOMBSTONE -> already added too
|
| + return nil, taskqueue.ErrTaskAlreadyAdded
|
| + } else if _, ok := t.named[queueName][toSched.Name]; ok {
|
| + return nil, taskqueue.ErrTaskAlreadyAdded
|
| + } else {
|
| + t.named[queueName][toSched.Name] = toSched
|
| + }
|
| +
|
| + return dupTask(toSched), nil
|
| +}
|
| +
|
| +func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) {
|
| + if err := t.IsBroken(); err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + return t.addLocked(task, queueName)
|
| +}
|
| +
|
| +func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) error {
|
| + queueName, err := t.getQueueName(queueName)
|
| + if err != nil {
|
| + return err
|
| + }
|
| +
|
| + if _, ok := t.archived[queueName][task.Name]; ok {
|
| + return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK)
|
| + }
|
| +
|
| + if _, ok := t.named[queueName][task.Name]; !ok {
|
| + return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK)
|
| + }
|
| +
|
| + t.archived[queueName][task.Name] = t.named[queueName][task.Name]
|
| + delete(t.named[queueName], task.Name)
|
| +
|
| + return nil
|
| +}
|
| +
|
| +func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error {
|
| + if err := t.IsBroken(); err != nil {
|
| + return err
|
| + }
|
| +
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + return t.deleteLocked(task, queueName)
|
| +}
|
| +
|
| +func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*taskqueue.Task, error) {
|
| + if err := t.IsBroken(); err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + return multi(tasks, queueName, t.addLocked)
|
| +}
|
| +
|
| +func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) error {
|
| + if err := t.IsBroken(); err != nil {
|
| + return err
|
| + }
|
| +
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + _, err := multi(tasks, queueName,
|
| + func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) {
|
| + return nil, t.deleteLocked(tsk, qn)
|
| + })
|
| + return err
|
| +}
|
| +
|
| +/////////////////////////////// taskqueueTxnImpl ///////////////////////////////
|
| +
|
| +type taskqueueTxnImpl struct {
|
| + wrapper.TaskQueue
|
| + *txnTaskQueueData
|
| +
|
| + ns string
|
| + timeNow func() time.Time
|
| + mathRand *rand.Rand
|
| +}
|
| +
|
| +func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) {
|
| + toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.timeNow(), t.mathRand)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + numTasks := 0
|
| + for _, vs := range t.anony {
|
| + numTasks += len(vs)
|
| + }
|
| + if numTasks+1 > 5 {
|
| + // transactional tasks are actually implemented 'for real' as Actions which
|
| + // ride on the datastore. The current datastore implementation only allows
|
| + // a maximum of 5 Actions per transaction, and more than that result in a
|
| + // BAD_REQUEST.
|
| + return nil, newDSError(dbpb.Error_BAD_REQUEST)
|
| + }
|
| +
|
| + t.anony[queueName] = append(t.anony[queueName], toSched)
|
| +
|
| + // the fact that we have generated a unique name for this task queue item is
|
| + // an implementation detail.
|
| + // TODO(riannucci): now that I think about this... it may not actually be true.
|
| + // We should verify that the .Name for a task added in a transaction is
|
| + // meaningless. Maybe names generated in a transaction are somehow
|
| + // guaranteed to be meaningful?
|
| + toRet := dupTask(toSched)
|
| + toRet.Name = ""
|
| +
|
| + return toRet, nil
|
| +}
|
| +
|
| +func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) {
|
| + if err := t.IsBroken(); err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + return t.addLocked(task, queueName)
|
| +}
|
| +
|
| +func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*taskqueue.Task, error) {
|
| + if err := t.IsBroken(); err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + return multi(tasks, queueName, t.addLocked)
|
| +}
|
| +
|
| +////////////////////////////// private functions ///////////////////////////////
|
| +
|
| +var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
|
| +
|
| +const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_"
|
| +
|
| +func mkName(rnd *rand.Rand, cur string, queue map[string]*taskqueue.Task) string {
|
| + _, ok := queue[cur]
|
| + for !ok && cur == "" {
|
| + name := [500]byte{}
|
| + for i := 0; i < 500; i++ {
|
| + name[i] = validTaskChars[rnd.Intn(len(validTaskChars))]
|
| + }
|
| + cur = string(name[:])
|
| + _, ok = queue[cur]
|
| + }
|
| + return cur
|
| +}
|
| +
|
| +func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.APIError {
|
| + return &appengine_internal.APIError{Service: "taskqueue", Code: int32(code)}
|
| +}
|
| +
|
| +func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, string) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) {
|
| + ret := []*taskqueue.Task(nil)
|
| + me := appengine.MultiError(nil)
|
| + foundErr := false
|
| + for _, task := range tasks {
|
| + rt, err := f(task, queueName)
|
| + ret = append(ret, rt)
|
| + me = append(me, err)
|
| + if err != nil {
|
| + foundErr = true
|
| + }
|
| + }
|
| + if !foundErr {
|
| + me = nil
|
| + }
|
| + return ret, me
|
| +}
|
| +
|
| +func dupTask(t *taskqueue.Task) *taskqueue.Task {
|
| + ret := &taskqueue.Task{}
|
| + *ret = *t
|
| +
|
| + if t.Header != nil {
|
| + ret.Header = make(http.Header, len(t.Header))
|
| + for k, vs := range t.Header {
|
| + newVs := make([]string, len(vs))
|
| + copy(newVs, vs)
|
| + ret.Header[k] = newVs
|
| + }
|
| + }
|
| +
|
| + if t.Payload != nil {
|
| + ret.Payload = make([]byte, len(t.Payload))
|
| + copy(ret.Payload, t.Payload)
|
| + }
|
| +
|
| + if t.RetryOptions != nil {
|
| + ret.RetryOptions = &taskqueue.RetryOptions{}
|
| + *ret.RetryOptions = *t.RetryOptions
|
| + }
|
| +
|
| + return ret
|
| +}
|
| +
|
| +func dupQueue(q wrapper.QueueData) wrapper.QueueData {
|
| + r := make(wrapper.QueueData, len(q))
|
| + for k, q := range q {
|
| + r[k] = make(map[string]*taskqueue.Task, len(q))
|
| + for tn, t := range q {
|
| + r[k][tn] = dupTask(t)
|
| + }
|
| + }
|
| + return r
|
| +}
|
|
|