Chromium Code Reviews| Index: go/src/infra/gae/libs/wrapper/memory/taskqueue.go |
| diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueue.go b/go/src/infra/gae/libs/wrapper/memory/taskqueue.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..89d39701c915ede0bb4619cf959c4bc648c87778 |
| --- /dev/null |
| +++ b/go/src/infra/gae/libs/wrapper/memory/taskqueue.go |
| @@ -0,0 +1,302 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package memory |
| + |
| +import ( |
| + "fmt" |
| + "golang.org/x/net/context" |
| + "math/rand" |
| + "net/http" |
| + "regexp" |
| + "time" |
| + |
| + "appengine" |
| + "appengine/taskqueue" |
| + "appengine_internal" |
| + dbpb "appengine_internal/datastore" |
| + pb "appengine_internal/taskqueue" |
| + |
| + "infra/gae/libs/wrapper" |
| +) |
| + |
| +/////////////////////////////// public functions /////////////////////////////// |
| + |
| +// UseTQ adds a wrapper.TaskQueue implementation to context, accessible |
| +// by wrapper.GetTQ(c) |
| +func UseTQ(c context.Context) context.Context { |
| + return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueue { |
| + tqd := cur(ic).Get("TQ") |
| + var ret interface { |
| + wrapper.TQTestable |
| + wrapper.TaskQueue |
| + } |
| + switch x := tqd.(type) { |
| + case *taskQueueData: |
| + ret = &taskqueueImpl{ |
| + wrapper.DummyTQ(), |
| + x, |
| + curGID(ic).namespace, |
| + func() time.Time { return wrapper.GetTimeNow(ic) }, |
| + wrapper.GetMathRand(ic), |
| + } |
| + |
| + case *txnTaskQueueData: |
| + ret = &taskqueueTxnImpl{ |
| + wrapper.DummyTQ(), |
| + x, |
| + curGID(ic).namespace, |
| + func() time.Time { return wrapper.GetTimeNow(ic) }, |
| + wrapper.GetMathRand(ic), |
| + } |
| + |
| + default: |
| + panic(fmt.Errorf("TQ: bad type: %v", tqd)) |
| + } |
| + return ret |
| + }) |
| +} |
| + |
| +//////////////////////////////// taskqueueImpl ///////////////////////////////// |
| + |
| +type taskqueueImpl struct { |
| + wrapper.TaskQueue |
| + *taskQueueData |
| + |
| + ns string |
| + timeNow func() time.Time |
| + mathRand *rand.Rand |
| +} |
| + |
| +func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) { |
| + toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow(), t.mathRand) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + if _, ok := t.archived[queueName][toSched.Name]; ok { |
| + // SDK converts TOMBSTONE -> already added too |
| + return nil, taskqueue.ErrTaskAlreadyAdded |
| + } else if _, ok := t.named[queueName][toSched.Name]; ok { |
| + return nil, taskqueue.ErrTaskAlreadyAdded |
| + } else { |
| + t.named[queueName][toSched.Name] = toSched |
| + } |
| + |
| + return dupTask(toSched), nil |
| +} |
| + |
| +func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) { |
| + err := t.IsBroken() |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + t.Lock() |
| + defer t.Unlock() |
| + |
| + return t.addLocked(task, queueName) |
| +} |
| + |
| +func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) error { |
| + queueName, err := t.getQueueName(queueName) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + if _, ok := t.archived[queueName][task.Name]; ok { |
| + return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK) |
| + } |
| + |
| + if _, ok := t.named[queueName][task.Name]; !ok { |
| + return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK) |
| + } |
| + |
| + t.archived[queueName][task.Name] = t.named[queueName][task.Name] |
| + delete(t.named[queueName], task.Name) |
| + |
| + return nil |
| +} |
| + |
| +func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error { |
| + err := t.IsBroken() |
| + if err != nil { |
| + return err |
| + } |
| + |
| + t.Lock() |
| + defer t.Unlock() |
| + |
| + return t.deleteLocked(task, queueName) |
| +} |
| + |
| +func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*taskqueue.Task, error) { |
| + err := t.IsBroken() |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + return multi(tasks, queueName, t.addLocked) |
| +} |
| + |
| +func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) error { |
| + err := t.IsBroken() |
| + if err != nil { |
| + return err |
| + } |
| + |
| + t.Lock() |
| + defer t.Unlock() |
| + |
| + _, err = multi(tasks, queueName, |
| + func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) { |
| + return nil, t.deleteLocked(tsk, qn) |
| + }) |
| + return err |
| +} |
| + |
| +/////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
| + |
| +type taskqueueTxnImpl struct { |
| + wrapper.TaskQueue |
| + *txnTaskQueueData |
| + |
| + ns string |
| + timeNow func() time.Time |
| + mathRand *rand.Rand |
|
M-A Ruel
2015/05/25 18:21:10
Why not always create its own rand with Seet(0) ?
iannucci
2015/05/27 19:33:32
So that the test writer can control it
|
| +} |
| + |
| +func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) { |
| + toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.timeNow(), t.mathRand) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + numTasks := 0 |
| + for _, vs := range t.anony { |
| + numTasks += len(vs) |
| + } |
| + if numTasks+1 > 5 { |
| + // transactional tasks are actually implemented 'for real' as Actions which |
| + // ride on the datastore. The current datastore implementation only allows |
| + // a maximum of 5 Actions per transaction, and more than that result in a |
| + // BAD_REQUEST. |
| + return nil, newDSError(dbpb.Error_BAD_REQUEST) |
| + } |
| + |
| + t.anony[queueName] = append(t.anony[queueName], toSched) |
| + |
| + // the fact that we have generated a unique name for this task queue item is |
| + // an implementation detail. |
| + // TODO(riannucci): now that I think about this... it may not actually be true. |
| + // We should verify that the .Name for a task added in a transaction is |
| + // meaningless. Maybe names generated in a transaction are somehow |
| + // guaranteed to be meaningful? |
| + toRet := dupTask(toSched) |
| + toRet.Name = "" |
| + |
| + return toRet, nil |
| +} |
| + |
| +func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) { |
| + err := t.IsBroken() |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + t.Lock() |
| + defer t.Unlock() |
| + |
| + return t.addLocked(task, queueName) |
| +} |
| + |
| +func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*taskqueue.Task, error) { |
| + err := t.IsBroken() |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + t.Lock() |
| + defer t.Unlock() |
| + |
| + return multi(tasks, queueName, t.addLocked) |
| +} |
| + |
| +////////////////////////////// private functions /////////////////////////////// |
| + |
| +var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") |
| + |
| +const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_" |
| + |
| +func mkName(rnd *rand.Rand, cur string, queue map[string]*taskqueue.Task) string { |
| + _, ok := queue[cur] |
| + for !ok && cur == "" { |
| + name := [500]byte{} |
| + for i := 0; i < 500; i++ { |
|
M-A Ruel
2015/05/25 18:21:10
Always 500 is excessive IMHO
iannucci
2015/05/27 19:33:32
That's the actual name limit.
|
| + name[i] = validTaskChars[rnd.Intn(len(validTaskChars))] |
| + } |
| + cur = string(name[:]) |
| + _, ok = queue[cur] |
| + } |
| + return cur |
| +} |
| + |
| +func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.APIError { |
| + return &appengine_internal.APIError{Service: "taskqueue", Code: int32(code)} |
| +} |
| + |
| +func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, string) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) { |
| + ret := []*taskqueue.Task(nil) |
| + me := appengine.MultiError(nil) |
| + foundErr := false |
| + for _, task := range tasks { |
| + rt, err := f(task, queueName) |
| + ret = append(ret, rt) |
| + me = append(me, err) |
| + if err != nil { |
| + foundErr = true |
| + } |
| + } |
| + if !foundErr { |
| + me = nil |
| + } |
| + return ret, me |
| +} |
| + |
| +func dupTask(t *taskqueue.Task) *taskqueue.Task { |
| + ret := &taskqueue.Task{} |
| + *ret = *t |
| + |
| + if t.Header != nil { |
| + ret.Header = make(http.Header, len(t.Header)) |
| + for k, vs := range t.Header { |
| + newVs := make([]string, len(vs)) |
| + copy(newVs, vs) |
| + ret.Header[k] = newVs |
| + } |
| + } |
| + |
| + if t.Payload != nil { |
| + ret.Payload = make([]byte, len(t.Payload)) |
| + copy(ret.Payload, t.Payload) |
| + } |
| + |
| + if t.RetryOptions != nil { |
| + ret.RetryOptions = &taskqueue.RetryOptions{} |
| + *ret.RetryOptions = *t.RetryOptions |
| + } |
| + |
| + return ret |
| +} |
| + |
| +func dupQueue(q wrapper.QueueData) wrapper.QueueData { |
| + r := make(wrapper.QueueData, len(q)) |
| + for k, q := range q { |
| + r[k] = make(map[string]*taskqueue.Task, len(q)) |
| + for tn, t := range q { |
| + r[k][tn] = dupTask(t) |
| + } |
| + } |
| + return r |
| +} |