OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "net/http" | |
9 "regexp" | 8 "regexp" |
| 9 "sync/atomic" |
10 | 10 |
11 "golang.org/x/net/context" | 11 "golang.org/x/net/context" |
12 | 12 |
13 "github.com/luci/gae/impl/dummy" | |
14 tq "github.com/luci/gae/service/taskqueue" | 13 tq "github.com/luci/gae/service/taskqueue" |
15 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
16 "github.com/luci/luci-go/common/mathrand" | 15 "github.com/luci/luci-go/common/mathrand" |
17 ) | 16 ) |
18 | 17 |
19 /////////////////////////////// public functions /////////////////////////////// | 18 /////////////////////////////// public functions /////////////////////////////// |
20 | 19 |
21 func useTQ(c context.Context) context.Context { | 20 func useTQ(c context.Context) context.Context { |
22 » return tq.SetFactory(c, func(ic context.Context) tq.Interface { | 21 » return tq.SetRawFactory(c, func(ic context.Context) tq.RawInterface { |
23 tqd := cur(ic).Get(memContextTQIdx) | 22 tqd := cur(ic).Get(memContextTQIdx) |
24 if x, ok := tqd.(*taskQueueData); ok { | 23 if x, ok := tqd.(*taskQueueData); ok { |
25 return &taskqueueImpl{ | 24 return &taskqueueImpl{ |
26 dummy.TaskQueue(), | |
27 x, | 25 x, |
28 ic, | 26 ic, |
29 curGID(ic).namespace, | 27 curGID(ic).namespace, |
30 } | 28 } |
31 } | 29 } |
32 return &taskqueueTxnImpl{ | 30 return &taskqueueTxnImpl{ |
33 dummy.TaskQueue(), | |
34 tqd.(*txnTaskQueueData), | 31 tqd.(*txnTaskQueueData), |
35 ic, | 32 ic, |
36 curGID(ic).namespace, | 33 curGID(ic).namespace, |
37 } | 34 } |
38 }) | 35 }) |
39 } | 36 } |
40 | 37 |
41 //////////////////////////////// taskqueueImpl ///////////////////////////////// | 38 //////////////////////////////// taskqueueImpl ///////////////////////////////// |
42 | 39 |
43 type taskqueueImpl struct { | 40 type taskqueueImpl struct { |
44 tq.Interface | |
45 *taskQueueData | 41 *taskQueueData |
46 | 42 |
47 ctx context.Context | 43 ctx context.Context |
48 ns string | 44 ns string |
49 } | 45 } |
50 | 46 |
51 var ( | 47 var ( |
52 » _ = tq.Interface((*taskqueueImpl)(nil)) | 48 » _ = tq.RawInterface((*taskqueueImpl)(nil)) |
53 _ = tq.Testable((*taskqueueImpl)(nil)) | 49 _ = tq.Testable((*taskqueueImpl)(nil)) |
54 ) | 50 ) |
55 | 51 |
56 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er
ror) { | 52 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er
ror) { |
57 » toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) | 53 » toSched, err := t.prepTask(t.ctx, t.ns, task, queueName) |
58 if err != nil { | 54 if err != nil { |
59 return nil, err | 55 return nil, err |
60 } | 56 } |
61 | 57 |
62 if _, ok := t.archived[queueName][toSched.Name]; ok { | 58 if _, ok := t.archived[queueName][toSched.Name]; ok { |
63 // SDK converts TOMBSTONE -> already added too | 59 // SDK converts TOMBSTONE -> already added too |
64 return nil, tq.ErrTaskAlreadyAdded | 60 return nil, tq.ErrTaskAlreadyAdded |
65 } else if _, ok := t.named[queueName][toSched.Name]; ok { | 61 } else if _, ok := t.named[queueName][toSched.Name]; ok { |
66 return nil, tq.ErrTaskAlreadyAdded | 62 return nil, tq.ErrTaskAlreadyAdded |
67 } else { | 63 } else { |
68 t.named[queueName][toSched.Name] = toSched | 64 t.named[queueName][toSched.Name] = toSched |
69 } | 65 } |
70 | 66 |
71 » return dupTask(toSched), nil | 67 » return toSched.Duplicate(), nil |
72 } | |
73 | |
74 func (t *taskqueueImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) { | |
75 » t.Lock() | |
76 » defer t.Unlock() | |
77 » return t.addLocked(task, queueName) | |
78 } | 68 } |
79 | 69 |
80 func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error { | 70 func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error { |
81 queueName, err := t.getQueueName(queueName) | |
82 if err != nil { | |
83 return err | |
84 } | |
85 | |
86 if _, ok := t.archived[queueName][task.Name]; ok { | 71 if _, ok := t.archived[queueName][task.Name]; ok { |
87 return errors.New("TOMBSTONED_TASK") | 72 return errors.New("TOMBSTONED_TASK") |
88 } | 73 } |
89 | 74 |
90 if _, ok := t.named[queueName][task.Name]; !ok { | 75 if _, ok := t.named[queueName][task.Name]; !ok { |
91 return errors.New("UNKNOWN_TASK") | 76 return errors.New("UNKNOWN_TASK") |
92 } | 77 } |
93 | 78 |
94 t.archived[queueName][task.Name] = t.named[queueName][task.Name] | 79 t.archived[queueName][task.Name] = t.named[queueName][task.Name] |
95 delete(t.named[queueName], task.Name) | 80 delete(t.named[queueName], task.Name) |
96 | 81 |
97 return nil | 82 return nil |
98 } | 83 } |
99 | 84 |
100 func (t *taskqueueImpl) Delete(task *tq.Task, queueName string) error { | 85 func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTa
skCB) error { |
101 » t.Lock() | |
102 » defer t.Unlock() | |
103 » return t.deleteLocked(task, queueName) | |
104 } | |
105 | |
106 func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task
, error) { | |
107 » t.Lock() | |
108 » defer t.Unlock() | |
109 » return multi(tasks, queueName, t.addLocked) | |
110 } | |
111 | |
112 func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string) error { | |
113 t.Lock() | 86 t.Lock() |
114 defer t.Unlock() | 87 defer t.Unlock() |
115 | 88 |
116 » _, err := multi(tasks, queueName, | 89 » queueName, err := t.getQueueNameLocked(queueName) |
117 » » func(tsk *tq.Task, qn string) (*tq.Task, error) { | 90 » if err != nil { |
118 » » » return nil, t.deleteLocked(tsk, qn) | 91 » » return err |
119 » » }) | 92 » } |
120 » return err | 93 |
| 94 » for _, task := range tasks { |
| 95 » » cb(t.addLocked(task, queueName)) |
| 96 » } |
| 97 » return nil |
| 98 } |
| 99 |
| 100 func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.Ra
wCB) error { |
| 101 » t.Lock() |
| 102 » defer t.Unlock() |
| 103 |
| 104 » queueName, err := t.getQueueNameLocked(queueName) |
| 105 » if err != nil { |
| 106 » » return err |
| 107 » } |
| 108 |
| 109 » for _, task := range tasks { |
| 110 » » cb(t.deleteLocked(task, queueName)) |
| 111 » } |
| 112 » return nil |
| 113 } |
| 114 |
| 115 func (t *taskqueueImpl) Purge(queueName string) error { |
| 116 » t.Lock() |
| 117 » defer t.Unlock() |
| 118 |
| 119 » return t.purgeLocked(queueName) |
| 120 } |
| 121 |
| 122 func (t *taskqueueImpl) Stats(queueNames []string, cb tq.RawStatsCB) error { |
| 123 » t.Lock() |
| 124 » defer t.Unlock() |
| 125 |
| 126 » for _, qn := range queueNames { |
| 127 » » qn, err := t.getQueueNameLocked(qn) |
| 128 » » if err != nil { |
| 129 » » » cb(nil, err) |
| 130 » » } else { |
| 131 » » » s := tq.Statistics{ |
| 132 » » » » Tasks: len(t.named[qn]), |
| 133 » » » } |
| 134 » » » for _, t := range t.named[qn] { |
| 135 » » » » if s.OldestETA.IsZero() { |
| 136 » » » » » s.OldestETA = t.ETA |
| 137 » » » » } else if t.ETA.Before(s.OldestETA) { |
| 138 » » » » » s.OldestETA = t.ETA |
| 139 » » » » } |
| 140 » » » } |
| 141 » » » cb(&s, nil) |
| 142 » » } |
| 143 » } |
| 144 |
| 145 » return nil |
121 } | 146 } |
122 | 147 |
123 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | 148 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
124 | 149 |
125 type taskqueueTxnImpl struct { | 150 type taskqueueTxnImpl struct { |
126 tq.Interface | |
127 *txnTaskQueueData | 151 *txnTaskQueueData |
128 | 152 |
129 ctx context.Context | 153 ctx context.Context |
130 ns string | 154 ns string |
131 } | 155 } |
132 | 156 |
133 var _ interface { | 157 var _ interface { |
134 » tq.Interface | 158 » tq.RawInterface |
135 tq.Testable | 159 tq.Testable |
136 } = (*taskqueueTxnImpl)(nil) | 160 } = (*taskqueueTxnImpl)(nil) |
137 | 161 |
138 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task,
error) { | 162 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task,
error) { |
139 » toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam
e) | 163 » toSched, err := t.parent.prepTask(t.ctx, t.ns, task, queueName) |
140 if err != nil { | 164 if err != nil { |
141 return nil, err | 165 return nil, err |
142 } | 166 } |
143 | 167 |
144 numTasks := 0 | 168 numTasks := 0 |
145 for _, vs := range t.anony { | 169 for _, vs := range t.anony { |
146 numTasks += len(vs) | 170 numTasks += len(vs) |
147 } | 171 } |
148 if numTasks+1 > 5 { | 172 if numTasks+1 > 5 { |
149 // transactional tasks are actually implemented 'for real' as Ac
tions which | 173 // transactional tasks are actually implemented 'for real' as Ac
tions which |
150 // ride on the datastore. The current datastore implementation o
nly allows | 174 // ride on the datastore. The current datastore implementation o
nly allows |
151 // a maximum of 5 Actions per transaction, and more than that re
sult in a | 175 // a maximum of 5 Actions per transaction, and more than that re
sult in a |
152 // BAD_REQUEST. | 176 // BAD_REQUEST. |
153 return nil, errors.New("BAD_REQUEST") | 177 return nil, errors.New("BAD_REQUEST") |
154 } | 178 } |
155 | 179 |
156 t.anony[queueName] = append(t.anony[queueName], toSched) | 180 t.anony[queueName] = append(t.anony[queueName], toSched) |
157 | 181 |
158 // the fact that we have generated a unique name for this task queue ite
m is | 182 // the fact that we have generated a unique name for this task queue ite
m is |
159 // an implementation detail. | 183 // an implementation detail. |
160 // TODO(riannucci): now that I think about this... it may not actually b
e true. | 184 // TODO(riannucci): now that I think about this... it may not actually b
e true. |
161 // We should verify that the .Name for a task added in a tr
ansaction is | 185 // We should verify that the .Name for a task added in a tr
ansaction is |
162 // meaningless. Maybe names generated in a transaction are
somehow | 186 // meaningless. Maybe names generated in a transaction are
somehow |
163 // guaranteed to be meaningful? | 187 // guaranteed to be meaningful? |
164 » toRet := dupTask(toSched) | 188 » toRet := toSched.Duplicate() |
165 toRet.Name = "" | 189 toRet.Name = "" |
166 | 190 |
167 return toRet, nil | 191 return toRet, nil |
168 } | 192 } |
169 | 193 |
170 func (t *taskqueueTxnImpl) Add(task *tq.Task, queueName string) (retTask *tq.Tas
k, err error) { | 194 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.Ra
wTaskCB) error { |
171 » err = t.run(func() (err error) { | 195 » if atomic.LoadInt32(&t.closed) == 1 { |
172 » » t.Lock() | 196 » » return errors.New("taskqueue: transaction context has expired") |
173 » » defer t.Unlock() | 197 » } |
174 » » retTask, err = t.addLocked(task, queueName) | 198 |
175 » » return | 199 » t.Lock() |
176 » }) | 200 » defer t.Unlock() |
177 » return | 201 |
| 202 » queueName, err := t.parent.getQueueNameLocked(queueName) |
| 203 » if err != nil { |
| 204 » » return err |
| 205 » } |
| 206 |
| 207 » for _, task := range tasks { |
| 208 » » cb(t.addLocked(task, queueName)) |
| 209 » } |
| 210 » return nil |
178 } | 211 } |
179 | 212 |
180 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string) (retTask
s []*tq.Task, err error) { | 213 func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error { |
181 » err = t.run(func() (err error) { | 214 » return errors.New("taskqueue: cannot DeleteMulti from a transaction") |
182 » » t.Lock() | 215 } |
183 » » defer t.Unlock() | 216 |
184 » » retTasks, err = multi(tasks, queueName, t.addLocked) | 217 func (t *taskqueueTxnImpl) Purge(string) error { |
185 » » return | 218 » return errors.New("taskqueue: cannot Purge from a transaction") |
186 » }) | 219 } |
187 » return | 220 |
| 221 func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error { |
| 222 » return errors.New("taskqueue: cannot Stats from a transaction") |
188 } | 223 } |
189 | 224 |
190 ////////////////////////////// private functions /////////////////////////////// | 225 ////////////////////////////// private functions /////////////////////////////// |
191 | 226 |
192 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") | 227 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") |
193 | 228 |
194 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" | 229 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" |
195 | 230 |
196 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { | 231 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { |
197 _, ok := queue[cur] | 232 _, ok := queue[cur] |
198 for !ok && cur == "" { | 233 for !ok && cur == "" { |
199 name := [500]byte{} | 234 name := [500]byte{} |
200 for i := 0; i < 500; i++ { | 235 for i := 0; i < 500; i++ { |
201 name[i] = validTaskChars[mathrand.Get(c).Intn(len(validT
askChars))] | 236 name[i] = validTaskChars[mathrand.Get(c).Intn(len(validT
askChars))] |
202 } | 237 } |
203 cur = string(name[:]) | 238 cur = string(name[:]) |
204 _, ok = queue[cur] | 239 _, ok = queue[cur] |
205 } | 240 } |
206 return cur | 241 return cur |
207 } | 242 } |
208 | 243 |
209 func multi(tasks []*tq.Task, queueName string, f func(*tq.Task, string) (*tq.Tas
k, error)) ([]*tq.Task, error) { | |
210 ret := []*tq.Task(nil) | |
211 lme := errors.LazyMultiError{Size: len(tasks)} | |
212 for i, task := range tasks { | |
213 rt, err := f(task, queueName) | |
214 ret = append(ret, rt) | |
215 lme.Assign(i, err) | |
216 } | |
217 return ret, lme.Get() | |
218 } | |
219 | |
220 func dupTask(t *tq.Task) *tq.Task { | |
221 ret := &tq.Task{} | |
222 *ret = *t | |
223 | |
224 if t.Header != nil { | |
225 ret.Header = make(http.Header, len(t.Header)) | |
226 for k, vs := range t.Header { | |
227 newVs := make([]string, len(vs)) | |
228 copy(newVs, vs) | |
229 ret.Header[k] = newVs | |
230 } | |
231 } | |
232 | |
233 if t.Payload != nil { | |
234 ret.Payload = make([]byte, len(t.Payload)) | |
235 copy(ret.Payload, t.Payload) | |
236 } | |
237 | |
238 if t.RetryOptions != nil { | |
239 ret.RetryOptions = &tq.RetryOptions{} | |
240 *ret.RetryOptions = *t.RetryOptions | |
241 } | |
242 | |
243 return ret | |
244 } | |
245 | |
246 func dupQueue(q tq.QueueData) tq.QueueData { | 244 func dupQueue(q tq.QueueData) tq.QueueData { |
247 r := make(tq.QueueData, len(q)) | 245 r := make(tq.QueueData, len(q)) |
248 for k, q := range q { | 246 for k, q := range q { |
249 r[k] = make(map[string]*tq.Task, len(q)) | 247 r[k] = make(map[string]*tq.Task, len(q)) |
250 for tn, t := range q { | 248 for tn, t := range q { |
251 » » » r[k][tn] = dupTask(t) | 249 » » » r[k][tn] = t.Duplicate() |
252 } | 250 } |
253 } | 251 } |
254 return r | 252 return r |
255 } | 253 } |
OLD | NEW |