| Index: impl/memory/taskqueue.go
|
| diff --git a/impl/memory/taskqueue.go b/impl/memory/taskqueue.go
|
| index 7b09af1a6602857a1836155a73c180395ae12ece..2af0f5a7117f4345322c298b1d7ddad19003c98d 100644
|
| --- a/impl/memory/taskqueue.go
|
| +++ b/impl/memory/taskqueue.go
|
| @@ -5,12 +5,11 @@
|
| package memory
|
|
|
| import (
|
| - "net/http"
|
| "regexp"
|
| + "sync/atomic"
|
|
|
| "golang.org/x/net/context"
|
|
|
| - "github.com/luci/gae/impl/dummy"
|
| tq "github.com/luci/gae/service/taskqueue"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/mathrand"
|
| @@ -19,18 +18,16 @@ import (
|
| /////////////////////////////// public functions ///////////////////////////////
|
|
|
| func useTQ(c context.Context) context.Context {
|
| - return tq.SetFactory(c, func(ic context.Context) tq.Interface {
|
| + return tq.SetRawFactory(c, func(ic context.Context) tq.RawInterface {
|
| tqd := cur(ic).Get(memContextTQIdx)
|
| if x, ok := tqd.(*taskQueueData); ok {
|
| return &taskqueueImpl{
|
| - dummy.TaskQueue(),
|
| x,
|
| ic,
|
| curGID(ic).namespace,
|
| }
|
| }
|
| return &taskqueueTxnImpl{
|
| - dummy.TaskQueue(),
|
| tqd.(*txnTaskQueueData),
|
| ic,
|
| curGID(ic).namespace,
|
| @@ -41,7 +38,6 @@ func useTQ(c context.Context) context.Context {
|
| //////////////////////////////// taskqueueImpl /////////////////////////////////
|
|
|
| type taskqueueImpl struct {
|
| - tq.Interface
|
| *taskQueueData
|
|
|
| ctx context.Context
|
| @@ -49,12 +45,12 @@ type taskqueueImpl struct {
|
| }
|
|
|
| var (
|
| - _ = tq.Interface((*taskqueueImpl)(nil))
|
| + _ = tq.RawInterface((*taskqueueImpl)(nil))
|
| _ = tq.Testable((*taskqueueImpl)(nil))
|
| )
|
|
|
| func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) {
|
| - toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName)
|
| + toSched, err := t.prepTask(t.ctx, t.ns, task, queueName)
|
| if err != nil {
|
| return nil, err
|
| }
|
| @@ -68,21 +64,10 @@ func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er
|
| t.named[queueName][toSched.Name] = toSched
|
| }
|
|
|
| - return dupTask(toSched), nil
|
| -}
|
| -
|
| -func (t *taskqueueImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) {
|
| - t.Lock()
|
| - defer t.Unlock()
|
| - return t.addLocked(task, queueName)
|
| + return toSched.Duplicate(), nil
|
| }
|
|
|
| func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error {
|
| - queueName, err := t.getQueueName(queueName)
|
| - if err != nil {
|
| - return err
|
| - }
|
| -
|
| if _, ok := t.archived[queueName][task.Name]; ok {
|
| return errors.New("TOMBSTONED_TASK")
|
| }
|
| @@ -97,33 +82,72 @@ func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error {
|
| return nil
|
| }
|
|
|
| -func (t *taskqueueImpl) Delete(task *tq.Task, queueName string) error {
|
| +func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error {
|
| t.Lock()
|
| defer t.Unlock()
|
| - return t.deleteLocked(task, queueName)
|
| +
|
| + queueName, err := t.getQueueNameLocked(queueName)
|
| + if err != nil {
|
| + return err
|
| + }
|
| +
|
| + for _, task := range tasks {
|
| + cb(t.addLocked(task, queueName))
|
| + }
|
| + return nil
|
| }
|
|
|
| -func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) {
|
| +func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) error {
|
| t.Lock()
|
| defer t.Unlock()
|
| - return multi(tasks, queueName, t.addLocked)
|
| +
|
| + queueName, err := t.getQueueNameLocked(queueName)
|
| + if err != nil {
|
| + return err
|
| + }
|
| +
|
| + for _, task := range tasks {
|
| + cb(t.deleteLocked(task, queueName))
|
| + }
|
| + return nil
|
| }
|
|
|
| -func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string) error {
|
| +func (t *taskqueueImpl) Purge(queueName string) error {
|
| t.Lock()
|
| defer t.Unlock()
|
|
|
| - _, err := multi(tasks, queueName,
|
| - func(tsk *tq.Task, qn string) (*tq.Task, error) {
|
| - return nil, t.deleteLocked(tsk, qn)
|
| - })
|
| - return err
|
| + return t.purgeLocked(queueName)
|
| +}
|
| +
|
| +func (t *taskqueueImpl) Stats(queueNames []string, cb tq.RawStatsCB) error {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + for _, qn := range queueNames {
|
| + qn, err := t.getQueueNameLocked(qn)
|
| + if err != nil {
|
| + cb(nil, err)
|
| + } else {
|
| + s := tq.Statistics{
|
| + Tasks: len(t.named[qn]),
|
| + }
|
| + for _, t := range t.named[qn] {
|
| + if s.OldestETA.IsZero() {
|
| + s.OldestETA = t.ETA
|
| + } else if t.ETA.Before(s.OldestETA) {
|
| + s.OldestETA = t.ETA
|
| + }
|
| + }
|
| + cb(&s, nil)
|
| + }
|
| + }
|
| +
|
| + return nil
|
| }
|
|
|
| /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
|
|
|
| type taskqueueTxnImpl struct {
|
| - tq.Interface
|
| *txnTaskQueueData
|
|
|
| ctx context.Context
|
| @@ -131,12 +155,12 @@ type taskqueueTxnImpl struct {
|
| }
|
|
|
| var _ interface {
|
| - tq.Interface
|
| + tq.RawInterface
|
| tq.Testable
|
| } = (*taskqueueTxnImpl)(nil)
|
|
|
| func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) {
|
| - toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueName)
|
| + toSched, err := t.parent.prepTask(t.ctx, t.ns, task, queueName)
|
| if err != nil {
|
| return nil, err
|
| }
|
| @@ -161,30 +185,41 @@ func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task,
|
| // We should verify that the .Name for a task added in a transaction is
|
| // meaningless. Maybe names generated in a transaction are somehow
|
| // guaranteed to be meaningful?
|
| - toRet := dupTask(toSched)
|
| + toRet := toSched.Duplicate()
|
| toRet.Name = ""
|
|
|
| return toRet, nil
|
| }
|
|
|
| -func (t *taskqueueTxnImpl) Add(task *tq.Task, queueName string) (retTask *tq.Task, err error) {
|
| - err = t.run(func() (err error) {
|
| - t.Lock()
|
| - defer t.Unlock()
|
| - retTask, err = t.addLocked(task, queueName)
|
| - return
|
| - })
|
| - return
|
| +func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error {
|
| + if atomic.LoadInt32(&t.closed) == 1 {
|
| + return errors.New("taskqueue: transaction context has expired")
|
| + }
|
| +
|
| + t.Lock()
|
| + defer t.Unlock()
|
| +
|
| + queueName, err := t.parent.getQueueNameLocked(queueName)
|
| + if err != nil {
|
| + return err
|
| + }
|
| +
|
| + for _, task := range tasks {
|
| + cb(t.addLocked(task, queueName))
|
| + }
|
| + return nil
|
| }
|
|
|
| -func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string) (retTasks []*tq.Task, err error) {
|
| - err = t.run(func() (err error) {
|
| - t.Lock()
|
| - defer t.Unlock()
|
| - retTasks, err = multi(tasks, queueName, t.addLocked)
|
| - return
|
| - })
|
| - return
|
| +func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error {
|
| + return errors.New("taskqueue: cannot DeleteMulti from a transaction")
|
| +}
|
| +
|
| +func (t *taskqueueTxnImpl) Purge(string) error {
|
| + return errors.New("taskqueue: cannot Purge from a transaction")
|
| +}
|
| +
|
| +func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error {
|
| + return errors.New("taskqueue: cannot Stats from a transaction")
|
| }
|
|
|
| ////////////////////////////// private functions ///////////////////////////////
|
| @@ -206,49 +241,12 @@ func mkName(c context.Context, cur string, queue map[string]*tq.Task) string {
|
| return cur
|
| }
|
|
|
| -func multi(tasks []*tq.Task, queueName string, f func(*tq.Task, string) (*tq.Task, error)) ([]*tq.Task, error) {
|
| - ret := []*tq.Task(nil)
|
| - lme := errors.LazyMultiError{Size: len(tasks)}
|
| - for i, task := range tasks {
|
| - rt, err := f(task, queueName)
|
| - ret = append(ret, rt)
|
| - lme.Assign(i, err)
|
| - }
|
| - return ret, lme.Get()
|
| -}
|
| -
|
| -func dupTask(t *tq.Task) *tq.Task {
|
| - ret := &tq.Task{}
|
| - *ret = *t
|
| -
|
| - if t.Header != nil {
|
| - ret.Header = make(http.Header, len(t.Header))
|
| - for k, vs := range t.Header {
|
| - newVs := make([]string, len(vs))
|
| - copy(newVs, vs)
|
| - ret.Header[k] = newVs
|
| - }
|
| - }
|
| -
|
| - if t.Payload != nil {
|
| - ret.Payload = make([]byte, len(t.Payload))
|
| - copy(ret.Payload, t.Payload)
|
| - }
|
| -
|
| - if t.RetryOptions != nil {
|
| - ret.RetryOptions = &tq.RetryOptions{}
|
| - *ret.RetryOptions = *t.RetryOptions
|
| - }
|
| -
|
| - return ret
|
| -}
|
| -
|
| func dupQueue(q tq.QueueData) tq.QueueData {
|
| r := make(tq.QueueData, len(q))
|
| for k, q := range q {
|
| r[k] = make(map[string]*tq.Task, len(q))
|
| for tn, t := range q {
|
| - r[k][tn] = dupTask(t)
|
| + r[k][tn] = t.Duplicate()
|
| }
|
| }
|
| return r
|
|
|