OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "fmt" | |
9 "infra/gae/libs/wrapper" | |
10 "math/rand" | |
11 "net/http" | |
12 "regexp" | |
13 "time" | |
14 | |
15 "golang.org/x/net/context" | |
16 | |
17 "appengine" | |
18 "appengine/taskqueue" | |
19 "appengine_internal" | |
20 dbpb "appengine_internal/datastore" | |
21 pb "appengine_internal/taskqueue" | |
22 ) | |
23 | |
24 /////////////////////////////// public functions /////////////////////////////// | |
25 | |
26 // UseTQ adds a wrapper.TaskQueue implementation to context, accessible | |
27 // by wrapper.GetTQ(c) | |
28 func UseTQ(c context.Context) context.Context { | |
29 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e { | |
30 tqd := cur(ic).Get("TQ") | |
31 var ret interface { | |
32 wrapper.TQTestable | |
33 wrapper.TaskQueue | |
34 } | |
35 switch x := tqd.(type) { | |
36 case *taskQueueData: | |
37 ret = &taskqueueImpl{ | |
38 wrapper.DummyTQ(), | |
M-A Ruel
2015/05/28 22:42:39
you create a TQ everytime?
iannucci
2015/05/28 23:00:35
it returns a constant object (a named struct{}) in
| |
39 x, | |
40 curGID(ic).namespace, | |
41 func() time.Time { return wrapper.GetTimeNow(ic) }, | |
42 wrapper.GetMathRand(ic), | |
43 } | |
44 | |
45 case *txnTaskQueueData: | |
46 ret = &taskqueueTxnImpl{ | |
47 wrapper.DummyTQ(), | |
48 x, | |
49 curGID(ic).namespace, | |
50 func() time.Time { return wrapper.GetTimeNow(ic) }, | |
51 wrapper.GetMathRand(ic), | |
52 } | |
53 | |
54 default: | |
55 panic(fmt.Errorf("TQ: bad type: %v", tqd)) | |
56 } | |
57 return ret | |
58 }) | |
59 } | |
60 | |
61 //////////////////////////////// taskqueueImpl ///////////////////////////////// | |
62 | |
63 type taskqueueImpl struct { | |
64 wrapper.TaskQueue | |
65 *taskQueueData | |
66 | |
67 ns string | |
68 timeNow func() time.Time | |
69 mathRand *rand.Rand | |
70 } | |
71 | |
72 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) { | |
73 toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow() , t.mathRand) | |
74 if err != nil { | |
75 return nil, err | |
76 } | |
77 | |
78 if _, ok := t.archived[queueName][toSched.Name]; ok { | |
79 // SDK converts TOMBSTONE -> already added too | |
80 return nil, taskqueue.ErrTaskAlreadyAdded | |
81 } else if _, ok := t.named[queueName][toSched.Name]; ok { | |
82 return nil, taskqueue.ErrTaskAlreadyAdded | |
83 } else { | |
84 t.named[queueName][toSched.Name] = toSched | |
85 } | |
86 | |
87 return dupTask(toSched), nil | |
88 } | |
89 | |
90 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue. Task, error) { | |
91 if err := t.IsBroken(); err != nil { | |
92 return nil, err | |
93 } | |
94 | |
95 t.Lock() | |
96 defer t.Unlock() | |
97 | |
98 return t.addLocked(task, queueName) | |
99 } | |
100 | |
101 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err or { | |
102 queueName, err := t.getQueueName(queueName) | |
103 if err != nil { | |
104 return err | |
105 } | |
106 | |
107 if _, ok := t.archived[queueName][task.Name]; ok { | |
108 return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK) | |
109 } | |
110 | |
111 if _, ok := t.named[queueName][task.Name]; !ok { | |
112 return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK) | |
113 } | |
114 | |
115 t.archived[queueName][task.Name] = t.named[queueName][task.Name] | |
116 delete(t.named[queueName], task.Name) | |
117 | |
118 return nil | |
119 } | |
120 | |
121 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error { | |
122 if err := t.IsBroken(); err != nil { | |
123 return err | |
124 } | |
125 | |
126 t.Lock() | |
127 defer t.Unlock() | |
128 | |
129 return t.deleteLocked(task, queueName) | |
130 } | |
131 | |
132 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]* taskqueue.Task, error) { | |
133 if err := t.IsBroken(); err != nil { | |
134 return nil, err | |
135 } | |
136 | |
M-A Ruel
2015/05/28 22:42:39
Isn't a lock missing here?
iannucci
2015/05/28 23:00:35
indeed
| |
137 return multi(tasks, queueName, t.addLocked) | |
138 } | |
139 | |
140 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e rror { | |
141 if err := t.IsBroken(); err != nil { | |
142 return err | |
143 } | |
144 | |
145 t.Lock() | |
146 defer t.Unlock() | |
147 | |
148 _, err := multi(tasks, queueName, | |
149 func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) { | |
150 return nil, t.deleteLocked(tsk, qn) | |
151 }) | |
152 return err | |
153 } | |
154 | |
155 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | |
156 | |
157 type taskqueueTxnImpl struct { | |
158 wrapper.TaskQueue | |
159 *txnTaskQueueData | |
160 | |
161 ns string | |
162 timeNow func() time.Time | |
163 mathRand *rand.Rand | |
164 } | |
165 | |
166 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) { | |
167 toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti meNow(), t.mathRand) | |
168 if err != nil { | |
169 return nil, err | |
170 } | |
171 | |
172 numTasks := 0 | |
173 for _, vs := range t.anony { | |
174 numTasks += len(vs) | |
175 } | |
176 if numTasks+1 > 5 { | |
177 // transactional tasks are actually implemented 'for real' as Ac tions which | |
178 // ride on the datastore. The current datastore implementation o nly allows | |
179 // a maximum of 5 Actions per transaction, and more than that re sult in a | |
180 // BAD_REQUEST. | |
181 return nil, newDSError(dbpb.Error_BAD_REQUEST) | |
182 } | |
183 | |
184 t.anony[queueName] = append(t.anony[queueName], toSched) | |
185 | |
186 // the fact that we have generated a unique name for this task queue ite m is | |
187 // an implementation detail. | |
188 // TODO(riannucci): now that I think about this... it may not actually b e true. | |
189 // We should verify that the .Name for a task added in a tr ansaction is | |
190 // meaningless. Maybe names generated in a transaction are somehow | |
191 // guaranteed to be meaningful? | |
192 toRet := dupTask(toSched) | |
193 toRet.Name = "" | |
194 | |
195 return toRet, nil | |
196 } | |
197 | |
198 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque ue.Task, error) { | |
199 if err := t.IsBroken(); err != nil { | |
200 return nil, err | |
201 } | |
202 | |
203 t.Lock() | |
204 defer t.Unlock() | |
205 | |
206 return t.addLocked(task, queueName) | |
207 } | |
208 | |
209 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ( []*taskqueue.Task, error) { | |
210 if err := t.IsBroken(); err != nil { | |
211 return nil, err | |
212 } | |
213 | |
214 t.Lock() | |
215 defer t.Unlock() | |
216 | |
217 return multi(tasks, queueName, t.addLocked) | |
218 } | |
219 | |
220 ////////////////////////////// private functions /////////////////////////////// | |
221 | |
222 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") | |
223 | |
224 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_" | |
225 | |
226 func mkName(rnd *rand.Rand, cur string, queue map[string]*taskqueue.Task) string { | |
227 _, ok := queue[cur] | |
228 for !ok && cur == "" { | |
229 name := [500]byte{} | |
230 for i := 0; i < 500; i++ { | |
231 name[i] = validTaskChars[rnd.Intn(len(validTaskChars))] | |
232 } | |
233 cur = string(name[:]) | |
234 _, ok = queue[cur] | |
235 } | |
236 return cur | |
237 } | |
238 | |
239 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API Error { | |
240 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co de)} | |
241 } | |
242 | |
243 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) { | |
244 ret := []*taskqueue.Task(nil) | |
245 me := appengine.MultiError(nil) | |
246 foundErr := false | |
247 for _, task := range tasks { | |
248 rt, err := f(task, queueName) | |
249 ret = append(ret, rt) | |
250 me = append(me, err) | |
251 if err != nil { | |
252 foundErr = true | |
253 } | |
254 } | |
255 if !foundErr { | |
256 me = nil | |
257 } | |
258 return ret, me | |
259 } | |
260 | |
261 func dupTask(t *taskqueue.Task) *taskqueue.Task { | |
262 ret := &taskqueue.Task{} | |
263 *ret = *t | |
264 | |
265 if t.Header != nil { | |
266 ret.Header = make(http.Header, len(t.Header)) | |
267 for k, vs := range t.Header { | |
268 newVs := make([]string, len(vs)) | |
269 copy(newVs, vs) | |
270 ret.Header[k] = newVs | |
271 } | |
272 } | |
273 | |
274 if t.Payload != nil { | |
275 ret.Payload = make([]byte, len(t.Payload)) | |
276 copy(ret.Payload, t.Payload) | |
277 } | |
278 | |
279 if t.RetryOptions != nil { | |
280 ret.RetryOptions = &taskqueue.RetryOptions{} | |
281 *ret.RetryOptions = *t.RetryOptions | |
282 } | |
283 | |
284 return ret | |
285 } | |
286 | |
287 func dupQueue(q wrapper.QueueData) wrapper.QueueData { | |
288 r := make(wrapper.QueueData, len(q)) | |
289 for k, q := range q { | |
290 r[k] = make(map[string]*taskqueue.Task, len(q)) | |
291 for tn, t := range q { | |
292 r[k][tn] = dupTask(t) | |
293 } | |
294 } | |
295 return r | |
296 } | |
OLD | NEW |