Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(87)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue.go

Issue 1152383003: Simple memory testing for gae/wrapper (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@better_context_lite
Patch Set: final fixes? Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "fmt"
9 "infra/gae/libs/wrapper"
10 "math/rand"
11 "net/http"
12 "regexp"
13 "time"
14
15 "golang.org/x/net/context"
16
17 "appengine"
18 "appengine/taskqueue"
19 "appengine_internal"
20 dbpb "appengine_internal/datastore"
21 pb "appengine_internal/taskqueue"
22 )
23
24 /////////////////////////////// public functions ///////////////////////////////
25
26 // UseTQ adds a wrapper.TaskQueue implementation to context, accessible
27 // by wrapper.GetTQ(c)
28 func UseTQ(c context.Context) context.Context {
29 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e {
30 tqd := cur(ic).Get("TQ")
31 var ret interface {
32 wrapper.TQTestable
33 wrapper.TaskQueue
34 }
35 switch x := tqd.(type) {
36 case *taskQueueData:
37 ret = &taskqueueImpl{
38 wrapper.DummyTQ(),
39 x,
40 curGID(ic).namespace,
41 func() time.Time { return wrapper.GetTimeNow(ic) },
42 wrapper.GetMathRand(ic),
43 }
44
45 case *txnTaskQueueData:
46 ret = &taskqueueTxnImpl{
47 wrapper.DummyTQ(),
48 x,
49 curGID(ic).namespace,
50 func() time.Time { return wrapper.GetTimeNow(ic) },
51 wrapper.GetMathRand(ic),
52 }
53
54 default:
55 panic(fmt.Errorf("TQ: bad type: %v", tqd))
56 }
57 return ret
58 })
59 }
60
61 //////////////////////////////// taskqueueImpl /////////////////////////////////
62
63 type taskqueueImpl struct {
64 wrapper.TaskQueue
65 *taskQueueData
66
67 ns string
68 timeNow func() time.Time
69 mathRand *rand.Rand
70 }
71
72 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) {
73 toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow() , t.mathRand)
74 if err != nil {
75 return nil, err
76 }
77
78 if _, ok := t.archived[queueName][toSched.Name]; ok {
79 // SDK converts TOMBSTONE -> already added too
80 return nil, taskqueue.ErrTaskAlreadyAdded
81 } else if _, ok := t.named[queueName][toSched.Name]; ok {
82 return nil, taskqueue.ErrTaskAlreadyAdded
83 } else {
84 t.named[queueName][toSched.Name] = toSched
85 }
86
87 return dupTask(toSched), nil
88 }
89
90 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue. Task, error) {
91 if err := t.IsBroken(); err != nil {
92 return nil, err
93 }
94
95 t.Lock()
96 defer t.Unlock()
97
98 return t.addLocked(task, queueName)
99 }
100
101 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err or {
102 queueName, err := t.getQueueName(queueName)
103 if err != nil {
104 return err
105 }
106
107 if _, ok := t.archived[queueName][task.Name]; ok {
108 return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK)
109 }
110
111 if _, ok := t.named[queueName][task.Name]; !ok {
112 return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK)
113 }
114
115 t.archived[queueName][task.Name] = t.named[queueName][task.Name]
116 delete(t.named[queueName], task.Name)
117
118 return nil
119 }
120
121 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error {
122 if err := t.IsBroken(); err != nil {
123 return err
124 }
125
126 t.Lock()
127 defer t.Unlock()
128
129 return t.deleteLocked(task, queueName)
130 }
131
132 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]* taskqueue.Task, error) {
133 if err := t.IsBroken(); err != nil {
134 return nil, err
135 }
136
137 t.Lock()
138 defer t.Unlock()
139
140 return multi(tasks, queueName, t.addLocked)
141 }
142
143 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e rror {
144 if err := t.IsBroken(); err != nil {
145 return err
146 }
147
148 t.Lock()
149 defer t.Unlock()
150
151 _, err := multi(tasks, queueName,
152 func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) {
153 return nil, t.deleteLocked(tsk, qn)
154 })
155 return err
156 }
157
158 /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
159
160 type taskqueueTxnImpl struct {
161 wrapper.TaskQueue
162 *txnTaskQueueData
163
164 ns string
165 timeNow func() time.Time
166 mathRand *rand.Rand
167 }
168
169 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) {
170 toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti meNow(), t.mathRand)
171 if err != nil {
172 return nil, err
173 }
174
175 numTasks := 0
176 for _, vs := range t.anony {
177 numTasks += len(vs)
178 }
179 if numTasks+1 > 5 {
180 // transactional tasks are actually implemented 'for real' as Ac tions which
181 // ride on the datastore. The current datastore implementation o nly allows
182 // a maximum of 5 Actions per transaction, and more than that re sult in a
183 // BAD_REQUEST.
184 return nil, newDSError(dbpb.Error_BAD_REQUEST)
185 }
186
187 t.anony[queueName] = append(t.anony[queueName], toSched)
188
189 // the fact that we have generated a unique name for this task queue ite m is
190 // an implementation detail.
191 // TODO(riannucci): now that I think about this... it may not actually b e true.
192 // We should verify that the .Name for a task added in a tr ansaction is
193 // meaningless. Maybe names generated in a transaction are somehow
194 // guaranteed to be meaningful?
195 toRet := dupTask(toSched)
196 toRet.Name = ""
197
198 return toRet, nil
199 }
200
201 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque ue.Task, error) {
202 if err := t.IsBroken(); err != nil {
203 return nil, err
204 }
205
206 t.Lock()
207 defer t.Unlock()
208
209 return t.addLocked(task, queueName)
210 }
211
212 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ( []*taskqueue.Task, error) {
213 if err := t.IsBroken(); err != nil {
214 return nil, err
215 }
216
217 t.Lock()
218 defer t.Unlock()
219
220 return multi(tasks, queueName, t.addLocked)
221 }
222
223 ////////////////////////////// private functions ///////////////////////////////
224
225 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
226
227 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_"
228
229 func mkName(rnd *rand.Rand, cur string, queue map[string]*taskqueue.Task) string {
230 _, ok := queue[cur]
231 for !ok && cur == "" {
232 name := [500]byte{}
233 for i := 0; i < 500; i++ {
234 name[i] = validTaskChars[rnd.Intn(len(validTaskChars))]
235 }
236 cur = string(name[:])
237 _, ok = queue[cur]
238 }
239 return cur
240 }
241
242 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API Error {
243 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co de)}
244 }
245
246 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) {
247 ret := []*taskqueue.Task(nil)
248 me := appengine.MultiError(nil)
249 foundErr := false
250 for _, task := range tasks {
251 rt, err := f(task, queueName)
252 ret = append(ret, rt)
253 me = append(me, err)
254 if err != nil {
255 foundErr = true
256 }
257 }
258 if !foundErr {
259 me = nil
260 }
261 return ret, me
262 }
263
264 func dupTask(t *taskqueue.Task) *taskqueue.Task {
265 ret := &taskqueue.Task{}
266 *ret = *t
267
268 if t.Header != nil {
269 ret.Header = make(http.Header, len(t.Header))
270 for k, vs := range t.Header {
271 newVs := make([]string, len(vs))
272 copy(newVs, vs)
273 ret.Header[k] = newVs
274 }
275 }
276
277 if t.Payload != nil {
278 ret.Payload = make([]byte, len(t.Payload))
279 copy(ret.Payload, t.Payload)
280 }
281
282 if t.RetryOptions != nil {
283 ret.RetryOptions = &taskqueue.RetryOptions{}
284 *ret.RetryOptions = *t.RetryOptions
285 }
286
287 return ret
288 }
289
290 func dupQueue(q wrapper.QueueData) wrapper.QueueData {
291 r := make(wrapper.QueueData, len(q))
292 for k, q := range q {
293 r[k] = make(map[string]*taskqueue.Task, len(q))
294 for tn, t := range q {
295 r[k][tn] = dupTask(t)
296 }
297 }
298 return r
299 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698