Index: go/src/infra/gae/libs/gae/memory/taskqueue_data.go |
diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go b/go/src/infra/gae/libs/gae/memory/taskqueue_data.go |
similarity index 69% |
rename from go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go |
rename to go/src/infra/gae/libs/gae/memory/taskqueue_data.go |
index 2d23b1d803ee3959a480d823cbd86364a53be40a..3e96e4fa7891a1162b9918edf8c8f85cdc3456d8 100644 |
--- a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go |
+++ b/go/src/infra/gae/libs/gae/memory/taskqueue_data.go |
@@ -7,16 +7,14 @@ package memory |
import ( |
"errors" |
"fmt" |
- "infra/gae/libs/wrapper" |
+ "golang.org/x/net/context" |
"net/http" |
"sync" |
"sync/atomic" |
- "appengine/datastore" |
- "appengine/taskqueue" |
- pb "appengine_internal/taskqueue" |
- "golang.org/x/net/context" |
- "infra/libs/clock" |
+ "infra/gae/libs/gae" |
+ |
+ "github.com/luci/luci-go/common/clock" |
) |
var ( |
@@ -28,23 +26,23 @@ var ( |
type taskQueueData struct { |
sync.Mutex |
- wrapper.BrokenFeatures |
+ gae.BrokenFeatures |
- named wrapper.QueueData |
- archived wrapper.QueueData |
+ named gae.QueueData |
+ archived gae.QueueData |
} |
var ( |
_ = memContextObj((*taskQueueData)(nil)) |
- _ = wrapper.TQTestable((*taskQueueData)(nil)) |
+ _ = gae.TQTestable((*taskQueueData)(nil)) |
) |
func newTaskQueueData() memContextObj { |
return &taskQueueData{ |
- BrokenFeatures: wrapper.BrokenFeatures{ |
- DefaultError: newTQError(pb.TaskQueueServiceError_TRANSIENT_ERROR)}, |
- named: wrapper.QueueData{"default": {}}, |
- archived: wrapper.QueueData{"default": {}}, |
+ BrokenFeatures: gae.BrokenFeatures{ |
+ DefaultError: errors.New("TRANSIENT_ERROR")}, |
+ named: gae.QueueData{"default": {}}, |
+ archived: gae.QueueData{"default": {}}, |
} |
} |
@@ -60,15 +58,15 @@ func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) { |
} |
txn.anony = nil |
} |
-func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) { |
+func (t *taskQueueData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error) { |
return &txnTaskQueueData{ |
BrokenFeatures: &t.BrokenFeatures, |
parent: t, |
- anony: wrapper.AnonymousQueueData{}, |
+ anony: gae.AnonymousQueueData{}, |
}, nil |
} |
-func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData { |
+func (t *taskQueueData) GetTransactionTasks() gae.AnonymousQueueData { |
return nil |
} |
@@ -79,18 +77,18 @@ func (t *taskQueueData) CreateQueue(queueName string) { |
if _, ok := t.named[queueName]; ok { |
panic(fmt.Errorf("memory/taskqueue: cannot add the same queue twice! %q", queueName)) |
} |
- t.named[queueName] = map[string]*taskqueue.Task{} |
- t.archived[queueName] = map[string]*taskqueue.Task{} |
+ t.named[queueName] = map[string]*gae.TQTask{} |
+ t.archived[queueName] = map[string]*gae.TQTask{} |
} |
-func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData { |
+func (t *taskQueueData) GetScheduledTasks() gae.QueueData { |
t.Lock() |
defer t.Unlock() |
return dupQueue(t.named) |
} |
-func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData { |
+func (t *taskQueueData) GetTombstonedTasks() gae.QueueData { |
t.Lock() |
defer t.Unlock() |
@@ -98,9 +96,9 @@ func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData { |
} |
func (t *taskQueueData) resetTasksWithLock() { |
- for queuename := range t.named { |
- t.named[queuename] = map[string]*taskqueue.Task{} |
- t.archived[queuename] = map[string]*taskqueue.Task{} |
+ for queueName := range t.named { |
+ t.named[queueName] = map[string]*gae.TQTask{} |
+ t.archived[queueName] = map[string]*gae.TQTask{} |
} |
} |
@@ -116,13 +114,20 @@ func (t *taskQueueData) getQueueName(queueName string) (string, error) { |
queueName = "default" |
} |
if _, ok := t.named[queueName]; !ok { |
- return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE) |
+ return "", errors.New("UNKNOWN_QUEUE") |
} |
return queueName, nil |
} |
-func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.Task, queueName string) ( |
- *taskqueue.Task, string, error) { |
+var tqOkMethods = map[string]struct{}{ |
+ "GET": {}, |
+ "POST": {}, |
+ "HEAD": {}, |
+ "PUT": {}, |
+ "DELETE": {}, |
+} |
+ |
+func (t *taskQueueData) prepTask(c context.Context, ns string, task *gae.TQTask, queueName string) (*gae.TQTask, string, error) { |
queueName, err := t.getQueueName(queueName) |
if err != nil { |
return nil, "", err |
@@ -131,7 +136,7 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T |
toSched := dupTask(task) |
if toSched.Path == "" { |
- return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL) |
+ return nil, "", errors.New("INVALID_URL") |
} |
if toSched.ETA.IsZero() { |
@@ -144,7 +149,7 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T |
if toSched.Method == "" { |
toSched.Method = "POST" |
} |
- if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok { |
+ if _, ok := tqOkMethods[toSched.Method]; !ok { |
return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.Method) |
} |
if toSched.Method != "POST" && toSched.Method != "PUT" { |
@@ -165,7 +170,7 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T |
toSched.Name = mkName(c, "", t.named[queueName]) |
} else { |
if !validTaskName.MatchString(toSched.Name) { |
- return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_TASK_NAME) |
+ return nil, "", errors.New("INVALID_TASK_NAME") |
} |
} |
@@ -175,19 +180,19 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T |
/////////////////////////////// txnTaskQueueData /////////////////////////////// |
type txnTaskQueueData struct { |
- *wrapper.BrokenFeatures |
+ *gae.BrokenFeatures |
lock sync.Mutex |
// boolean 0 or 1, use atomic.*Int32 to access. |
closed int32 |
- anony wrapper.AnonymousQueueData |
+ anony gae.AnonymousQueueData |
parent *taskQueueData |
} |
var ( |
_ = memContextObj((*txnTaskQueueData)(nil)) |
- _ = wrapper.TQTestable((*txnTaskQueueData)(nil)) |
+ _ = gae.TQTestable((*txnTaskQueueData)(nil)) |
) |
func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false } |
@@ -196,7 +201,7 @@ func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { |
panic(errors.New("txnTaskQueueData.applyTxn is not implemented")) |
} |
-func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) { |
+func (t *txnTaskQueueData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error) { |
return nil, errors.New("txnTaskQueueData.mkTxn is not implemented") |
} |
@@ -207,13 +212,13 @@ func (t *txnTaskQueueData) endTxn() { |
atomic.StoreInt32(&t.closed, 1) |
} |
-func (t *txnTaskQueueData) IsBroken() error { |
+func (t *txnTaskQueueData) RunIfNotBroken(f func() error) error { |
// Slightly different from the SDK... datastore and taskqueue each implement |
// this here, where in the SDK only datastore.transaction.Call does. |
if atomic.LoadInt32(&t.closed) == 1 { |
return fmt.Errorf("taskqueue: transaction context has expired") |
} |
- return t.parent.IsBroken() |
+ return t.parent.RunIfNotBroken(f) |
} |
func (t *txnTaskQueueData) ResetTasks() { |
@@ -235,13 +240,13 @@ func (t *txnTaskQueueData) Unlock() { |
t.lock.Unlock() |
} |
-func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData { |
+func (t *txnTaskQueueData) GetTransactionTasks() gae.AnonymousQueueData { |
t.Lock() |
defer t.Unlock() |
- ret := make(wrapper.AnonymousQueueData, len(t.anony)) |
+ ret := make(gae.AnonymousQueueData, len(t.anony)) |
for k, vs := range t.anony { |
- ret[k] = make([]*taskqueue.Task, len(vs)) |
+ ret[k] = make([]*gae.TQTask, len(vs)) |
for i, v := range vs { |
tsk := dupTask(v) |
tsk.Name = "" |
@@ -252,11 +257,11 @@ func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData { |
return ret |
} |
-func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData { |
+func (t *txnTaskQueueData) GetTombstonedTasks() gae.QueueData { |
return t.parent.GetTombstonedTasks() |
} |
-func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData { |
+func (t *txnTaskQueueData) GetScheduledTasks() gae.QueueData { |
return t.parent.GetScheduledTasks() |
} |