OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "regexp" | 8 "regexp" |
9 "sync/atomic" | |
10 | 9 |
11 "golang.org/x/net/context" | 10 "golang.org/x/net/context" |
12 | 11 |
13 tq "github.com/luci/gae/service/taskqueue" | 12 tq "github.com/luci/gae/service/taskqueue" |
| 13 |
14 "github.com/luci/luci-go/common/data/rand/mathrand" | 14 "github.com/luci/luci-go/common/data/rand/mathrand" |
15 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
16 ) | 16 ) |
17 | 17 |
18 /////////////////////////////// public functions /////////////////////////////// | 18 /////////////////////////////// public functions /////////////////////////////// |
19 | 19 |
20 func useTQ(c context.Context) context.Context { | 20 func useTQ(c context.Context) context.Context { |
21 » return tq.SetRawFactory(c, func(ic context.Context, wantTxn bool) tq.Raw
Interface { | 21 » return tq.SetRawFactory(c, func(ic context.Context) tq.RawInterface { |
22 » » ns, _ := curGID(ic).getNamespace() | 22 » » memCtx, isTxn := cur(ic) |
23 » » var tqd memContextObj | 23 » » tqd := memCtx.Get(memContextTQIdx) |
24 | 24 |
25 » » if !wantTxn { | 25 » » if isTxn { |
26 » » » tqd = curNoTxn(ic).Get(memContextTQIdx) | 26 » » » return &taskqueueTxnImpl{tqd.(*txnTaskQueueData), ic} |
27 » » } else { | |
28 » » » tqd = cur(ic).Get(memContextTQIdx) | |
29 } | 27 } |
30 | 28 » » return &taskqueueImpl{tqd.(*taskQueueData), ic} |
31 » » if x, ok := tqd.(*taskQueueData); ok { | |
32 » » » return &taskqueueImpl{x, ic, ns} | |
33 » » } | |
34 » » return &taskqueueTxnImpl{tqd.(*txnTaskQueueData), ic, ns} | |
35 }) | 29 }) |
36 } | 30 } |
37 | 31 |
38 //////////////////////////////// taskqueueImpl ///////////////////////////////// | 32 //////////////////////////////// taskqueueImpl ///////////////////////////////// |
39 | 33 |
40 type taskqueueImpl struct { | 34 type taskqueueImpl struct { |
41 *taskQueueData | 35 *taskQueueData |
42 | 36 |
43 ctx context.Context | 37 ctx context.Context |
44 ns string | |
45 } | 38 } |
46 | 39 |
47 var ( | 40 var ( |
48 _ = tq.RawInterface((*taskqueueImpl)(nil)) | 41 _ = tq.RawInterface((*taskqueueImpl)(nil)) |
49 _ = tq.Testable((*taskqueueImpl)(nil)) | 42 _ = tq.Testable((*taskqueueImpl)(nil)) |
50 ) | 43 ) |
51 | 44 |
52 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er
ror) { | 45 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er
ror) { |
53 » toSched, err := t.prepTask(t.ctx, t.ns, task, queueName) | 46 » toSched, err := t.prepTask(t.ctx, task, queueName) |
54 if err != nil { | 47 if err != nil { |
55 return nil, err | 48 return nil, err |
56 } | 49 } |
57 | 50 |
58 if _, ok := t.archived[queueName][toSched.Name]; ok { | 51 if _, ok := t.archived[queueName][toSched.Name]; ok { |
59 // SDK converts TOMBSTONE -> already added too | 52 // SDK converts TOMBSTONE -> already added too |
60 return nil, tq.ErrTaskAlreadyAdded | 53 return nil, tq.ErrTaskAlreadyAdded |
61 } else if _, ok := t.named[queueName][toSched.Name]; ok { | 54 } else if _, ok := t.named[queueName][toSched.Name]; ok { |
62 return nil, tq.ErrTaskAlreadyAdded | 55 return nil, tq.ErrTaskAlreadyAdded |
63 } else { | 56 } else { |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
138 s.OldestETA = t.ETA | 131 s.OldestETA = t.ETA |
139 } | 132 } |
140 } | 133 } |
141 cb(&s, nil) | 134 cb(&s, nil) |
142 } | 135 } |
143 } | 136 } |
144 | 137 |
145 return nil | 138 return nil |
146 } | 139 } |
147 | 140 |
148 func (t *taskqueueImpl) Testable() tq.Testable { | 141 func (t *taskqueueImpl) GetTestable() tq.Testable { return t } |
149 » return t | |
150 } | |
151 | 142 |
152 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | 143 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
153 | 144 |
154 type taskqueueTxnImpl struct { | 145 type taskqueueTxnImpl struct { |
155 *txnTaskQueueData | 146 *txnTaskQueueData |
156 | 147 |
157 ctx context.Context | 148 ctx context.Context |
158 ns string | |
159 } | 149 } |
160 | 150 |
161 var _ interface { | 151 var _ interface { |
162 tq.RawInterface | 152 tq.RawInterface |
163 tq.Testable | 153 tq.Testable |
164 } = (*taskqueueTxnImpl)(nil) | 154 } = (*taskqueueTxnImpl)(nil) |
165 | 155 |
166 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task,
error) { | 156 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task,
error) { |
167 » toSched, err := t.parent.prepTask(t.ctx, t.ns, task, queueName) | 157 » toSched, err := t.parent.prepTask(t.ctx, task, queueName) |
168 if err != nil { | 158 if err != nil { |
169 return nil, err | 159 return nil, err |
170 } | 160 } |
171 | 161 |
172 numTasks := 0 | 162 numTasks := 0 |
173 for _, vs := range t.anony { | 163 for _, vs := range t.anony { |
174 numTasks += len(vs) | 164 numTasks += len(vs) |
175 } | 165 } |
176 if numTasks+1 > 5 { | 166 if numTasks+1 > 5 { |
177 // transactional tasks are actually implemented 'for real' as Ac
tions which | 167 // transactional tasks are actually implemented 'for real' as Ac
tions which |
(...skipping 11 matching lines...) Expand all Loading... |
189 // We should verify that the .Name for a task added in a tr
ansaction is | 179 // We should verify that the .Name for a task added in a tr
ansaction is |
190 // meaningless. Maybe names generated in a transaction are
somehow | 180 // meaningless. Maybe names generated in a transaction are
somehow |
191 // guaranteed to be meaningful? | 181 // guaranteed to be meaningful? |
192 toRet := toSched.Duplicate() | 182 toRet := toSched.Duplicate() |
193 toRet.Name = "" | 183 toRet.Name = "" |
194 | 184 |
195 return toRet, nil | 185 return toRet, nil |
196 } | 186 } |
197 | 187 |
198 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.Ra
wTaskCB) error { | 188 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.Ra
wTaskCB) error { |
199 » if atomic.LoadInt32(&t.closed) == 1 { | 189 » if err := assertTxnValid(t.ctx); err != nil { |
200 » » return errors.New("taskqueue: transaction context has expired") | 190 » » return err |
201 } | 191 } |
202 | 192 |
203 t.Lock() | 193 t.Lock() |
204 defer t.Unlock() | 194 defer t.Unlock() |
205 | 195 |
206 queueName, err := t.parent.getQueueNameLocked(queueName) | 196 queueName, err := t.parent.getQueueNameLocked(queueName) |
207 if err != nil { | 197 if err != nil { |
208 return err | 198 return err |
209 } | 199 } |
210 | 200 |
211 for _, task := range tasks { | 201 for _, task := range tasks { |
212 cb(t.addLocked(task, queueName)) | 202 cb(t.addLocked(task, queueName)) |
213 } | 203 } |
214 return nil | 204 return nil |
215 } | 205 } |
216 | 206 |
217 func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error { | 207 func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error { |
218 return errors.New("taskqueue: cannot DeleteMulti from a transaction") | 208 return errors.New("taskqueue: cannot DeleteMulti from a transaction") |
219 } | 209 } |
220 | 210 |
221 func (t *taskqueueTxnImpl) Purge(string) error { | 211 func (t *taskqueueTxnImpl) Purge(string) error { |
222 return errors.New("taskqueue: cannot Purge from a transaction") | 212 return errors.New("taskqueue: cannot Purge from a transaction") |
223 } | 213 } |
224 | 214 |
225 func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error { | 215 func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error { |
226 return errors.New("taskqueue: cannot Stats from a transaction") | 216 return errors.New("taskqueue: cannot Stats from a transaction") |
227 } | 217 } |
228 | 218 |
229 func (t *taskqueueTxnImpl) Testable() tq.Testable { | 219 func (t *taskqueueTxnImpl) GetTestable() tq.Testable { return t } |
230 » return t | |
231 } | |
232 | 220 |
233 ////////////////////////////// private functions /////////////////////////////// | 221 ////////////////////////////// private functions /////////////////////////////// |
234 | 222 |
235 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") | 223 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") |
236 | 224 |
237 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" | 225 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" |
238 | 226 |
239 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { | 227 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { |
240 _, ok := queue[cur] | 228 _, ok := queue[cur] |
241 for !ok && cur == "" { | 229 for !ok && cur == "" { |
(...skipping 10 matching lines...) Expand all Loading... |
252 func dupQueue(q tq.QueueData) tq.QueueData { | 240 func dupQueue(q tq.QueueData) tq.QueueData { |
253 r := make(tq.QueueData, len(q)) | 241 r := make(tq.QueueData, len(q)) |
254 for k, q := range q { | 242 for k, q := range q { |
255 r[k] = make(map[string]*tq.Task, len(q)) | 243 r[k] = make(map[string]*tq.Task, len(q)) |
256 for tn, t := range q { | 244 for tn, t := range q { |
257 r[k][tn] = t.Duplicate() | 245 r[k][tn] = t.Duplicate() |
258 } | 246 } |
259 } | 247 } |
260 return r | 248 return r |
261 } | 249 } |
OLD | NEW |