Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(167)

Unified Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue.go

Issue 1230303003: Revert "Refactor current GAE abstraction library to be free of the SDK*" (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: go/src/infra/gae/libs/wrapper/memory/taskqueue.go
diff --git a/go/src/infra/gae/libs/gae/memory/taskqueue.go b/go/src/infra/gae/libs/wrapper/memory/taskqueue.go
similarity index 53%
rename from go/src/infra/gae/libs/gae/memory/taskqueue.go
rename to go/src/infra/gae/libs/wrapper/memory/taskqueue.go
index b0e04b201d5b73a179d3df15f8cdf7e7538ee835..4d05752a7235fd0e8ae12166ee712628c91e646e 100644
--- a/go/src/infra/gae/libs/gae/memory/taskqueue.go
+++ b/go/src/infra/gae/libs/wrapper/memory/taskqueue.go
@@ -5,29 +5,33 @@
package memory
import (
- "errors"
"fmt"
+ "infra/gae/libs/wrapper"
"net/http"
"regexp"
"golang.org/x/net/context"
- "infra/gae/libs/gae"
+ "appengine"
+ "appengine/taskqueue"
+ "appengine_internal"
+ dbpb "appengine_internal/datastore"
+ pb "appengine_internal/taskqueue"
)
/////////////////////////////// public functions ///////////////////////////////
func useTQ(c context.Context) context.Context {
- return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue {
+ return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueue {
tqd := cur(ic).Get(memContextTQIdx)
var ret interface {
- gae.TQTestable
- gae.TaskQueue
+ wrapper.TQTestable
+ wrapper.TaskQueue
}
switch x := tqd.(type) {
case *taskQueueData:
ret = &taskqueueImpl{
- gae.DummyTQ(),
+ wrapper.DummyTQ(),
x,
ic,
curGID(ic).namespace,
@@ -35,7 +39,7 @@ func useTQ(c context.Context) context.Context {
case *txnTaskQueueData:
ret = &taskqueueTxnImpl{
- gae.DummyTQ(),
+ wrapper.DummyTQ(),
x,
ic,
curGID(ic).namespace,
@@ -51,7 +55,7 @@ func useTQ(c context.Context) context.Context {
//////////////////////////////// taskqueueImpl /////////////////////////////////
type taskqueueImpl struct {
- gae.TaskQueue
+ wrapper.TaskQueue
*taskQueueData
ctx context.Context
@@ -59,11 +63,11 @@ type taskqueueImpl struct {
}
var (
- _ = gae.TaskQueue((*taskqueueImpl)(nil))
- _ = gae.TQTestable((*taskqueueImpl)(nil))
+ _ = wrapper.TaskQueue((*taskqueueImpl)(nil))
+ _ = wrapper.TQTestable((*taskqueueImpl)(nil))
)
-func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTask, error) {
+func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) {
toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName)
if err != nil {
return nil, err
@@ -71,9 +75,9 @@ func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa
if _, ok := t.archived[queueName][toSched.Name]; ok {
// SDK converts TOMBSTONE -> already added too
- return nil, gae.ErrTQTaskAlreadyAdded
+ return nil, taskqueue.ErrTaskAlreadyAdded
} else if _, ok := t.named[queueName][toSched.Name]; ok {
- return nil, gae.ErrTQTaskAlreadyAdded
+ return nil, taskqueue.ErrTaskAlreadyAdded
} else {
t.named[queueName][toSched.Name] = toSched
}
@@ -81,28 +85,29 @@ func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa
return dupTask(toSched), nil
}
-func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) {
- err = t.RunIfNotBroken(func() (err error) {
- t.Lock()
- defer t.Unlock()
- retTask, err = t.addLocked(task, queueName)
- return
- })
- return
+func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) {
+ if err := t.IsBroken(); err != nil {
+ return nil, err
+ }
+
+ t.Lock()
+ defer t.Unlock()
+
+ return t.addLocked(task, queueName)
}
-func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error {
+func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) error {
queueName, err := t.getQueueName(queueName)
if err != nil {
return err
}
if _, ok := t.archived[queueName][task.Name]; ok {
- return errors.New("TOMBSTONED_TASK")
+ return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK)
}
if _, ok := t.named[queueName][task.Name]; !ok {
- return errors.New("UNKNOWN_TASK")
+ return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK)
}
t.archived[queueName][task.Name] = t.named[queueName][task.Name]
@@ -111,41 +116,47 @@ func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error {
return nil
}
-func (t *taskqueueImpl) Delete(task *gae.TQTask, queueName string) error {
- return t.RunIfNotBroken(func() error {
- t.Lock()
- defer t.Unlock()
- return t.deleteLocked(task, queueName)
- })
-}
+func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error {
+ if err := t.IsBroken(); err != nil {
+ return err
+ }
-func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) {
- err = t.RunIfNotBroken(func() (err error) {
- t.Lock()
- defer t.Unlock()
- retTasks, err = multi(tasks, queueName, t.addLocked)
- return
- })
- return
+ t.Lock()
+ defer t.Unlock()
+
+ return t.deleteLocked(task, queueName)
}
-func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error {
- return t.RunIfNotBroken(func() error {
- t.Lock()
- defer t.Unlock()
+func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*taskqueue.Task, error) {
+ if err := t.IsBroken(); err != nil {
+ return nil, err
+ }
+
+ t.Lock()
+ defer t.Unlock()
+
+ return multi(tasks, queueName, t.addLocked)
+}
- _, err := multi(tasks, queueName,
- func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) {
- return nil, t.deleteLocked(tsk, qn)
- })
+func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) error {
+ if err := t.IsBroken(); err != nil {
return err
- })
+ }
+
+ t.Lock()
+ defer t.Unlock()
+
+ _, err := multi(tasks, queueName,
+ func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) {
+ return nil, t.deleteLocked(tsk, qn)
+ })
+ return err
}
/////////////////////////////// taskqueueTxnImpl ///////////////////////////////
type taskqueueTxnImpl struct {
- gae.TaskQueue
+ wrapper.TaskQueue
*txnTaskQueueData
ctx context.Context
@@ -153,11 +164,11 @@ type taskqueueTxnImpl struct {
}
var (
- _ = gae.TaskQueue((*taskqueueTxnImpl)(nil))
- _ = gae.TQTestable((*taskqueueTxnImpl)(nil))
+ _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil))
+ _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil))
)
-func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTask, error) {
+func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) {
toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueName)
if err != nil {
return nil, err
@@ -172,7 +183,7 @@ func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T
// ride on the datastore. The current datastore implementation only allows
// a maximum of 5 Actions per transaction, and more than that result in a
// BAD_REQUEST.
- return nil, errors.New("BAD_REQUEST")
+ return nil, newDSError(dbpb.Error_BAD_REQUEST)
}
t.anony[queueName] = append(t.anony[queueName], toSched)
@@ -189,24 +200,26 @@ func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T
return toRet, nil
}
-func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) {
- err = t.RunIfNotBroken(func() (err error) {
- t.Lock()
- defer t.Unlock()
- retTask, err = t.addLocked(task, queueName)
- return
- })
- return
+func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) {
+ if err := t.IsBroken(); err != nil {
+ return nil, err
+ }
+
+ t.Lock()
+ defer t.Unlock()
+
+ return t.addLocked(task, queueName)
}
-func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) {
- err = t.RunIfNotBroken(func() (err error) {
- t.Lock()
- defer t.Unlock()
- retTasks, err = multi(tasks, queueName, t.addLocked)
- return
- })
- return
+func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*taskqueue.Task, error) {
+ if err := t.IsBroken(); err != nil {
+ return nil, err
+ }
+
+ t.Lock()
+ defer t.Unlock()
+
+ return multi(tasks, queueName, t.addLocked)
}
////////////////////////////// private functions ///////////////////////////////
@@ -215,12 +228,12 @@ var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_"
-func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string {
+func mkName(c context.Context, cur string, queue map[string]*taskqueue.Task) string {
_, ok := queue[cur]
for !ok && cur == "" {
name := [500]byte{}
for i := 0; i < 500; i++ {
- name[i] = validTaskChars[gae.GetMathRand(c).Intn(len(validTaskChars))]
+ name[i] = validTaskChars[wrapper.GetMathRand(c).Intn(len(validTaskChars))]
}
cur = string(name[:])
_, ok = queue[cur]
@@ -228,9 +241,13 @@ func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string
return cur
}
-func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (*gae.TQTask, error)) ([]*gae.TQTask, error) {
- ret := []*gae.TQTask(nil)
- me := gae.MultiError(nil)
+func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.APIError {
+ return &appengine_internal.APIError{Service: "taskqueue", Code: int32(code)}
+}
+
+func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, string) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) {
+ ret := []*taskqueue.Task(nil)
+ me := appengine.MultiError(nil)
foundErr := false
for _, task := range tasks {
rt, err := f(task, queueName)
@@ -246,8 +263,8 @@ func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (*
return ret, me
}
-func dupTask(t *gae.TQTask) *gae.TQTask {
- ret := &gae.TQTask{}
+func dupTask(t *taskqueue.Task) *taskqueue.Task {
+ ret := &taskqueue.Task{}
*ret = *t
if t.Header != nil {
@@ -265,17 +282,17 @@ func dupTask(t *gae.TQTask) *gae.TQTask {
}
if t.RetryOptions != nil {
- ret.RetryOptions = &gae.TQRetryOptions{}
+ ret.RetryOptions = &taskqueue.RetryOptions{}
*ret.RetryOptions = *t.RetryOptions
}
return ret
}
-func dupQueue(q gae.QueueData) gae.QueueData {
- r := make(gae.QueueData, len(q))
+func dupQueue(q wrapper.QueueData) wrapper.QueueData {
+ r := make(wrapper.QueueData, len(q))
for k, q := range q {
- r[k] = make(map[string]*gae.TQTask, len(q))
+ r[k] = make(map[string]*taskqueue.Task, len(q))
for tn, t := range q {
r[k][tn] = dupTask(t)
}
« no previous file with comments | « go/src/infra/gae/libs/wrapper/memory/plist_test.go ('k') | go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698