OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 "net/http" | 9 "net/http" |
10 "regexp" | 10 "regexp" |
11 | 11 |
12 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
13 | 13 |
14 "github.com/luci/gae" | 14 "github.com/luci/gae" |
15 » "github.com/luci/gae/dummy" | 15 » "github.com/luci/gae/impl/dummy" |
| 16 » tq "github.com/luci/gae/service/taskqueue" |
| 17 » "github.com/luci/luci-go/common/mathrand" |
16 ) | 18 ) |
17 | 19 |
18 /////////////////////////////// public functions /////////////////////////////// | 20 /////////////////////////////// public functions /////////////////////////////// |
19 | 21 |
20 func useTQ(c context.Context) context.Context { | 22 func useTQ(c context.Context) context.Context { |
21 » return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue { | 23 » return tq.SetFactory(c, func(ic context.Context) tq.Interface { |
22 tqd := cur(ic).Get(memContextTQIdx) | 24 tqd := cur(ic).Get(memContextTQIdx) |
23 if x, ok := tqd.(*taskQueueData); ok { | 25 if x, ok := tqd.(*taskQueueData); ok { |
24 return &taskqueueImpl{ | 26 return &taskqueueImpl{ |
25 » » » » dummy.TQ(), | 27 » » » » dummy.TaskQueue(), |
26 x, | 28 x, |
27 ic, | 29 ic, |
28 curGID(ic).namespace, | 30 curGID(ic).namespace, |
29 } | 31 } |
30 } | 32 } |
31 return &taskqueueTxnImpl{ | 33 return &taskqueueTxnImpl{ |
32 » » » dummy.TQ(), | 34 » » » dummy.TaskQueue(), |
33 tqd.(*txnTaskQueueData), | 35 tqd.(*txnTaskQueueData), |
34 ic, | 36 ic, |
35 curGID(ic).namespace, | 37 curGID(ic).namespace, |
36 } | 38 } |
37 }) | 39 }) |
38 } | 40 } |
39 | 41 |
40 //////////////////////////////// taskqueueImpl ///////////////////////////////// | 42 //////////////////////////////// taskqueueImpl ///////////////////////////////// |
41 | 43 |
42 type taskqueueImpl struct { | 44 type taskqueueImpl struct { |
43 » gae.TaskQueue | 45 » tq.Interface |
44 *taskQueueData | 46 *taskQueueData |
45 | 47 |
46 ctx context.Context | 48 ctx context.Context |
47 ns string | 49 ns string |
48 } | 50 } |
49 | 51 |
50 var ( | 52 var ( |
51 » _ = gae.TaskQueue((*taskqueueImpl)(nil)) | 53 » _ = tq.Interface((*taskqueueImpl)(nil)) |
52 » _ = gae.TQTestable((*taskqueueImpl)(nil)) | 54 » _ = tq.Testable((*taskqueueImpl)(nil)) |
53 ) | 55 ) |
54 | 56 |
55 func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa
sk, error) { | 57 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er
ror) { |
56 toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) | 58 toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) |
57 if err != nil { | 59 if err != nil { |
58 return nil, err | 60 return nil, err |
59 } | 61 } |
60 | 62 |
61 if _, ok := t.archived[queueName][toSched.Name]; ok { | 63 if _, ok := t.archived[queueName][toSched.Name]; ok { |
62 // SDK converts TOMBSTONE -> already added too | 64 // SDK converts TOMBSTONE -> already added too |
63 » » return nil, gae.ErrTQTaskAlreadyAdded | 65 » » return nil, tq.ErrTaskAlreadyAdded |
64 } else if _, ok := t.named[queueName][toSched.Name]; ok { | 66 } else if _, ok := t.named[queueName][toSched.Name]; ok { |
65 » » return nil, gae.ErrTQTaskAlreadyAdded | 67 » » return nil, tq.ErrTaskAlreadyAdded |
66 } else { | 68 } else { |
67 t.named[queueName][toSched.Name] = toSched | 69 t.named[queueName][toSched.Name] = toSched |
68 } | 70 } |
69 | 71 |
70 return dupTask(toSched), nil | 72 return dupTask(toSched), nil |
71 } | 73 } |
72 | 74 |
73 func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, er
ror) { | 75 func (t *taskqueueImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) { |
74 t.Lock() | 76 t.Lock() |
75 defer t.Unlock() | 77 defer t.Unlock() |
76 return t.addLocked(task, queueName) | 78 return t.addLocked(task, queueName) |
77 } | 79 } |
78 | 80 |
79 func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error { | 81 func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error { |
80 queueName, err := t.getQueueName(queueName) | 82 queueName, err := t.getQueueName(queueName) |
81 if err != nil { | 83 if err != nil { |
82 return err | 84 return err |
83 } | 85 } |
84 | 86 |
85 if _, ok := t.archived[queueName][task.Name]; ok { | 87 if _, ok := t.archived[queueName][task.Name]; ok { |
86 return errors.New("TOMBSTONED_TASK") | 88 return errors.New("TOMBSTONED_TASK") |
87 } | 89 } |
88 | 90 |
89 if _, ok := t.named[queueName][task.Name]; !ok { | 91 if _, ok := t.named[queueName][task.Name]; !ok { |
90 return errors.New("UNKNOWN_TASK") | 92 return errors.New("UNKNOWN_TASK") |
91 } | 93 } |
92 | 94 |
93 t.archived[queueName][task.Name] = t.named[queueName][task.Name] | 95 t.archived[queueName][task.Name] = t.named[queueName][task.Name] |
94 delete(t.named[queueName], task.Name) | 96 delete(t.named[queueName], task.Name) |
95 | 97 |
96 return nil | 98 return nil |
97 } | 99 } |
98 | 100 |
99 func (t *taskqueueImpl) Delete(task *gae.TQTask, queueName string) error { | 101 func (t *taskqueueImpl) Delete(task *tq.Task, queueName string) error { |
100 t.Lock() | 102 t.Lock() |
101 defer t.Unlock() | 103 defer t.Unlock() |
102 return t.deleteLocked(task, queueName) | 104 return t.deleteLocked(task, queueName) |
103 } | 105 } |
104 | 106 |
105 func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.
TQTask, error) { | 107 func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task
, error) { |
106 t.Lock() | 108 t.Lock() |
107 defer t.Unlock() | 109 defer t.Unlock() |
108 return multi(tasks, queueName, t.addLocked) | 110 return multi(tasks, queueName, t.addLocked) |
109 } | 111 } |
110 | 112 |
111 func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error
{ | 113 func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string) error { |
112 t.Lock() | 114 t.Lock() |
113 defer t.Unlock() | 115 defer t.Unlock() |
114 | 116 |
115 _, err := multi(tasks, queueName, | 117 _, err := multi(tasks, queueName, |
116 » » func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) { | 118 » » func(tsk *tq.Task, qn string) (*tq.Task, error) { |
117 return nil, t.deleteLocked(tsk, qn) | 119 return nil, t.deleteLocked(tsk, qn) |
118 }) | 120 }) |
119 return err | 121 return err |
120 } | 122 } |
121 | 123 |
122 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | 124 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
123 | 125 |
124 type taskqueueTxnImpl struct { | 126 type taskqueueTxnImpl struct { |
125 » gae.TaskQueue | 127 » tq.Interface |
126 *txnTaskQueueData | 128 *txnTaskQueueData |
127 | 129 |
128 ctx context.Context | 130 ctx context.Context |
129 ns string | 131 ns string |
130 } | 132 } |
131 | 133 |
132 var _ interface { | 134 var _ interface { |
133 » gae.TaskQueue | 135 » tq.Interface |
134 » gae.TQTestable | 136 » tq.Testable |
135 } = (*taskqueueTxnImpl)(nil) | 137 } = (*taskqueueTxnImpl)(nil) |
136 | 138 |
137 func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T
QTask, error) { | 139 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task,
error) { |
138 toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam
e) | 140 toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam
e) |
139 if err != nil { | 141 if err != nil { |
140 return nil, err | 142 return nil, err |
141 } | 143 } |
142 | 144 |
143 numTasks := 0 | 145 numTasks := 0 |
144 for _, vs := range t.anony { | 146 for _, vs := range t.anony { |
145 numTasks += len(vs) | 147 numTasks += len(vs) |
146 } | 148 } |
147 if numTasks+1 > 5 { | 149 if numTasks+1 > 5 { |
(...skipping 11 matching lines...) Expand all Loading... |
159 // TODO(riannucci): now that I think about this... it may not actually b
e true. | 161 // TODO(riannucci): now that I think about this... it may not actually b
e true. |
160 // We should verify that the .Name for a task added in a tr
ansaction is | 162 // We should verify that the .Name for a task added in a tr
ansaction is |
161 // meaningless. Maybe names generated in a transaction are
somehow | 163 // meaningless. Maybe names generated in a transaction are
somehow |
162 // guaranteed to be meaningful? | 164 // guaranteed to be meaningful? |
163 toRet := dupTask(toSched) | 165 toRet := dupTask(toSched) |
164 toRet.Name = "" | 166 toRet.Name = "" |
165 | 167 |
166 return toRet, nil | 168 return toRet, nil |
167 } | 169 } |
168 | 170 |
169 func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae
.TQTask, err error) { | 171 func (t *taskqueueTxnImpl) Add(task *tq.Task, queueName string) (retTask *tq.Tas
k, err error) { |
170 err = t.run(func() (err error) { | 172 err = t.run(func() (err error) { |
171 t.Lock() | 173 t.Lock() |
172 defer t.Unlock() | 174 defer t.Unlock() |
173 retTask, err = t.addLocked(task, queueName) | 175 retTask, err = t.addLocked(task, queueName) |
174 return | 176 return |
175 }) | 177 }) |
176 return | 178 return |
177 } | 179 } |
178 | 180 |
179 func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retT
asks []*gae.TQTask, err error) { | 181 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string) (retTask
s []*tq.Task, err error) { |
180 err = t.run(func() (err error) { | 182 err = t.run(func() (err error) { |
181 t.Lock() | 183 t.Lock() |
182 defer t.Unlock() | 184 defer t.Unlock() |
183 retTasks, err = multi(tasks, queueName, t.addLocked) | 185 retTasks, err = multi(tasks, queueName, t.addLocked) |
184 return | 186 return |
185 }) | 187 }) |
186 return | 188 return |
187 } | 189 } |
188 | 190 |
189 ////////////////////////////// private functions /////////////////////////////// | 191 ////////////////////////////// private functions /////////////////////////////// |
190 | 192 |
191 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") | 193 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") |
192 | 194 |
193 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" | 195 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" |
194 | 196 |
195 func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string
{ | 197 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { |
196 _, ok := queue[cur] | 198 _, ok := queue[cur] |
197 for !ok && cur == "" { | 199 for !ok && cur == "" { |
198 name := [500]byte{} | 200 name := [500]byte{} |
199 for i := 0; i < 500; i++ { | 201 for i := 0; i < 500; i++ { |
200 » » » name[i] = validTaskChars[gae.GetMathRand(c).Intn(len(val
idTaskChars))] | 202 » » » name[i] = validTaskChars[mathrand.Get(c).Intn(len(validT
askChars))] |
201 } | 203 } |
202 cur = string(name[:]) | 204 cur = string(name[:]) |
203 _, ok = queue[cur] | 205 _, ok = queue[cur] |
204 } | 206 } |
205 return cur | 207 return cur |
206 } | 208 } |
207 | 209 |
208 func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (*
gae.TQTask, error)) ([]*gae.TQTask, error) { | 210 func multi(tasks []*tq.Task, queueName string, f func(*tq.Task, string) (*tq.Tas
k, error)) ([]*tq.Task, error) { |
209 » ret := []*gae.TQTask(nil) | 211 » ret := []*tq.Task(nil) |
210 lme := gae.LazyMultiError{Size: len(tasks)} | 212 lme := gae.LazyMultiError{Size: len(tasks)} |
211 for i, task := range tasks { | 213 for i, task := range tasks { |
212 rt, err := f(task, queueName) | 214 rt, err := f(task, queueName) |
213 ret = append(ret, rt) | 215 ret = append(ret, rt) |
214 lme.Assign(i, err) | 216 lme.Assign(i, err) |
215 } | 217 } |
216 return ret, lme.Get() | 218 return ret, lme.Get() |
217 } | 219 } |
218 | 220 |
219 func dupTask(t *gae.TQTask) *gae.TQTask { | 221 func dupTask(t *tq.Task) *tq.Task { |
220 » ret := &gae.TQTask{} | 222 » ret := &tq.Task{} |
221 *ret = *t | 223 *ret = *t |
222 | 224 |
223 if t.Header != nil { | 225 if t.Header != nil { |
224 ret.Header = make(http.Header, len(t.Header)) | 226 ret.Header = make(http.Header, len(t.Header)) |
225 for k, vs := range t.Header { | 227 for k, vs := range t.Header { |
226 newVs := make([]string, len(vs)) | 228 newVs := make([]string, len(vs)) |
227 copy(newVs, vs) | 229 copy(newVs, vs) |
228 ret.Header[k] = newVs | 230 ret.Header[k] = newVs |
229 } | 231 } |
230 } | 232 } |
231 | 233 |
232 if t.Payload != nil { | 234 if t.Payload != nil { |
233 ret.Payload = make([]byte, len(t.Payload)) | 235 ret.Payload = make([]byte, len(t.Payload)) |
234 copy(ret.Payload, t.Payload) | 236 copy(ret.Payload, t.Payload) |
235 } | 237 } |
236 | 238 |
237 if t.RetryOptions != nil { | 239 if t.RetryOptions != nil { |
238 » » ret.RetryOptions = &gae.TQRetryOptions{} | 240 » » ret.RetryOptions = &tq.RetryOptions{} |
239 *ret.RetryOptions = *t.RetryOptions | 241 *ret.RetryOptions = *t.RetryOptions |
240 } | 242 } |
241 | 243 |
242 return ret | 244 return ret |
243 } | 245 } |
244 | 246 |
245 func dupQueue(q gae.QueueData) gae.QueueData { | 247 func dupQueue(q tq.QueueData) tq.QueueData { |
246 » r := make(gae.QueueData, len(q)) | 248 » r := make(tq.QueueData, len(q)) |
247 for k, q := range q { | 249 for k, q := range q { |
248 » » r[k] = make(map[string]*gae.TQTask, len(q)) | 250 » » r[k] = make(map[string]*tq.Task, len(q)) |
249 for tn, t := range q { | 251 for tn, t := range q { |
250 r[k][tn] = dupTask(t) | 252 r[k][tn] = dupTask(t) |
251 } | 253 } |
252 } | 254 } |
253 return r | 255 return r |
254 } | 256 } |
OLD | NEW |