Index: go/src/infra/gae/libs/wrapper/memory/taskqueue.go |
diff --git a/go/src/infra/gae/libs/gae/memory/taskqueue.go b/go/src/infra/gae/libs/wrapper/memory/taskqueue.go |
similarity index 53% |
rename from go/src/infra/gae/libs/gae/memory/taskqueue.go |
rename to go/src/infra/gae/libs/wrapper/memory/taskqueue.go |
index b0e04b201d5b73a179d3df15f8cdf7e7538ee835..4d05752a7235fd0e8ae12166ee712628c91e646e 100644 |
--- a/go/src/infra/gae/libs/gae/memory/taskqueue.go |
+++ b/go/src/infra/gae/libs/wrapper/memory/taskqueue.go |
@@ -5,29 +5,33 @@ |
package memory |
import ( |
- "errors" |
"fmt" |
+ "infra/gae/libs/wrapper" |
"net/http" |
"regexp" |
"golang.org/x/net/context" |
- "infra/gae/libs/gae" |
+ "appengine" |
+ "appengine/taskqueue" |
+ "appengine_internal" |
+ dbpb "appengine_internal/datastore" |
+ pb "appengine_internal/taskqueue" |
) |
/////////////////////////////// public functions /////////////////////////////// |
func useTQ(c context.Context) context.Context { |
- return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue { |
+ return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueue { |
tqd := cur(ic).Get(memContextTQIdx) |
var ret interface { |
- gae.TQTestable |
- gae.TaskQueue |
+ wrapper.TQTestable |
+ wrapper.TaskQueue |
} |
switch x := tqd.(type) { |
case *taskQueueData: |
ret = &taskqueueImpl{ |
- gae.DummyTQ(), |
+ wrapper.DummyTQ(), |
x, |
ic, |
curGID(ic).namespace, |
@@ -35,7 +39,7 @@ func useTQ(c context.Context) context.Context { |
case *txnTaskQueueData: |
ret = &taskqueueTxnImpl{ |
- gae.DummyTQ(), |
+ wrapper.DummyTQ(), |
x, |
ic, |
curGID(ic).namespace, |
@@ -51,7 +55,7 @@ func useTQ(c context.Context) context.Context { |
//////////////////////////////// taskqueueImpl ///////////////////////////////// |
type taskqueueImpl struct { |
- gae.TaskQueue |
+ wrapper.TaskQueue |
*taskQueueData |
ctx context.Context |
@@ -59,11 +63,11 @@ type taskqueueImpl struct { |
} |
var ( |
- _ = gae.TaskQueue((*taskqueueImpl)(nil)) |
- _ = gae.TQTestable((*taskqueueImpl)(nil)) |
+ _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) |
+ _ = wrapper.TQTestable((*taskqueueImpl)(nil)) |
) |
-func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTask, error) { |
+func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) { |
toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) |
if err != nil { |
return nil, err |
@@ -71,9 +75,9 @@ func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa |
if _, ok := t.archived[queueName][toSched.Name]; ok { |
// SDK converts TOMBSTONE -> already added too |
- return nil, gae.ErrTQTaskAlreadyAdded |
+ return nil, taskqueue.ErrTaskAlreadyAdded |
} else if _, ok := t.named[queueName][toSched.Name]; ok { |
- return nil, gae.ErrTQTaskAlreadyAdded |
+ return nil, taskqueue.ErrTaskAlreadyAdded |
} else { |
t.named[queueName][toSched.Name] = toSched |
} |
@@ -81,28 +85,29 @@ func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa |
return dupTask(toSched), nil |
} |
-func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) { |
- err = t.RunIfNotBroken(func() (err error) { |
- t.Lock() |
- defer t.Unlock() |
- retTask, err = t.addLocked(task, queueName) |
- return |
- }) |
- return |
+func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) { |
+ if err := t.IsBroken(); err != nil { |
+ return nil, err |
+ } |
+ |
+ t.Lock() |
+ defer t.Unlock() |
+ |
+ return t.addLocked(task, queueName) |
} |
-func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error { |
+func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) error { |
queueName, err := t.getQueueName(queueName) |
if err != nil { |
return err |
} |
if _, ok := t.archived[queueName][task.Name]; ok { |
- return errors.New("TOMBSTONED_TASK") |
+ return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK) |
} |
if _, ok := t.named[queueName][task.Name]; !ok { |
- return errors.New("UNKNOWN_TASK") |
+ return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK) |
} |
t.archived[queueName][task.Name] = t.named[queueName][task.Name] |
@@ -111,41 +116,47 @@ func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error { |
return nil |
} |
-func (t *taskqueueImpl) Delete(task *gae.TQTask, queueName string) error { |
- return t.RunIfNotBroken(func() error { |
- t.Lock() |
- defer t.Unlock() |
- return t.deleteLocked(task, queueName) |
- }) |
-} |
+func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error { |
+ if err := t.IsBroken(); err != nil { |
+ return err |
+ } |
-func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) { |
- err = t.RunIfNotBroken(func() (err error) { |
- t.Lock() |
- defer t.Unlock() |
- retTasks, err = multi(tasks, queueName, t.addLocked) |
- return |
- }) |
- return |
+ t.Lock() |
+ defer t.Unlock() |
+ |
+ return t.deleteLocked(task, queueName) |
} |
-func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error { |
- return t.RunIfNotBroken(func() error { |
- t.Lock() |
- defer t.Unlock() |
+func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*taskqueue.Task, error) { |
+ if err := t.IsBroken(); err != nil { |
+ return nil, err |
+ } |
+ |
+ t.Lock() |
+ defer t.Unlock() |
+ |
+ return multi(tasks, queueName, t.addLocked) |
+} |
- _, err := multi(tasks, queueName, |
- func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) { |
- return nil, t.deleteLocked(tsk, qn) |
- }) |
+func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) error { |
+ if err := t.IsBroken(); err != nil { |
return err |
- }) |
+ } |
+ |
+ t.Lock() |
+ defer t.Unlock() |
+ |
+ _, err := multi(tasks, queueName, |
+ func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) { |
+ return nil, t.deleteLocked(tsk, qn) |
+ }) |
+ return err |
} |
/////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
type taskqueueTxnImpl struct { |
- gae.TaskQueue |
+ wrapper.TaskQueue |
*txnTaskQueueData |
ctx context.Context |
@@ -153,11 +164,11 @@ type taskqueueTxnImpl struct { |
} |
var ( |
- _ = gae.TaskQueue((*taskqueueTxnImpl)(nil)) |
- _ = gae.TQTestable((*taskqueueTxnImpl)(nil)) |
+ _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) |
+ _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) |
) |
-func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTask, error) { |
+func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) { |
toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueName) |
if err != nil { |
return nil, err |
@@ -172,7 +183,7 @@ func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T |
// ride on the datastore. The current datastore implementation only allows |
// a maximum of 5 Actions per transaction, and more than that result in a |
// BAD_REQUEST. |
- return nil, errors.New("BAD_REQUEST") |
+ return nil, newDSError(dbpb.Error_BAD_REQUEST) |
} |
t.anony[queueName] = append(t.anony[queueName], toSched) |
@@ -189,24 +200,26 @@ func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T |
return toRet, nil |
} |
-func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQTask, err error) { |
- err = t.RunIfNotBroken(func() (err error) { |
- t.Lock() |
- defer t.Unlock() |
- retTask, err = t.addLocked(task, queueName) |
- return |
- }) |
- return |
+func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.Task, error) { |
+ if err := t.IsBroken(); err != nil { |
+ return nil, err |
+ } |
+ |
+ t.Lock() |
+ defer t.Unlock() |
+ |
+ return t.addLocked(task, queueName) |
} |
-func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTasks []*gae.TQTask, err error) { |
- err = t.RunIfNotBroken(func() (err error) { |
- t.Lock() |
- defer t.Unlock() |
- retTasks, err = multi(tasks, queueName, t.addLocked) |
- return |
- }) |
- return |
+func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*taskqueue.Task, error) { |
+ if err := t.IsBroken(); err != nil { |
+ return nil, err |
+ } |
+ |
+ t.Lock() |
+ defer t.Unlock() |
+ |
+ return multi(tasks, queueName, t.addLocked) |
} |
////////////////////////////// private functions /////////////////////////////// |
@@ -215,12 +228,12 @@ var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") |
const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_" |
-func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string { |
+func mkName(c context.Context, cur string, queue map[string]*taskqueue.Task) string { |
_, ok := queue[cur] |
for !ok && cur == "" { |
name := [500]byte{} |
for i := 0; i < 500; i++ { |
- name[i] = validTaskChars[gae.GetMathRand(c).Intn(len(validTaskChars))] |
+ name[i] = validTaskChars[wrapper.GetMathRand(c).Intn(len(validTaskChars))] |
} |
cur = string(name[:]) |
_, ok = queue[cur] |
@@ -228,9 +241,13 @@ func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string |
return cur |
} |
-func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (*gae.TQTask, error)) ([]*gae.TQTask, error) { |
- ret := []*gae.TQTask(nil) |
- me := gae.MultiError(nil) |
+func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.APIError { |
+ return &appengine_internal.APIError{Service: "taskqueue", Code: int32(code)} |
+} |
+ |
+func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, string) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) { |
+ ret := []*taskqueue.Task(nil) |
+ me := appengine.MultiError(nil) |
foundErr := false |
for _, task := range tasks { |
rt, err := f(task, queueName) |
@@ -246,8 +263,8 @@ func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (* |
return ret, me |
} |
-func dupTask(t *gae.TQTask) *gae.TQTask { |
- ret := &gae.TQTask{} |
+func dupTask(t *taskqueue.Task) *taskqueue.Task { |
+ ret := &taskqueue.Task{} |
*ret = *t |
if t.Header != nil { |
@@ -265,17 +282,17 @@ func dupTask(t *gae.TQTask) *gae.TQTask { |
} |
if t.RetryOptions != nil { |
- ret.RetryOptions = &gae.TQRetryOptions{} |
+ ret.RetryOptions = &taskqueue.RetryOptions{} |
*ret.RetryOptions = *t.RetryOptions |
} |
return ret |
} |
-func dupQueue(q gae.QueueData) gae.QueueData { |
- r := make(gae.QueueData, len(q)) |
+func dupQueue(q wrapper.QueueData) wrapper.QueueData { |
+ r := make(wrapper.QueueData, len(q)) |
for k, q := range q { |
- r[k] = make(map[string]*gae.TQTask, len(q)) |
+ r[k] = make(map[string]*taskqueue.Task, len(q)) |
for tn, t := range q { |
r[k][tn] = dupTask(t) |
} |