Chromium Code Reviews| Index: impl/memory/taskqueue.go |
| diff --git a/impl/memory/taskqueue.go b/impl/memory/taskqueue.go |
| index 7b09af1a6602857a1836155a73c180395ae12ece..f8237678095109749f4935394153891abbaf2f7b 100644 |
| --- a/impl/memory/taskqueue.go |
| +++ b/impl/memory/taskqueue.go |
| @@ -5,12 +5,11 @@ |
| package memory |
| import ( |
| - "net/http" |
| "regexp" |
| + "sync/atomic" |
| "golang.org/x/net/context" |
| - "github.com/luci/gae/impl/dummy" |
| tq "github.com/luci/gae/service/taskqueue" |
| "github.com/luci/luci-go/common/errors" |
| "github.com/luci/luci-go/common/mathrand" |
| @@ -19,18 +18,16 @@ import ( |
| /////////////////////////////// public functions /////////////////////////////// |
| func useTQ(c context.Context) context.Context { |
| - return tq.SetFactory(c, func(ic context.Context) tq.Interface { |
| + return tq.SetRawFactory(c, func(ic context.Context) tq.RawInterface { |
| tqd := cur(ic).Get(memContextTQIdx) |
| if x, ok := tqd.(*taskQueueData); ok { |
| return &taskqueueImpl{ |
| - dummy.TaskQueue(), |
| x, |
| ic, |
| curGID(ic).namespace, |
| } |
| } |
| return &taskqueueTxnImpl{ |
| - dummy.TaskQueue(), |
| tqd.(*txnTaskQueueData), |
| ic, |
| curGID(ic).namespace, |
| @@ -41,7 +38,6 @@ func useTQ(c context.Context) context.Context { |
| //////////////////////////////// taskqueueImpl ///////////////////////////////// |
| type taskqueueImpl struct { |
| - tq.Interface |
| *taskQueueData |
| ctx context.Context |
| @@ -49,12 +45,12 @@ type taskqueueImpl struct { |
| } |
| var ( |
| - _ = tq.Interface((*taskqueueImpl)(nil)) |
| + _ = tq.RawInterface((*taskqueueImpl)(nil)) |
| _ = tq.Testable((*taskqueueImpl)(nil)) |
| ) |
| func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) { |
| - toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) |
| + toSched, err := t.prepTask(t.ctx, t.ns, task, queueName) |
| if err != nil { |
| return nil, err |
| } |
| @@ -68,21 +64,10 @@ func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er |
| t.named[queueName][toSched.Name] = toSched |
| } |
| - return dupTask(toSched), nil |
| -} |
| - |
| -func (t *taskqueueImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) { |
| - t.Lock() |
| - defer t.Unlock() |
| - return t.addLocked(task, queueName) |
| + return toSched.Duplicate(), nil |
| } |
| func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error { |
| - queueName, err := t.getQueueName(queueName) |
| - if err != nil { |
| - return err |
| - } |
| - |
| if _, ok := t.archived[queueName][task.Name]; ok { |
| return errors.New("TOMBSTONED_TASK") |
| } |
| @@ -97,33 +82,79 @@ func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error { |
| return nil |
| } |
| -func (t *taskqueueImpl) Delete(task *tq.Task, queueName string) error { |
| +func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error { |
| + t.Lock() |
| + defer t.Unlock() |
| + |
| + queueName, err := t.getQueueName(queueName) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + for _, task := range tasks { |
| + cb(t.addLocked(task, queueName)) |
| + } |
| + return nil |
| +} |
| + |
| +func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) error { |
| t.Lock() |
| defer t.Unlock() |
| - return t.deleteLocked(task, queueName) |
| + |
| + queueName, err := t.getQueueName(queueName) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + for _, task := range tasks { |
| + cb(t.deleteLocked(task, queueName)) |
|
dnj
2015/08/03 22:37:25
Consider calling "deleteLocked" in the loop, aggre
iannucci
2015/08/04 01:21:21
yeah mumble. I ended up doing a thing for all of t
|
| + } |
| + return nil |
| } |
| -func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error) { |
| +func (t *taskqueueImpl) Purge(queueName string) error { |
| t.Lock() |
| defer t.Unlock() |
| - return multi(tasks, queueName, t.addLocked) |
| + |
| + queueName, err := t.getQueueName(queueName) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + t.named[queueName] = map[string]*tq.Task{} |
|
dnj
2015/08/03 22:37:25
Consider making this a "taskQueueData" method: res
iannucci
2015/08/04 01:21:21
Done.
|
| + t.archived[queueName] = map[string]*tq.Task{} |
| + return nil |
| } |
| -func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string) error { |
| +func (t *taskqueueImpl) Stats(queueNames []string, cb tq.RawStatsCB) error { |
| t.Lock() |
| defer t.Unlock() |
| - _, err := multi(tasks, queueName, |
| - func(tsk *tq.Task, qn string) (*tq.Task, error) { |
| - return nil, t.deleteLocked(tsk, qn) |
| - }) |
| - return err |
| + for _, qn := range queueNames { |
| + qn, err := t.getQueueName(qn) |
| + if err != nil { |
| + cb(nil, err) |
| + } else { |
| + s := tq.Statistics{ |
| + Tasks: len(t.named[qn]), |
| + } |
| + for _, t := range t.named[qn] { |
| + if s.OldestETA.IsZero() { |
| + s.OldestETA = t.ETA |
| + } else if t.ETA.Before(s.OldestETA) { |
| + s.OldestETA = t.ETA |
| + } |
| + } |
| + cb(&s, nil) |
| + } |
| + } |
| + |
| + return nil |
| } |
| /////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
| type taskqueueTxnImpl struct { |
| - tq.Interface |
| *txnTaskQueueData |
| ctx context.Context |
| @@ -131,12 +162,12 @@ type taskqueueTxnImpl struct { |
| } |
| var _ interface { |
| - tq.Interface |
| + tq.RawInterface |
| tq.Testable |
| } = (*taskqueueTxnImpl)(nil) |
| func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) { |
| - toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueName) |
| + toSched, err := t.parent.prepTask(t.ctx, t.ns, task, queueName) |
| if err != nil { |
| return nil, err |
| } |
| @@ -161,30 +192,41 @@ func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, |
| // We should verify that the .Name for a task added in a transaction is |
| // meaningless. Maybe names generated in a transaction are somehow |
| // guaranteed to be meaningful? |
| - toRet := dupTask(toSched) |
| + toRet := toSched.Duplicate() |
| toRet.Name = "" |
| return toRet, nil |
| } |
| -func (t *taskqueueTxnImpl) Add(task *tq.Task, queueName string) (retTask *tq.Task, err error) { |
| - err = t.run(func() (err error) { |
| - t.Lock() |
| - defer t.Unlock() |
| - retTask, err = t.addLocked(task, queueName) |
| - return |
| - }) |
| - return |
| +func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error { |
| + if atomic.LoadInt32(&t.closed) == 1 { |
| + return errors.New("taskqueue: transaction context has expired") |
| + } |
| + |
| + t.Lock() |
| + defer t.Unlock() |
| + |
| + queueName, err := t.parent.getQueueName(queueName) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + for _, task := range tasks { |
| + cb(t.addLocked(task, queueName)) |
| + } |
| + return nil |
| } |
| -func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string) (retTasks []*tq.Task, err error) { |
| - err = t.run(func() (err error) { |
| - t.Lock() |
| - defer t.Unlock() |
| - retTasks, err = multi(tasks, queueName, t.addLocked) |
| - return |
| - }) |
| - return |
| +func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error { |
| + return errors.New("taskqueue: cannot DeleteMulti from a transaction") |
| +} |
| + |
| +func (t *taskqueueTxnImpl) Purge(string) error { |
| + return errors.New("taskqueue: cannot Purge from a transaction") |
| +} |
| + |
| +func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error { |
| + return errors.New("taskqueue: cannot Stats from a transaction") |
| } |
| ////////////////////////////// private functions /////////////////////////////// |
| @@ -206,49 +248,12 @@ func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { |
| return cur |
| } |
| -func multi(tasks []*tq.Task, queueName string, f func(*tq.Task, string) (*tq.Task, error)) ([]*tq.Task, error) { |
| - ret := []*tq.Task(nil) |
| - lme := errors.LazyMultiError{Size: len(tasks)} |
| - for i, task := range tasks { |
| - rt, err := f(task, queueName) |
| - ret = append(ret, rt) |
| - lme.Assign(i, err) |
| - } |
| - return ret, lme.Get() |
| -} |
| - |
| -func dupTask(t *tq.Task) *tq.Task { |
| - ret := &tq.Task{} |
| - *ret = *t |
| - |
| - if t.Header != nil { |
| - ret.Header = make(http.Header, len(t.Header)) |
| - for k, vs := range t.Header { |
| - newVs := make([]string, len(vs)) |
| - copy(newVs, vs) |
| - ret.Header[k] = newVs |
| - } |
| - } |
| - |
| - if t.Payload != nil { |
| - ret.Payload = make([]byte, len(t.Payload)) |
| - copy(ret.Payload, t.Payload) |
| - } |
| - |
| - if t.RetryOptions != nil { |
| - ret.RetryOptions = &tq.RetryOptions{} |
| - *ret.RetryOptions = *t.RetryOptions |
| - } |
| - |
| - return ret |
| -} |
| - |
| func dupQueue(q tq.QueueData) tq.QueueData { |
| r := make(tq.QueueData, len(q)) |
| for k, q := range q { |
| r[k] = make(map[string]*tq.Task, len(q)) |
| for tn, t := range q { |
| - r[k][tn] = dupTask(t) |
| + r[k][tn] = t.Duplicate() |
| } |
| } |
| return r |