Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(63)

Unified Diff: go/src/infra/gae/libs/gae/memory/taskqueue.go

Issue 1240573002: Reland: Refactor current GAE abstraction library to be free of the SDK* (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: expand coverage range to fit 32bit test expectations Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: go/src/infra/gae/libs/gae/memory/taskqueue.go
diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueue.go b/go/src/infra/gae/libs/gae/memory/taskqueue.go
similarity index 53%
rename from go/src/infra/gae/libs/wrapper/memory/taskqueue.go
rename to go/src/infra/gae/libs/gae/memory/taskqueue.go
index 4d05752a7235fd0e8ae12166ee712628c91e646e..b0e04b201d5b73a179d3df15f8cdf7e7538ee835 100644
--- a/go/src/infra/gae/libs/wrapper/memory/taskqueue.go
+++ b/go/src/infra/gae/libs/gae/memory/taskqueue.go
@@ -5,33 +5,29 @@
package memory
import (
+ "errors"
"fmt"
- "infra/gae/libs/wrapper"
"net/http"
"regexp"
"golang.org/x/net/context"
- "appengine"
- "appengine/taskqueue"
- "appengine_internal"
- dbpb "appengine_internal/datastore"
- pb "appengine_internal/taskqueue"
+ "infra/gae/libs/gae"
)
/////////////////////////////// public functions ///////////////////////////////
func useTQ(c context.Context) context.Context {
- return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueue {
+ return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue {
tqd := cur(ic).Get(memContextTQIdx)
var ret interface {
- wrapper.TQTestable
- wrapper.TaskQueue
+ gae.TQTestable
+ gae.TaskQueue
}
switch x := tqd.(type) {
case *taskQueueData:
ret = &taskqueueImpl{
- wrapper.DummyTQ(),
+ gae.DummyTQ(),
x,
ic,
curGID(ic).namespace,
@@ -39,7 +35,7 @@ func useTQ(c context.Context) context.Context {
case *txnTaskQueueData:
ret = &taskqueueTxnImpl{
- wrapper.DummyTQ(),
+ gae.DummyTQ(),
x,
ic,
curGID(ic).namespace,
@@ -55,7 +51,7 @@ func useTQ(c context.Context) context.Context {
//////////////////////////////// taskqueueImpl /////////////////////////////////
type taskqueueImpl struct {
- wrapper.TaskQueue
+ gae.TaskQueue
*taskQueueData
ctx context.Context
@@ -63,11 +59,11 @@ type taskqueueImpl struct {
}
var (
- _ = wrapper.TaskQueue((*taskqueueImpl)(nil))
- _ = wrapper.TQTestable((*taskqueueImpl)(nil))
+ _ = gae.TaskQueue((*taskqueueImpl)(nil))
+ _ = gae.TQTestable((*taskqueueImpl)(nil))
)
-func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) {
+func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTask, error) {
toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName)
if err != nil {
return nil, err
@@ -75,9 +71,9 @@ func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task
if _, ok := t.archived[queueName][toSched.Name]; ok {
// SDK converts TOMBSTONE -> already added too
- return nil, taskqueue.ErrTaskAlreadyAdded
+ return nil, gae.ErrTQTaskAlreadyAdded
} else if _, ok := t.named[queueName][toSched.Name]; ok {
- return nil, taskqueue.ErrTaskAlreadyAdded
+ return nil, gae.ErrTQTaskAlreadyAdded
} else {
t.named[queueName][toSched.Name] = toSched
}
@@ -85,29 +81,28 @@ func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task
return dupTask(toSched), nil
}
-func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) {
- if err := t.IsBroken(); err != nil {
- return nil, err
- }
-
- t.Lock()
- defer t.Unlock()
-
- return t.addLocked(task, queueName)
+func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) {
+ err = t.RunIfNotBroken(func() (err error) {
+ t.Lock()
+ defer t.Unlock()
+ retTask, err = t.addLocked(task, queueName)
+ return
+ })
+ return
}
-func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) error {
+func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error {
queueName, err := t.getQueueName(queueName)
if err != nil {
return err
}
if _, ok := t.archived[queueName][task.Name]; ok {
- return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK)
+ return errors.New("TOMBSTONED_TASK")
}
if _, ok := t.named[queueName][task.Name]; !ok {
- return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK)
+ return errors.New("UNKNOWN_TASK")
}
t.archived[queueName][task.Name] = t.named[queueName][task.Name]
@@ -116,47 +111,41 @@ func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err
return nil
}
-func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error {
- if err := t.IsBroken(); err != nil {
- return err
- }
-
- t.Lock()
- defer t.Unlock()
-
- return t.deleteLocked(task, queueName)
+func (t *taskqueueImpl) Delete(task *gae.TQTask, queueName string) error {
+ return t.RunIfNotBroken(func() error {
+ t.Lock()
+ defer t.Unlock()
+ return t.deleteLocked(task, queueName)
+ })
}
-func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*taskqueue.Task, error) {
- if err := t.IsBroken(); err != nil {
- return nil, err
- }
-
- t.Lock()
- defer t.Unlock()
-
- return multi(tasks, queueName, t.addLocked)
+func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) {
+ err = t.RunIfNotBroken(func() (err error) {
+ t.Lock()
+ defer t.Unlock()
+ retTasks, err = multi(tasks, queueName, t.addLocked)
+ return
+ })
+ return
}
-func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) error {
- if err := t.IsBroken(); err != nil {
- return err
- }
-
- t.Lock()
- defer t.Unlock()
+func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error {
+ return t.RunIfNotBroken(func() error {
+ t.Lock()
+ defer t.Unlock()
- _, err := multi(tasks, queueName,
- func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) {
- return nil, t.deleteLocked(tsk, qn)
- })
- return err
+ _, err := multi(tasks, queueName,
+ func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) {
+ return nil, t.deleteLocked(tsk, qn)
+ })
+ return err
+ })
}
/////////////////////////////// taskqueueTxnImpl ///////////////////////////////
type taskqueueTxnImpl struct {
- wrapper.TaskQueue
+ gae.TaskQueue
*txnTaskQueueData
ctx context.Context
@@ -164,11 +153,11 @@ type taskqueueTxnImpl struct {
}
var (
- _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil))
- _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil))
+ _ = gae.TaskQueue((*taskqueueTxnImpl)(nil))
+ _ = gae.TQTestable((*taskqueueTxnImpl)(nil))
)
-func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) {
+func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTask, error) {
toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueName)
if err != nil {
return nil, err
@@ -183,7 +172,7 @@ func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t
// ride on the datastore. The current datastore implementation only allows
// a maximum of 5 Actions per transaction, and more than that result in a
// BAD_REQUEST.
- return nil, newDSError(dbpb.Error_BAD_REQUEST)
+ return nil, errors.New("BAD_REQUEST")
}
t.anony[queueName] = append(t.anony[queueName], toSched)
@@ -200,26 +189,24 @@ func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t
return toRet, nil
}
-func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) {
- if err := t.IsBroken(); err != nil {
- return nil, err
- }
-
- t.Lock()
- defer t.Unlock()
-
- return t.addLocked(task, queueName)
+func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) {
+ err = t.RunIfNotBroken(func() (err error) {
+ t.Lock()
+ defer t.Unlock()
+ retTask, err = t.addLocked(task, queueName)
+ return
+ })
+ return
}
-func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*taskqueue.Task, error) {
- if err := t.IsBroken(); err != nil {
- return nil, err
- }
-
- t.Lock()
- defer t.Unlock()
-
- return multi(tasks, queueName, t.addLocked)
+func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) {
+ err = t.RunIfNotBroken(func() (err error) {
+ t.Lock()
+ defer t.Unlock()
+ retTasks, err = multi(tasks, queueName, t.addLocked)
+ return
+ })
+ return
}
////////////////////////////// private functions ///////////////////////////////
@@ -228,12 +215,12 @@ var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_"
-func mkName(c context.Context, cur string, queue map[string]*taskqueue.Task) string {
+func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string {
_, ok := queue[cur]
for !ok && cur == "" {
name := [500]byte{}
for i := 0; i < 500; i++ {
- name[i] = validTaskChars[wrapper.GetMathRand(c).Intn(len(validTaskChars))]
+ name[i] = validTaskChars[gae.GetMathRand(c).Intn(len(validTaskChars))]
}
cur = string(name[:])
_, ok = queue[cur]
@@ -241,13 +228,9 @@ func mkName(c context.Context, cur string, queue map[string]*taskqueue.Task) str
return cur
}
-func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.APIError {
- return &appengine_internal.APIError{Service: "taskqueue", Code: int32(code)}
-}
-
-func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, string) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) {
- ret := []*taskqueue.Task(nil)
- me := appengine.MultiError(nil)
+func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (*gae.TQTask, error)) ([]*gae.TQTask, error) {
+ ret := []*gae.TQTask(nil)
+ me := gae.MultiError(nil)
foundErr := false
for _, task := range tasks {
rt, err := f(task, queueName)
@@ -263,8 +246,8 @@ func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st
return ret, me
}
-func dupTask(t *taskqueue.Task) *taskqueue.Task {
- ret := &taskqueue.Task{}
+func dupTask(t *gae.TQTask) *gae.TQTask {
+ ret := &gae.TQTask{}
*ret = *t
if t.Header != nil {
@@ -282,17 +265,17 @@ func dupTask(t *taskqueue.Task) *taskqueue.Task {
}
if t.RetryOptions != nil {
- ret.RetryOptions = &taskqueue.RetryOptions{}
+ ret.RetryOptions = &gae.TQRetryOptions{}
*ret.RetryOptions = *t.RetryOptions
}
return ret
}
-func dupQueue(q wrapper.QueueData) wrapper.QueueData {
- r := make(wrapper.QueueData, len(q))
+func dupQueue(q gae.QueueData) gae.QueueData {
+ r := make(gae.QueueData, len(q))
for k, q := range q {
- r[k] = make(map[string]*taskqueue.Task, len(q))
+ r[k] = make(map[string]*gae.TQTask, len(q))
for tn, t := range q {
r[k][tn] = dupTask(t)
}
« no previous file with comments | « go/src/infra/gae/libs/gae/memory/raw_datstore_test.go ('k') | go/src/infra/gae/libs/gae/memory/taskqueue_data.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698