OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "fmt" | |
9 "golang.org/x/net/context" | |
10 "math/rand" | |
11 "net/http" | |
12 "regexp" | |
13 "time" | |
14 | |
15 "appengine" | |
16 "appengine/taskqueue" | |
17 "appengine_internal" | |
18 dbpb "appengine_internal/datastore" | |
19 pb "appengine_internal/taskqueue" | |
20 | |
21 "infra/gae/libs/wrapper" | |
22 ) | |
23 | |
24 /////////////////////////////// public functions /////////////////////////////// | |
25 | |
26 // UseTQ adds a wrapper.TaskQueue implementation to context, accessible | |
27 // by wrapper.GetTQ(c) | |
28 func UseTQ(c context.Context) context.Context { | |
29 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e { | |
30 tqd := cur(ic).Get("TQ") | |
31 var ret interface { | |
32 wrapper.TQTestable | |
33 wrapper.TaskQueue | |
34 } | |
35 switch x := tqd.(type) { | |
36 case *taskQueueData: | |
37 ret = &taskqueueImpl{ | |
38 wrapper.DummyTQ(), | |
39 x, | |
40 curGID(ic).namespace, | |
41 func() time.Time { return wrapper.GetTimeNow(ic) }, | |
42 wrapper.GetMathRand(ic), | |
43 } | |
44 | |
45 case *txnTaskQueueData: | |
46 ret = &taskqueueTxnImpl{ | |
47 wrapper.DummyTQ(), | |
48 x, | |
49 curGID(ic).namespace, | |
50 func() time.Time { return wrapper.GetTimeNow(ic) }, | |
51 wrapper.GetMathRand(ic), | |
52 } | |
53 | |
54 default: | |
55 panic(fmt.Errorf("TQ: bad type: %v", tqd)) | |
56 } | |
57 return ret | |
58 }) | |
59 } | |
60 | |
61 //////////////////////////////// taskqueueImpl ///////////////////////////////// | |
62 | |
63 type taskqueueImpl struct { | |
64 wrapper.TaskQueue | |
65 *taskQueueData | |
66 | |
67 ns string | |
68 timeNow func() time.Time | |
69 mathRand *rand.Rand | |
70 } | |
71 | |
72 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) { | |
73 toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow() , t.mathRand) | |
74 if err != nil { | |
75 return nil, err | |
76 } | |
77 | |
78 if _, ok := t.archived[queueName][toSched.Name]; ok { | |
79 // SDK converts TOMBSTONE -> already added too | |
80 return nil, taskqueue.ErrTaskAlreadyAdded | |
81 } else if _, ok := t.named[queueName][toSched.Name]; ok { | |
82 return nil, taskqueue.ErrTaskAlreadyAdded | |
83 } else { | |
84 t.named[queueName][toSched.Name] = toSched | |
85 } | |
86 | |
87 return dupTask(toSched), nil | |
88 } | |
89 | |
90 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue. Task, error) { | |
91 err := t.IsBroken() | |
92 if err != nil { | |
93 return nil, err | |
94 } | |
95 | |
96 t.Lock() | |
97 defer t.Unlock() | |
98 | |
99 return t.addLocked(task, queueName) | |
100 } | |
101 | |
102 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err or { | |
103 queueName, err := t.getQueueName(queueName) | |
104 if err != nil { | |
105 return err | |
106 } | |
107 | |
108 if _, ok := t.archived[queueName][task.Name]; ok { | |
109 return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK) | |
110 } | |
111 | |
112 if _, ok := t.named[queueName][task.Name]; !ok { | |
113 return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK) | |
114 } | |
115 | |
116 t.archived[queueName][task.Name] = t.named[queueName][task.Name] | |
117 delete(t.named[queueName], task.Name) | |
118 | |
119 return nil | |
120 } | |
121 | |
122 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error { | |
123 err := t.IsBroken() | |
124 if err != nil { | |
125 return err | |
126 } | |
127 | |
128 t.Lock() | |
129 defer t.Unlock() | |
130 | |
131 return t.deleteLocked(task, queueName) | |
132 } | |
133 | |
134 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]* taskqueue.Task, error) { | |
135 err := t.IsBroken() | |
136 if err != nil { | |
137 return nil, err | |
138 } | |
139 | |
140 return multi(tasks, queueName, t.addLocked) | |
141 } | |
142 | |
143 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e rror { | |
144 err := t.IsBroken() | |
145 if err != nil { | |
146 return err | |
147 } | |
148 | |
149 t.Lock() | |
150 defer t.Unlock() | |
151 | |
152 _, err = multi(tasks, queueName, | |
153 func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) { | |
154 return nil, t.deleteLocked(tsk, qn) | |
155 }) | |
156 return err | |
157 } | |
158 | |
159 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | |
160 | |
161 type taskqueueTxnImpl struct { | |
162 wrapper.TaskQueue | |
163 *txnTaskQueueData | |
164 | |
165 ns string | |
166 timeNow func() time.Time | |
167 mathRand *rand.Rand | |
M-A Ruel
2015/05/25 18:21:10
Why not always create its own rand with Seet(0) ?
iannucci
2015/05/27 19:33:32
So that the test writer can control it
| |
168 } | |
169 | |
170 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) { | |
171 toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti meNow(), t.mathRand) | |
172 if err != nil { | |
173 return nil, err | |
174 } | |
175 | |
176 numTasks := 0 | |
177 for _, vs := range t.anony { | |
178 numTasks += len(vs) | |
179 } | |
180 if numTasks+1 > 5 { | |
181 // transactional tasks are actually implemented 'for real' as Ac tions which | |
182 // ride on the datastore. The current datastore implementation o nly allows | |
183 // a maximum of 5 Actions per transaction, and more than that re sult in a | |
184 // BAD_REQUEST. | |
185 return nil, newDSError(dbpb.Error_BAD_REQUEST) | |
186 } | |
187 | |
188 t.anony[queueName] = append(t.anony[queueName], toSched) | |
189 | |
190 // the fact that we have generated a unique name for this task queue ite m is | |
191 // an implementation detail. | |
192 // TODO(riannucci): now that I think about this... it may not actually b e true. | |
193 // We should verify that the .Name for a task added in a tr ansaction is | |
194 // meaningless. Maybe names generated in a transaction are somehow | |
195 // guaranteed to be meaningful? | |
196 toRet := dupTask(toSched) | |
197 toRet.Name = "" | |
198 | |
199 return toRet, nil | |
200 } | |
201 | |
202 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque ue.Task, error) { | |
203 err := t.IsBroken() | |
204 if err != nil { | |
205 return nil, err | |
206 } | |
207 | |
208 t.Lock() | |
209 defer t.Unlock() | |
210 | |
211 return t.addLocked(task, queueName) | |
212 } | |
213 | |
214 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ( []*taskqueue.Task, error) { | |
215 err := t.IsBroken() | |
216 if err != nil { | |
217 return nil, err | |
218 } | |
219 | |
220 t.Lock() | |
221 defer t.Unlock() | |
222 | |
223 return multi(tasks, queueName, t.addLocked) | |
224 } | |
225 | |
226 ////////////////////////////// private functions /////////////////////////////// | |
227 | |
228 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") | |
229 | |
230 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_" | |
231 | |
232 func mkName(rnd *rand.Rand, cur string, queue map[string]*taskqueue.Task) string { | |
233 _, ok := queue[cur] | |
234 for !ok && cur == "" { | |
235 name := [500]byte{} | |
236 for i := 0; i < 500; i++ { | |
M-A Ruel
2015/05/25 18:21:10
Always 500 is excessive IMHO
iannucci
2015/05/27 19:33:32
That's the actual name limit.
| |
237 name[i] = validTaskChars[rnd.Intn(len(validTaskChars))] | |
238 } | |
239 cur = string(name[:]) | |
240 _, ok = queue[cur] | |
241 } | |
242 return cur | |
243 } | |
244 | |
245 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API Error { | |
246 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co de)} | |
247 } | |
248 | |
249 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) { | |
250 ret := []*taskqueue.Task(nil) | |
251 me := appengine.MultiError(nil) | |
252 foundErr := false | |
253 for _, task := range tasks { | |
254 rt, err := f(task, queueName) | |
255 ret = append(ret, rt) | |
256 me = append(me, err) | |
257 if err != nil { | |
258 foundErr = true | |
259 } | |
260 } | |
261 if !foundErr { | |
262 me = nil | |
263 } | |
264 return ret, me | |
265 } | |
266 | |
267 func dupTask(t *taskqueue.Task) *taskqueue.Task { | |
268 ret := &taskqueue.Task{} | |
269 *ret = *t | |
270 | |
271 if t.Header != nil { | |
272 ret.Header = make(http.Header, len(t.Header)) | |
273 for k, vs := range t.Header { | |
274 newVs := make([]string, len(vs)) | |
275 copy(newVs, vs) | |
276 ret.Header[k] = newVs | |
277 } | |
278 } | |
279 | |
280 if t.Payload != nil { | |
281 ret.Payload = make([]byte, len(t.Payload)) | |
282 copy(ret.Payload, t.Payload) | |
283 } | |
284 | |
285 if t.RetryOptions != nil { | |
286 ret.RetryOptions = &taskqueue.RetryOptions{} | |
287 *ret.RetryOptions = *t.RetryOptions | |
288 } | |
289 | |
290 return ret | |
291 } | |
292 | |
293 func dupQueue(q wrapper.QueueData) wrapper.QueueData { | |
294 r := make(wrapper.QueueData, len(q)) | |
295 for k, q := range q { | |
296 r[k] = make(map[string]*taskqueue.Task, len(q)) | |
297 for tn, t := range q { | |
298 r[k][tn] = dupTask(t) | |
299 } | |
300 } | |
301 return r | |
302 } | |
OLD | NEW |