Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(352)

Unified Diff: go/src/infra/gae/libs/gae/memory/taskqueue_data.go

Issue 1222903002: Refactor current GAE abstraction library to be free of the SDK* (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: more fixes Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: go/src/infra/gae/libs/gae/memory/taskqueue_data.go
diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go b/go/src/infra/gae/libs/gae/memory/taskqueue_data.go
similarity index 69%
rename from go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
rename to go/src/infra/gae/libs/gae/memory/taskqueue_data.go
index 2d23b1d803ee3959a480d823cbd86364a53be40a..3e96e4fa7891a1162b9918edf8c8f85cdc3456d8 100644
--- a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
+++ b/go/src/infra/gae/libs/gae/memory/taskqueue_data.go
@@ -7,16 +7,14 @@ package memory
import (
"errors"
"fmt"
- "infra/gae/libs/wrapper"
+ "golang.org/x/net/context"
"net/http"
"sync"
"sync/atomic"
- "appengine/datastore"
- "appengine/taskqueue"
- pb "appengine_internal/taskqueue"
- "golang.org/x/net/context"
- "infra/libs/clock"
+ "infra/gae/libs/gae"
+
+ "github.com/luci/luci-go/common/clock"
)
var (
@@ -28,23 +26,23 @@ var (
type taskQueueData struct {
sync.Mutex
- wrapper.BrokenFeatures
+ gae.BrokenFeatures
- named wrapper.QueueData
- archived wrapper.QueueData
+ named gae.QueueData
+ archived gae.QueueData
}
var (
_ = memContextObj((*taskQueueData)(nil))
- _ = wrapper.TQTestable((*taskQueueData)(nil))
+ _ = gae.TQTestable((*taskQueueData)(nil))
)
func newTaskQueueData() memContextObj {
return &taskQueueData{
- BrokenFeatures: wrapper.BrokenFeatures{
- DefaultError: newTQError(pb.TaskQueueServiceError_TRANSIENT_ERROR)},
- named: wrapper.QueueData{"default": {}},
- archived: wrapper.QueueData{"default": {}},
+ BrokenFeatures: gae.BrokenFeatures{
+ DefaultError: errors.New("TRANSIENT_ERROR")},
+ named: gae.QueueData{"default": {}},
+ archived: gae.QueueData{"default": {}},
}
}
@@ -60,15 +58,15 @@ func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) {
}
txn.anony = nil
}
-func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
+func (t *taskQueueData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error) {
return &txnTaskQueueData{
BrokenFeatures: &t.BrokenFeatures,
parent: t,
- anony: wrapper.AnonymousQueueData{},
+ anony: gae.AnonymousQueueData{},
}, nil
}
-func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
+func (t *taskQueueData) GetTransactionTasks() gae.AnonymousQueueData {
return nil
}
@@ -79,18 +77,18 @@ func (t *taskQueueData) CreateQueue(queueName string) {
if _, ok := t.named[queueName]; ok {
panic(fmt.Errorf("memory/taskqueue: cannot add the same queue twice! %q", queueName))
}
- t.named[queueName] = map[string]*taskqueue.Task{}
- t.archived[queueName] = map[string]*taskqueue.Task{}
+ t.named[queueName] = map[string]*gae.TQTask{}
+ t.archived[queueName] = map[string]*gae.TQTask{}
}
-func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData {
+func (t *taskQueueData) GetScheduledTasks() gae.QueueData {
t.Lock()
defer t.Unlock()
return dupQueue(t.named)
}
-func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData {
+func (t *taskQueueData) GetTombstonedTasks() gae.QueueData {
t.Lock()
defer t.Unlock()
@@ -98,9 +96,9 @@ func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData {
}
func (t *taskQueueData) resetTasksWithLock() {
- for queuename := range t.named {
- t.named[queuename] = map[string]*taskqueue.Task{}
- t.archived[queuename] = map[string]*taskqueue.Task{}
+ for queueName := range t.named {
+ t.named[queueName] = map[string]*gae.TQTask{}
+ t.archived[queueName] = map[string]*gae.TQTask{}
}
}
@@ -116,13 +114,20 @@ func (t *taskQueueData) getQueueName(queueName string) (string, error) {
queueName = "default"
}
if _, ok := t.named[queueName]; !ok {
- return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE)
+ return "", errors.New("UNKNOWN_QUEUE")
}
return queueName, nil
}
-func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.Task, queueName string) (
- *taskqueue.Task, string, error) {
+var tqOkMethods = map[string]struct{}{
+ "GET": {},
+ "POST": {},
+ "HEAD": {},
+ "PUT": {},
+ "DELETE": {},
+}
+
+func (t *taskQueueData) prepTask(c context.Context, ns string, task *gae.TQTask, queueName string) (*gae.TQTask, string, error) {
queueName, err := t.getQueueName(queueName)
if err != nil {
return nil, "", err
@@ -131,7 +136,7 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T
toSched := dupTask(task)
if toSched.Path == "" {
- return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL)
+ return nil, "", errors.New("INVALID_URL")
}
if toSched.ETA.IsZero() {
@@ -144,7 +149,7 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T
if toSched.Method == "" {
toSched.Method = "POST"
}
- if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok {
+ if _, ok := tqOkMethods[toSched.Method]; !ok {
return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.Method)
}
if toSched.Method != "POST" && toSched.Method != "PUT" {
@@ -165,7 +170,7 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T
toSched.Name = mkName(c, "", t.named[queueName])
} else {
if !validTaskName.MatchString(toSched.Name) {
- return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_TASK_NAME)
+ return nil, "", errors.New("INVALID_TASK_NAME")
}
}
@@ -175,19 +180,19 @@ func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T
/////////////////////////////// txnTaskQueueData ///////////////////////////////
type txnTaskQueueData struct {
- *wrapper.BrokenFeatures
+ *gae.BrokenFeatures
lock sync.Mutex
// boolean 0 or 1, use atomic.*Int32 to access.
closed int32
- anony wrapper.AnonymousQueueData
+ anony gae.AnonymousQueueData
parent *taskQueueData
}
var (
_ = memContextObj((*txnTaskQueueData)(nil))
- _ = wrapper.TQTestable((*txnTaskQueueData)(nil))
+ _ = gae.TQTestable((*txnTaskQueueData)(nil))
)
func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
@@ -196,7 +201,7 @@ func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) {
panic(errors.New("txnTaskQueueData.applyTxn is not implemented"))
}
-func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
+func (t *txnTaskQueueData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error) {
return nil, errors.New("txnTaskQueueData.mkTxn is not implemented")
}
@@ -207,13 +212,13 @@ func (t *txnTaskQueueData) endTxn() {
atomic.StoreInt32(&t.closed, 1)
}
-func (t *txnTaskQueueData) IsBroken() error {
+func (t *txnTaskQueueData) RunIfNotBroken(f func() error) error {
// Slightly different from the SDK... datastore and taskqueue each implement
// this here, where in the SDK only datastore.transaction.Call does.
if atomic.LoadInt32(&t.closed) == 1 {
return fmt.Errorf("taskqueue: transaction context has expired")
}
- return t.parent.IsBroken()
+ return t.parent.RunIfNotBroken(f)
}
func (t *txnTaskQueueData) ResetTasks() {
@@ -235,13 +240,13 @@ func (t *txnTaskQueueData) Unlock() {
t.lock.Unlock()
}
-func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
+func (t *txnTaskQueueData) GetTransactionTasks() gae.AnonymousQueueData {
t.Lock()
defer t.Unlock()
- ret := make(wrapper.AnonymousQueueData, len(t.anony))
+ ret := make(gae.AnonymousQueueData, len(t.anony))
for k, vs := range t.anony {
- ret[k] = make([]*taskqueue.Task, len(vs))
+ ret[k] = make([]*gae.TQTask, len(vs))
for i, v := range vs {
tsk := dupTask(v)
tsk.Name = ""
@@ -252,11 +257,11 @@ func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
return ret
}
-func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData {
+func (t *txnTaskQueueData) GetTombstonedTasks() gae.QueueData {
return t.parent.GetTombstonedTasks()
}
-func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData {
+func (t *txnTaskQueueData) GetScheduledTasks() gae.QueueData {
return t.parent.GetScheduledTasks()
}

Powered by Google App Engine
This is Rietveld 408576698