Index: impl/memory/taskqueue.go |
diff --git a/impl/memory/taskqueue.go b/impl/memory/taskqueue.go |
index 7b09af1a6602857a1836155a73c180395ae12ece..2af0f5a7117f4345322c298b1d7ddad19003c98d 100644 |
--- a/impl/memory/taskqueue.go |
+++ b/impl/memory/taskqueue.go |
@@ -5,12 +5,11 @@ |
package memory |
import ( |
- "net/http" |
"regexp" |
+ "sync/atomic" |
"golang.org/x/net/context" |
- "github.com/luci/gae/impl/dummy" |
tq "github.com/luci/gae/service/taskqueue" |
"github.com/luci/luci-go/common/errors" |
"github.com/luci/luci-go/common/mathrand" |
@@ -19,18 +18,16 @@ import ( |
/////////////////////////////// public functions /////////////////////////////// |
func useTQ(c context.Context) context.Context { |
- return tq.SetFactory(c, func(ic context.Context) tq.Interface { |
+ return tq.SetRawFactory(c, func(ic context.Context) tq.RawInterface { |
tqd := cur(ic).Get(memContextTQIdx) |
if x, ok := tqd.(*taskQueueData); ok { |
return &taskqueueImpl{ |
- dummy.TaskQueue(), |
x, |
ic, |
curGID(ic).namespace, |
} |
} |
return &taskqueueTxnImpl{ |
- dummy.TaskQueue(), |
tqd.(*txnTaskQueueData), |
ic, |
curGID(ic).namespace, |
@@ -41,7 +38,6 @@ func useTQ(c context.Context) context.Context { |
//////////////////////////////// taskqueueImpl ///////////////////////////////// |
type taskqueueImpl struct { |
- tq.Interface |
*taskQueueData |
ctx context.Context |
@@ -49,12 +45,12 @@ type taskqueueImpl struct { |
} |
var ( |
- _ = tq.Interface((*taskqueueImpl)(nil)) |
+ _ = tq.RawInterface((*taskqueueImpl)(nil)) |
_ = tq.Testable((*taskqueueImpl)(nil)) |
) |
func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) { |
- toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) |
+ toSched, err := t.prepTask(t.ctx, t.ns, task, queueName) |
if err != nil { |
return nil, err |
} |
@@ -68,21 +64,10 @@ func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er |
t.named[queueName][toSched.Name] = toSched |
} |
- return dupTask(toSched), nil |
-} |
- |
-func (t *taskqueueImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) { |
- t.Lock() |
- defer t.Unlock() |
- return t.addLocked(task, queueName) |
+ return toSched.Duplicate(), nil |
} |
func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error { |
- queueName, err := t.getQueueName(queueName) |
- if err != nil { |
- return err |
- } |
- |
if _, ok := t.archived[queueName][task.Name]; ok { |
return errors.New("TOMBSTONED_TASK") |
} |
@@ -97,33 +82,72 @@ func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error { |
return nil |
} |
-func (t *taskqueueImpl) Delete(task *tq.Task, queueName string) error { |
+func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error { |
t.Lock() |
defer t.Unlock() |
- return t.deleteLocked(task, queueName) |
+ |
+ queueName, err := t.getQueueNameLocked(queueName) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ for _, task := range tasks { |
+ cb(t.addLocked(task, queueName)) |
+ } |
+ return nil |
} |
-func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) { |
+func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) error { |
t.Lock() |
defer t.Unlock() |
- return multi(tasks, queueName, t.addLocked) |
+ |
+ queueName, err := t.getQueueNameLocked(queueName) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ for _, task := range tasks { |
+ cb(t.deleteLocked(task, queueName)) |
+ } |
+ return nil |
} |
-func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string) error { |
+func (t *taskqueueImpl) Purge(queueName string) error { |
t.Lock() |
defer t.Unlock() |
- _, err := multi(tasks, queueName, |
- func(tsk *tq.Task, qn string) (*tq.Task, error) { |
- return nil, t.deleteLocked(tsk, qn) |
- }) |
- return err |
+ return t.purgeLocked(queueName) |
+} |
+ |
+func (t *taskqueueImpl) Stats(queueNames []string, cb tq.RawStatsCB) error { |
+ t.Lock() |
+ defer t.Unlock() |
+ |
+ for _, qn := range queueNames { |
+ qn, err := t.getQueueNameLocked(qn) |
+ if err != nil { |
+ cb(nil, err) |
+ } else { |
+ s := tq.Statistics{ |
+ Tasks: len(t.named[qn]), |
+ } |
+ for _, t := range t.named[qn] { |
+ if s.OldestETA.IsZero() { |
+ s.OldestETA = t.ETA |
+ } else if t.ETA.Before(s.OldestETA) { |
+ s.OldestETA = t.ETA |
+ } |
+ } |
+ cb(&s, nil) |
+ } |
+ } |
+ |
+ return nil |
} |
/////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
type taskqueueTxnImpl struct { |
- tq.Interface |
*txnTaskQueueData |
ctx context.Context |
@@ -131,12 +155,12 @@ type taskqueueTxnImpl struct { |
} |
var _ interface { |
- tq.Interface |
+ tq.RawInterface |
tq.Testable |
} = (*taskqueueTxnImpl)(nil) |
func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) { |
- toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueName) |
+ toSched, err := t.parent.prepTask(t.ctx, t.ns, task, queueName) |
if err != nil { |
return nil, err |
} |
@@ -161,30 +185,41 @@ func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, |
// We should verify that the .Name for a task added in a transaction is |
// meaningless. Maybe names generated in a transaction are somehow |
// guaranteed to be meaningful? |
- toRet := dupTask(toSched) |
+ toRet := toSched.Duplicate() |
toRet.Name = "" |
return toRet, nil |
} |
-func (t *taskqueueTxnImpl) Add(task *tq.Task, queueName string) (retTask *tq.Task, err error) { |
- err = t.run(func() (err error) { |
- t.Lock() |
- defer t.Unlock() |
- retTask, err = t.addLocked(task, queueName) |
- return |
- }) |
- return |
+func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error { |
+ if atomic.LoadInt32(&t.closed) == 1 { |
+ return errors.New("taskqueue: transaction context has expired") |
+ } |
+ |
+ t.Lock() |
+ defer t.Unlock() |
+ |
+ queueName, err := t.parent.getQueueNameLocked(queueName) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ for _, task := range tasks { |
+ cb(t.addLocked(task, queueName)) |
+ } |
+ return nil |
} |
-func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string) (retTasks []*tq.Task, err error) { |
- err = t.run(func() (err error) { |
- t.Lock() |
- defer t.Unlock() |
- retTasks, err = multi(tasks, queueName, t.addLocked) |
- return |
- }) |
- return |
+func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error { |
+ return errors.New("taskqueue: cannot DeleteMulti from a transaction") |
+} |
+ |
+func (t *taskqueueTxnImpl) Purge(string) error { |
+ return errors.New("taskqueue: cannot Purge from a transaction") |
+} |
+ |
+func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error { |
+ return errors.New("taskqueue: cannot Stats from a transaction") |
} |
////////////////////////////// private functions /////////////////////////////// |
@@ -206,49 +241,12 @@ func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { |
return cur |
} |
-func multi(tasks []*tq.Task, queueName string, f func(*tq.Task, string) (*tq.Task, error)) ([]*tq.Task, error) { |
- ret := []*tq.Task(nil) |
- lme := errors.LazyMultiError{Size: len(tasks)} |
- for i, task := range tasks { |
- rt, err := f(task, queueName) |
- ret = append(ret, rt) |
- lme.Assign(i, err) |
- } |
- return ret, lme.Get() |
-} |
- |
-func dupTask(t *tq.Task) *tq.Task { |
- ret := &tq.Task{} |
- *ret = *t |
- |
- if t.Header != nil { |
- ret.Header = make(http.Header, len(t.Header)) |
- for k, vs := range t.Header { |
- newVs := make([]string, len(vs)) |
- copy(newVs, vs) |
- ret.Header[k] = newVs |
- } |
- } |
- |
- if t.Payload != nil { |
- ret.Payload = make([]byte, len(t.Payload)) |
- copy(ret.Payload, t.Payload) |
- } |
- |
- if t.RetryOptions != nil { |
- ret.RetryOptions = &tq.RetryOptions{} |
- *ret.RetryOptions = *t.RetryOptions |
- } |
- |
- return ret |
-} |
- |
func dupQueue(q tq.QueueData) tq.QueueData { |
r := make(tq.QueueData, len(q)) |
for k, q := range q { |
r[k] = make(map[string]*tq.Task, len(q)) |
for tn, t := range q { |
- r[k][tn] = dupTask(t) |
+ r[k][tn] = t.Duplicate() |
} |
} |
return r |