Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(58)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue.go

Issue 1154213012: Add context-aware "time" library wrapper. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Removed goroutine safety from testtimer. Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "infra/gae/libs/wrapper" 9 "infra/gae/libs/wrapper"
10 "infra/libs/clock"
10 "math/rand" 11 "math/rand"
11 "net/http" 12 "net/http"
12 "regexp" 13 "regexp"
13 "time"
14 14
15 "golang.org/x/net/context" 15 "golang.org/x/net/context"
16 16
17 "appengine" 17 "appengine"
18 "appengine/taskqueue" 18 "appengine/taskqueue"
19 "appengine_internal" 19 "appengine_internal"
20 dbpb "appengine_internal/datastore" 20 dbpb "appengine_internal/datastore"
21 pb "appengine_internal/taskqueue" 21 pb "appengine_internal/taskqueue"
22 ) 22 )
23 23
24 /////////////////////////////// public functions /////////////////////////////// 24 /////////////////////////////// public functions ///////////////////////////////
25 25
26 func useTQ(c context.Context) context.Context { 26 func useTQ(c context.Context) context.Context {
27 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e { 27 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e {
28 tqd := cur(ic).Get(memContextTQIdx) 28 tqd := cur(ic).Get(memContextTQIdx)
29 var ret interface { 29 var ret interface {
30 wrapper.TQTestable 30 wrapper.TQTestable
31 wrapper.TaskQueue 31 wrapper.TaskQueue
32 } 32 }
33 switch x := tqd.(type) { 33 switch x := tqd.(type) {
34 case *taskQueueData: 34 case *taskQueueData:
35 ret = &taskqueueImpl{ 35 ret = &taskqueueImpl{
36 wrapper.DummyTQ(), 36 wrapper.DummyTQ(),
37 x, 37 x,
38 ic,
38 curGID(ic).namespace, 39 curGID(ic).namespace,
39 func() time.Time { return wrapper.GetTimeNow(ic) },
40 wrapper.GetMathRand(ic), 40 wrapper.GetMathRand(ic),
41 } 41 }
42 42
43 case *txnTaskQueueData: 43 case *txnTaskQueueData:
44 ret = &taskqueueTxnImpl{ 44 ret = &taskqueueTxnImpl{
45 wrapper.DummyTQ(), 45 wrapper.DummyTQ(),
46 x, 46 x,
47 ic,
47 curGID(ic).namespace, 48 curGID(ic).namespace,
48 func() time.Time { return wrapper.GetTimeNow(ic) },
49 wrapper.GetMathRand(ic), 49 wrapper.GetMathRand(ic),
50 } 50 }
51 51
52 default: 52 default:
53 panic(fmt.Errorf("TQ: bad type: %v", tqd)) 53 panic(fmt.Errorf("TQ: bad type: %v", tqd))
54 } 54 }
55 return ret 55 return ret
56 }) 56 })
57 } 57 }
58 58
59 //////////////////////////////// taskqueueImpl ///////////////////////////////// 59 //////////////////////////////// taskqueueImpl /////////////////////////////////
60 60
61 type taskqueueImpl struct { 61 type taskqueueImpl struct {
62 wrapper.TaskQueue 62 wrapper.TaskQueue
63 *taskQueueData 63 *taskQueueData
64 64
65 ctx context.Context
65 ns string 66 ns string
66 timeNow func() time.Time
67 mathRand *rand.Rand 67 mathRand *rand.Rand
iannucci 2015/06/03 17:37:19 could we pull out mathRand the same way too? (e.g.
dnj 2015/06/03 18:21:26 Done.
68 } 68 }
69 69
70 var ( 70 var (
71 _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) 71 _ = wrapper.TaskQueue((*taskqueueImpl)(nil))
72 _ = wrapper.TQTestable((*taskqueueImpl)(nil)) 72 _ = wrapper.TQTestable((*taskqueueImpl)(nil))
73 ) 73 )
74 74
75 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) { 75 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) {
76 » toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow() , t.mathRand) 76 » toSched, queueName, err := t.prepTask(t.ns, task, queueName, clock.Now(t .ctx), t.mathRand)
77 if err != nil { 77 if err != nil {
78 return nil, err 78 return nil, err
79 } 79 }
80 80
81 if _, ok := t.archived[queueName][toSched.Name]; ok { 81 if _, ok := t.archived[queueName][toSched.Name]; ok {
82 // SDK converts TOMBSTONE -> already added too 82 // SDK converts TOMBSTONE -> already added too
83 return nil, taskqueue.ErrTaskAlreadyAdded 83 return nil, taskqueue.ErrTaskAlreadyAdded
84 } else if _, ok := t.named[queueName][toSched.Name]; ok { 84 } else if _, ok := t.named[queueName][toSched.Name]; ok {
85 return nil, taskqueue.ErrTaskAlreadyAdded 85 return nil, taskqueue.ErrTaskAlreadyAdded
86 } else { 86 } else {
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
157 }) 157 })
158 return err 158 return err
159 } 159 }
160 160
161 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// 161 /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
162 162
163 type taskqueueTxnImpl struct { 163 type taskqueueTxnImpl struct {
164 wrapper.TaskQueue 164 wrapper.TaskQueue
165 *txnTaskQueueData 165 *txnTaskQueueData
166 166
167 ctx context.Context
167 ns string 168 ns string
168 timeNow func() time.Time
169 mathRand *rand.Rand 169 mathRand *rand.Rand
170 } 170 }
171 171
172 var ( 172 var (
173 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) 173 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil))
174 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) 174 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil))
175 ) 175 )
176 176
177 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) { 177 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) {
178 » toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti meNow(), t.mathRand) 178 » toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, cloc k.Now(t.ctx), t.mathRand)
179 if err != nil { 179 if err != nil {
180 return nil, err 180 return nil, err
181 } 181 }
182 182
183 numTasks := 0 183 numTasks := 0
184 for _, vs := range t.anony { 184 for _, vs := range t.anony {
185 numTasks += len(vs) 185 numTasks += len(vs)
186 } 186 }
187 if numTasks+1 > 5 { 187 if numTasks+1 > 5 {
188 // transactional tasks are actually implemented 'for real' as Ac tions which 188 // transactional tasks are actually implemented 'for real' as Ac tions which
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after
298 func dupQueue(q wrapper.QueueData) wrapper.QueueData { 298 func dupQueue(q wrapper.QueueData) wrapper.QueueData {
299 r := make(wrapper.QueueData, len(q)) 299 r := make(wrapper.QueueData, len(q))
300 for k, q := range q { 300 for k, q := range q {
301 r[k] = make(map[string]*taskqueue.Task, len(q)) 301 r[k] = make(map[string]*taskqueue.Task, len(q))
302 for tn, t := range q { 302 for tn, t := range q {
303 r[k][tn] = dupTask(t) 303 r[k][tn] = dupTask(t)
304 } 304 }
305 } 305 }
306 return r 306 return r
307 } 307 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698