OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 "infra/gae/libs/wrapper" | 9 "infra/gae/libs/wrapper" |
10 "infra/libs/clock" | |
10 "math/rand" | 11 "math/rand" |
11 "net/http" | 12 "net/http" |
12 "regexp" | 13 "regexp" |
13 "time" | |
14 | 14 |
15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
16 | 16 |
17 "appengine" | 17 "appengine" |
18 "appengine/taskqueue" | 18 "appengine/taskqueue" |
19 "appengine_internal" | 19 "appengine_internal" |
20 dbpb "appengine_internal/datastore" | 20 dbpb "appengine_internal/datastore" |
21 pb "appengine_internal/taskqueue" | 21 pb "appengine_internal/taskqueue" |
22 ) | 22 ) |
23 | 23 |
24 /////////////////////////////// public functions /////////////////////////////// | 24 /////////////////////////////// public functions /////////////////////////////// |
25 | 25 |
26 func useTQ(c context.Context) context.Context { | 26 func useTQ(c context.Context) context.Context { |
27 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e { | 27 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e { |
28 tqd := cur(ic).Get(memContextTQIdx) | 28 tqd := cur(ic).Get(memContextTQIdx) |
29 var ret interface { | 29 var ret interface { |
30 wrapper.TQTestable | 30 wrapper.TQTestable |
31 wrapper.TaskQueue | 31 wrapper.TaskQueue |
32 } | 32 } |
33 switch x := tqd.(type) { | 33 switch x := tqd.(type) { |
34 case *taskQueueData: | 34 case *taskQueueData: |
35 ret = &taskqueueImpl{ | 35 ret = &taskqueueImpl{ |
36 wrapper.DummyTQ(), | 36 wrapper.DummyTQ(), |
37 x, | 37 x, |
38 ic, | |
38 curGID(ic).namespace, | 39 curGID(ic).namespace, |
39 func() time.Time { return wrapper.GetTimeNow(ic) }, | |
40 wrapper.GetMathRand(ic), | 40 wrapper.GetMathRand(ic), |
41 } | 41 } |
42 | 42 |
43 case *txnTaskQueueData: | 43 case *txnTaskQueueData: |
44 ret = &taskqueueTxnImpl{ | 44 ret = &taskqueueTxnImpl{ |
45 wrapper.DummyTQ(), | 45 wrapper.DummyTQ(), |
46 x, | 46 x, |
47 ic, | |
47 curGID(ic).namespace, | 48 curGID(ic).namespace, |
48 func() time.Time { return wrapper.GetTimeNow(ic) }, | |
49 wrapper.GetMathRand(ic), | 49 wrapper.GetMathRand(ic), |
50 } | 50 } |
51 | 51 |
52 default: | 52 default: |
53 panic(fmt.Errorf("TQ: bad type: %v", tqd)) | 53 panic(fmt.Errorf("TQ: bad type: %v", tqd)) |
54 } | 54 } |
55 return ret | 55 return ret |
56 }) | 56 }) |
57 } | 57 } |
58 | 58 |
59 //////////////////////////////// taskqueueImpl ///////////////////////////////// | 59 //////////////////////////////// taskqueueImpl ///////////////////////////////// |
60 | 60 |
61 type taskqueueImpl struct { | 61 type taskqueueImpl struct { |
62 wrapper.TaskQueue | 62 wrapper.TaskQueue |
63 *taskQueueData | 63 *taskQueueData |
64 | 64 |
65 ctx context.Context | |
65 ns string | 66 ns string |
66 timeNow func() time.Time | |
67 mathRand *rand.Rand | 67 mathRand *rand.Rand |
iannucci
2015/06/03 17:37:19
could we pull out mathRand the same way too? (e.g.
dnj
2015/06/03 18:21:26
Done.
| |
68 } | 68 } |
69 | 69 |
70 var ( | 70 var ( |
71 _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) | 71 _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) |
72 _ = wrapper.TQTestable((*taskqueueImpl)(nil)) | 72 _ = wrapper.TQTestable((*taskqueueImpl)(nil)) |
73 ) | 73 ) |
74 | 74 |
75 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) { | 75 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) { |
76 » toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow() , t.mathRand) | 76 » toSched, queueName, err := t.prepTask(t.ns, task, queueName, clock.Now(t .ctx), t.mathRand) |
77 if err != nil { | 77 if err != nil { |
78 return nil, err | 78 return nil, err |
79 } | 79 } |
80 | 80 |
81 if _, ok := t.archived[queueName][toSched.Name]; ok { | 81 if _, ok := t.archived[queueName][toSched.Name]; ok { |
82 // SDK converts TOMBSTONE -> already added too | 82 // SDK converts TOMBSTONE -> already added too |
83 return nil, taskqueue.ErrTaskAlreadyAdded | 83 return nil, taskqueue.ErrTaskAlreadyAdded |
84 } else if _, ok := t.named[queueName][toSched.Name]; ok { | 84 } else if _, ok := t.named[queueName][toSched.Name]; ok { |
85 return nil, taskqueue.ErrTaskAlreadyAdded | 85 return nil, taskqueue.ErrTaskAlreadyAdded |
86 } else { | 86 } else { |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
157 }) | 157 }) |
158 return err | 158 return err |
159 } | 159 } |
160 | 160 |
161 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | 161 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
162 | 162 |
163 type taskqueueTxnImpl struct { | 163 type taskqueueTxnImpl struct { |
164 wrapper.TaskQueue | 164 wrapper.TaskQueue |
165 *txnTaskQueueData | 165 *txnTaskQueueData |
166 | 166 |
167 ctx context.Context | |
167 ns string | 168 ns string |
168 timeNow func() time.Time | |
169 mathRand *rand.Rand | 169 mathRand *rand.Rand |
170 } | 170 } |
171 | 171 |
172 var ( | 172 var ( |
173 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) | 173 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) |
174 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) | 174 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) |
175 ) | 175 ) |
176 | 176 |
177 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) { | 177 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) { |
178 » toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti meNow(), t.mathRand) | 178 » toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, cloc k.Now(t.ctx), t.mathRand) |
179 if err != nil { | 179 if err != nil { |
180 return nil, err | 180 return nil, err |
181 } | 181 } |
182 | 182 |
183 numTasks := 0 | 183 numTasks := 0 |
184 for _, vs := range t.anony { | 184 for _, vs := range t.anony { |
185 numTasks += len(vs) | 185 numTasks += len(vs) |
186 } | 186 } |
187 if numTasks+1 > 5 { | 187 if numTasks+1 > 5 { |
188 // transactional tasks are actually implemented 'for real' as Ac tions which | 188 // transactional tasks are actually implemented 'for real' as Ac tions which |
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
298 func dupQueue(q wrapper.QueueData) wrapper.QueueData { | 298 func dupQueue(q wrapper.QueueData) wrapper.QueueData { |
299 r := make(wrapper.QueueData, len(q)) | 299 r := make(wrapper.QueueData, len(q)) |
300 for k, q := range q { | 300 for k, q := range q { |
301 r[k] = make(map[string]*taskqueue.Task, len(q)) | 301 r[k] = make(map[string]*taskqueue.Task, len(q)) |
302 for tn, t := range q { | 302 for tn, t := range q { |
303 r[k][tn] = dupTask(t) | 303 r[k][tn] = dupTask(t) |
304 } | 304 } |
305 } | 305 } |
306 return r | 306 return r |
307 } | 307 } |
OLD | NEW |