OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "errors" | |
9 "net/http" | |
10 "regexp" | |
11 | |
12 "golang.org/x/net/context" | |
13 | |
14 "github.com/luci/gae" | |
15 "github.com/luci/gae/dummy" | |
16 ) | |
17 | |
18 /////////////////////////////// public functions /////////////////////////////// | |
19 | |
20 func useTQ(c context.Context) context.Context { | |
21 return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue { | |
22 tqd := cur(ic).Get(memContextTQIdx) | |
23 if x, ok := tqd.(*taskQueueData); ok { | |
24 return &taskqueueImpl{ | |
25 dummy.TQ(), | |
26 x, | |
27 ic, | |
28 curGID(ic).namespace, | |
29 } | |
30 } | |
31 return &taskqueueTxnImpl{ | |
32 dummy.TQ(), | |
33 tqd.(*txnTaskQueueData), | |
34 ic, | |
35 curGID(ic).namespace, | |
36 } | |
37 }) | |
38 } | |
39 | |
40 //////////////////////////////// taskqueueImpl ///////////////////////////////// | |
41 | |
42 type taskqueueImpl struct { | |
43 gae.TaskQueue | |
44 *taskQueueData | |
45 | |
46 ctx context.Context | |
47 ns string | |
48 } | |
49 | |
50 var ( | |
51 _ = gae.TaskQueue((*taskqueueImpl)(nil)) | |
52 _ = gae.TQTestable((*taskqueueImpl)(nil)) | |
53 ) | |
54 | |
55 func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa
sk, error) { | |
56 toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) | |
57 if err != nil { | |
58 return nil, err | |
59 } | |
60 | |
61 if _, ok := t.archived[queueName][toSched.Name]; ok { | |
62 // SDK converts TOMBSTONE -> already added too | |
63 return nil, gae.ErrTQTaskAlreadyAdded | |
64 } else if _, ok := t.named[queueName][toSched.Name]; ok { | |
65 return nil, gae.ErrTQTaskAlreadyAdded | |
66 } else { | |
67 t.named[queueName][toSched.Name] = toSched | |
68 } | |
69 | |
70 return dupTask(toSched), nil | |
71 } | |
72 | |
73 func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, er
ror) { | |
74 t.Lock() | |
75 defer t.Unlock() | |
76 return t.addLocked(task, queueName) | |
77 } | |
78 | |
79 func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error { | |
80 queueName, err := t.getQueueName(queueName) | |
81 if err != nil { | |
82 return err | |
83 } | |
84 | |
85 if _, ok := t.archived[queueName][task.Name]; ok { | |
86 return errors.New("TOMBSTONED_TASK") | |
87 } | |
88 | |
89 if _, ok := t.named[queueName][task.Name]; !ok { | |
90 return errors.New("UNKNOWN_TASK") | |
91 } | |
92 | |
93 t.archived[queueName][task.Name] = t.named[queueName][task.Name] | |
94 delete(t.named[queueName], task.Name) | |
95 | |
96 return nil | |
97 } | |
98 | |
99 func (t *taskqueueImpl) Delete(task *gae.TQTask, queueName string) error { | |
100 t.Lock() | |
101 defer t.Unlock() | |
102 return t.deleteLocked(task, queueName) | |
103 } | |
104 | |
105 func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.
TQTask, error) { | |
106 t.Lock() | |
107 defer t.Unlock() | |
108 return multi(tasks, queueName, t.addLocked) | |
109 } | |
110 | |
111 func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error
{ | |
112 t.Lock() | |
113 defer t.Unlock() | |
114 | |
115 _, err := multi(tasks, queueName, | |
116 func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) { | |
117 return nil, t.deleteLocked(tsk, qn) | |
118 }) | |
119 return err | |
120 } | |
121 | |
122 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | |
123 | |
124 type taskqueueTxnImpl struct { | |
125 gae.TaskQueue | |
126 *txnTaskQueueData | |
127 | |
128 ctx context.Context | |
129 ns string | |
130 } | |
131 | |
132 var _ interface { | |
133 gae.TaskQueue | |
134 gae.TQTestable | |
135 } = (*taskqueueTxnImpl)(nil) | |
136 | |
137 func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T
QTask, error) { | |
138 toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam
e) | |
139 if err != nil { | |
140 return nil, err | |
141 } | |
142 | |
143 numTasks := 0 | |
144 for _, vs := range t.anony { | |
145 numTasks += len(vs) | |
146 } | |
147 if numTasks+1 > 5 { | |
148 // transactional tasks are actually implemented 'for real' as Ac
tions which | |
149 // ride on the datastore. The current datastore implementation o
nly allows | |
150 // a maximum of 5 Actions per transaction, and more than that re
sult in a | |
151 // BAD_REQUEST. | |
152 return nil, errors.New("BAD_REQUEST") | |
153 } | |
154 | |
155 t.anony[queueName] = append(t.anony[queueName], toSched) | |
156 | |
157 // the fact that we have generated a unique name for this task queue ite
m is | |
158 // an implementation detail. | |
159 // TODO(riannucci): now that I think about this... it may not actually b
e true. | |
160 // We should verify that the .Name for a task added in a tr
ansaction is | |
161 // meaningless. Maybe names generated in a transaction are
somehow | |
162 // guaranteed to be meaningful? | |
163 toRet := dupTask(toSched) | |
164 toRet.Name = "" | |
165 | |
166 return toRet, nil | |
167 } | |
168 | |
169 func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae
.TQTask, err error) { | |
170 err = t.run(func() (err error) { | |
171 t.Lock() | |
172 defer t.Unlock() | |
173 retTask, err = t.addLocked(task, queueName) | |
174 return | |
175 }) | |
176 return | |
177 } | |
178 | |
179 func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retT
asks []*gae.TQTask, err error) { | |
180 err = t.run(func() (err error) { | |
181 t.Lock() | |
182 defer t.Unlock() | |
183 retTasks, err = multi(tasks, queueName, t.addLocked) | |
184 return | |
185 }) | |
186 return | |
187 } | |
188 | |
189 ////////////////////////////// private functions /////////////////////////////// | |
190 | |
191 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") | |
192 | |
193 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" | |
194 | |
195 func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string
{ | |
196 _, ok := queue[cur] | |
197 for !ok && cur == "" { | |
198 name := [500]byte{} | |
199 for i := 0; i < 500; i++ { | |
200 name[i] = validTaskChars[gae.GetMathRand(c).Intn(len(val
idTaskChars))] | |
201 } | |
202 cur = string(name[:]) | |
203 _, ok = queue[cur] | |
204 } | |
205 return cur | |
206 } | |
207 | |
208 func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (*
gae.TQTask, error)) ([]*gae.TQTask, error) { | |
209 ret := []*gae.TQTask(nil) | |
210 lme := gae.LazyMultiError{Size: len(tasks)} | |
211 for i, task := range tasks { | |
212 rt, err := f(task, queueName) | |
213 ret = append(ret, rt) | |
214 lme.Assign(i, err) | |
215 } | |
216 return ret, lme.Get() | |
217 } | |
218 | |
219 func dupTask(t *gae.TQTask) *gae.TQTask { | |
220 ret := &gae.TQTask{} | |
221 *ret = *t | |
222 | |
223 if t.Header != nil { | |
224 ret.Header = make(http.Header, len(t.Header)) | |
225 for k, vs := range t.Header { | |
226 newVs := make([]string, len(vs)) | |
227 copy(newVs, vs) | |
228 ret.Header[k] = newVs | |
229 } | |
230 } | |
231 | |
232 if t.Payload != nil { | |
233 ret.Payload = make([]byte, len(t.Payload)) | |
234 copy(ret.Payload, t.Payload) | |
235 } | |
236 | |
237 if t.RetryOptions != nil { | |
238 ret.RetryOptions = &gae.TQRetryOptions{} | |
239 *ret.RetryOptions = *t.RetryOptions | |
240 } | |
241 | |
242 return ret | |
243 } | |
244 | |
245 func dupQueue(q gae.QueueData) gae.QueueData { | |
246 r := make(gae.QueueData, len(q)) | |
247 for k, q := range q { | |
248 r[k] = make(map[string]*gae.TQTask, len(q)) | |
249 for tn, t := range q { | |
250 r[k][tn] = dupTask(t) | |
251 } | |
252 } | |
253 return r | |
254 } | |
OLD | NEW |