Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Unified Diff: impl/memory/taskqueue.go

Issue 1270063003: Make the rest of the services have a similar raw/user interface structure. (Closed) Base URL: https://github.com/luci/gae.git@add_datastore
Patch Set: address comments Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « impl/memory/memcache_test.go ('k') | impl/memory/taskqueue_data.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: impl/memory/taskqueue.go
diff --git a/impl/memory/taskqueue.go b/impl/memory/taskqueue.go
index 7b09af1a6602857a1836155a73c180395ae12ece..2af0f5a7117f4345322c298b1d7ddad19003c98d 100644
--- a/impl/memory/taskqueue.go
+++ b/impl/memory/taskqueue.go
@@ -5,12 +5,11 @@
package memory
import (
- "net/http"
"regexp"
+ "sync/atomic"
"golang.org/x/net/context"
- "github.com/luci/gae/impl/dummy"
tq "github.com/luci/gae/service/taskqueue"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/mathrand"
@@ -19,18 +18,16 @@ import (
/////////////////////////////// public functions ///////////////////////////////
func useTQ(c context.Context) context.Context {
- return tq.SetFactory(c, func(ic context.Context) tq.Interface {
+ return tq.SetRawFactory(c, func(ic context.Context) tq.RawInterface {
tqd := cur(ic).Get(memContextTQIdx)
if x, ok := tqd.(*taskQueueData); ok {
return &taskqueueImpl{
- dummy.TaskQueue(),
x,
ic,
curGID(ic).namespace,
}
}
return &taskqueueTxnImpl{
- dummy.TaskQueue(),
tqd.(*txnTaskQueueData),
ic,
curGID(ic).namespace,
@@ -41,7 +38,6 @@ func useTQ(c context.Context) context.Context {
//////////////////////////////// taskqueueImpl /////////////////////////////////
type taskqueueImpl struct {
- tq.Interface
*taskQueueData
ctx context.Context
@@ -49,12 +45,12 @@ type taskqueueImpl struct {
}
var (
- _ = tq.Interface((*taskqueueImpl)(nil))
+ _ = tq.RawInterface((*taskqueueImpl)(nil))
_ = tq.Testable((*taskqueueImpl)(nil))
)
func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) {
- toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName)
+ toSched, err := t.prepTask(t.ctx, t.ns, task, queueName)
if err != nil {
return nil, err
}
@@ -68,21 +64,10 @@ func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er
t.named[queueName][toSched.Name] = toSched
}
- return dupTask(toSched), nil
-}
-
-func (t *taskqueueImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) {
- t.Lock()
- defer t.Unlock()
- return t.addLocked(task, queueName)
+ return toSched.Duplicate(), nil
}
func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error {
- queueName, err := t.getQueueName(queueName)
- if err != nil {
- return err
- }
-
if _, ok := t.archived[queueName][task.Name]; ok {
return errors.New("TOMBSTONED_TASK")
}
@@ -97,33 +82,72 @@ func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error {
return nil
}
-func (t *taskqueueImpl) Delete(task *tq.Task, queueName string) error {
+func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error {
t.Lock()
defer t.Unlock()
- return t.deleteLocked(task, queueName)
+
+ queueName, err := t.getQueueNameLocked(queueName)
+ if err != nil {
+ return err
+ }
+
+ for _, task := range tasks {
+ cb(t.addLocked(task, queueName))
+ }
+ return nil
}
-func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) {
+func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) error {
t.Lock()
defer t.Unlock()
- return multi(tasks, queueName, t.addLocked)
+
+ queueName, err := t.getQueueNameLocked(queueName)
+ if err != nil {
+ return err
+ }
+
+ for _, task := range tasks {
+ cb(t.deleteLocked(task, queueName))
+ }
+ return nil
}
-func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string) error {
+func (t *taskqueueImpl) Purge(queueName string) error {
t.Lock()
defer t.Unlock()
- _, err := multi(tasks, queueName,
- func(tsk *tq.Task, qn string) (*tq.Task, error) {
- return nil, t.deleteLocked(tsk, qn)
- })
- return err
+ return t.purgeLocked(queueName)
+}
+
+func (t *taskqueueImpl) Stats(queueNames []string, cb tq.RawStatsCB) error {
+ t.Lock()
+ defer t.Unlock()
+
+ for _, qn := range queueNames {
+ qn, err := t.getQueueNameLocked(qn)
+ if err != nil {
+ cb(nil, err)
+ } else {
+ s := tq.Statistics{
+ Tasks: len(t.named[qn]),
+ }
+ for _, t := range t.named[qn] {
+ if s.OldestETA.IsZero() {
+ s.OldestETA = t.ETA
+ } else if t.ETA.Before(s.OldestETA) {
+ s.OldestETA = t.ETA
+ }
+ }
+ cb(&s, nil)
+ }
+ }
+
+ return nil
}
/////////////////////////////// taskqueueTxnImpl ///////////////////////////////
type taskqueueTxnImpl struct {
- tq.Interface
*txnTaskQueueData
ctx context.Context
@@ -131,12 +155,12 @@ type taskqueueTxnImpl struct {
}
var _ interface {
- tq.Interface
+ tq.RawInterface
tq.Testable
} = (*taskqueueTxnImpl)(nil)
func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) {
- toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueName)
+ toSched, err := t.parent.prepTask(t.ctx, t.ns, task, queueName)
if err != nil {
return nil, err
}
@@ -161,30 +185,41 @@ func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task,
// We should verify that the .Name for a task added in a transaction is
// meaningless. Maybe names generated in a transaction are somehow
// guaranteed to be meaningful?
- toRet := dupTask(toSched)
+ toRet := toSched.Duplicate()
toRet.Name = ""
return toRet, nil
}
-func (t *taskqueueTxnImpl) Add(task *tq.Task, queueName string) (retTask *tq.Task, err error) {
- err = t.run(func() (err error) {
- t.Lock()
- defer t.Unlock()
- retTask, err = t.addLocked(task, queueName)
- return
- })
- return
+func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error {
+ if atomic.LoadInt32(&t.closed) == 1 {
+ return errors.New("taskqueue: transaction context has expired")
+ }
+
+ t.Lock()
+ defer t.Unlock()
+
+ queueName, err := t.parent.getQueueNameLocked(queueName)
+ if err != nil {
+ return err
+ }
+
+ for _, task := range tasks {
+ cb(t.addLocked(task, queueName))
+ }
+ return nil
}
-func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string) (retTasks []*tq.Task, err error) {
- err = t.run(func() (err error) {
- t.Lock()
- defer t.Unlock()
- retTasks, err = multi(tasks, queueName, t.addLocked)
- return
- })
- return
+func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error {
+ return errors.New("taskqueue: cannot DeleteMulti from a transaction")
+}
+
+func (t *taskqueueTxnImpl) Purge(string) error {
+ return errors.New("taskqueue: cannot Purge from a transaction")
+}
+
+func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error {
+ return errors.New("taskqueue: cannot Stats from a transaction")
}
////////////////////////////// private functions ///////////////////////////////
@@ -206,49 +241,12 @@ func mkName(c context.Context, cur string, queue map[string]*tq.Task) string {
return cur
}
-func multi(tasks []*tq.Task, queueName string, f func(*tq.Task, string) (*tq.Task, error)) ([]*tq.Task, error) {
- ret := []*tq.Task(nil)
- lme := errors.LazyMultiError{Size: len(tasks)}
- for i, task := range tasks {
- rt, err := f(task, queueName)
- ret = append(ret, rt)
- lme.Assign(i, err)
- }
- return ret, lme.Get()
-}
-
-func dupTask(t *tq.Task) *tq.Task {
- ret := &tq.Task{}
- *ret = *t
-
- if t.Header != nil {
- ret.Header = make(http.Header, len(t.Header))
- for k, vs := range t.Header {
- newVs := make([]string, len(vs))
- copy(newVs, vs)
- ret.Header[k] = newVs
- }
- }
-
- if t.Payload != nil {
- ret.Payload = make([]byte, len(t.Payload))
- copy(ret.Payload, t.Payload)
- }
-
- if t.RetryOptions != nil {
- ret.RetryOptions = &tq.RetryOptions{}
- *ret.RetryOptions = *t.RetryOptions
- }
-
- return ret
-}
-
func dupQueue(q tq.QueueData) tq.QueueData {
r := make(tq.QueueData, len(q))
for k, q := range q {
r[k] = make(map[string]*tq.Task, len(q))
for tn, t := range q {
- r[k][tn] = dupTask(t)
+ r[k][tn] = t.Duplicate()
}
}
return r
« no previous file with comments | « impl/memory/memcache_test.go ('k') | impl/memory/taskqueue_data.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698