| Index: go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
|
| diff --git a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go b/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
|
| index 81a2978fe504f0074c04023dc413da9bd3fe0552..2d23b1d803ee3959a480d823cbd86364a53be40a 100644
|
| --- a/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
|
| +++ b/go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go
|
| @@ -8,15 +8,15 @@ import (
|
| "errors"
|
| "fmt"
|
| "infra/gae/libs/wrapper"
|
| - "math/rand"
|
| "net/http"
|
| "sync"
|
| "sync/atomic"
|
| - "time"
|
|
|
| "appengine/datastore"
|
| "appengine/taskqueue"
|
| pb "appengine_internal/taskqueue"
|
| + "golang.org/x/net/context"
|
| + "infra/libs/clock"
|
| )
|
|
|
| var (
|
| @@ -50,11 +50,11 @@ func newTaskQueueData() memContextObj {
|
|
|
| func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true }
|
| func (t *taskQueueData) endTxn() {}
|
| -func (t *taskQueueData) applyTxn(rnd *rand.Rand, obj memContextObj) {
|
| +func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) {
|
| txn := obj.(*txnTaskQueueData)
|
| for qn, tasks := range txn.anony {
|
| for _, tsk := range tasks {
|
| - tsk.Name = mkName(rnd, tsk.Name, t.named[qn])
|
| + tsk.Name = mkName(c, tsk.Name, t.named[qn])
|
| t.named[qn][tsk.Name] = tsk
|
| }
|
| }
|
| @@ -121,7 +121,8 @@ func (t *taskQueueData) getQueueName(queueName string) (string, error) {
|
| return queueName, nil
|
| }
|
|
|
| -func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName string, now time.Time, rnd *rand.Rand) (*taskqueue.Task, string, error) {
|
| +func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.Task, queueName string) (
|
| + *taskqueue.Task, string, error) {
|
| queueName, err := t.getQueueName(queueName)
|
| if err != nil {
|
| return nil, "", err
|
| @@ -134,7 +135,7 @@ func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName stri
|
| }
|
|
|
| if toSched.ETA.IsZero() {
|
| - toSched.ETA = now.Add(toSched.Delay)
|
| + toSched.ETA = clock.Now(c).Add(toSched.Delay)
|
| } else if toSched.Delay != 0 {
|
| panic("taskqueue: both Delay and ETA are set")
|
| }
|
| @@ -161,7 +162,7 @@ func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName stri
|
| // TODO(riannucci): implement DefaultNamespace
|
|
|
| if toSched.Name == "" {
|
| - toSched.Name = mkName(rnd, "", t.named[queueName])
|
| + toSched.Name = mkName(c, "", t.named[queueName])
|
| } else {
|
| if !validTaskName.MatchString(toSched.Name) {
|
| return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_TASK_NAME)
|
| @@ -191,7 +192,7 @@ var (
|
|
|
| func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
|
|
|
| -func (t *txnTaskQueueData) applyTxn(*rand.Rand, memContextObj) {
|
| +func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) {
|
| panic(errors.New("txnTaskQueueData.applyTxn is not implemented"))
|
| }
|
|
|
|
|