Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(168)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue.go

Issue 1152383003: Simple memory testing for gae/wrapper (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@better_context_lite
Patch Set: fixes Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "fmt"
9 "golang.org/x/net/context"
10 "math/rand"
11 "net/http"
12 "regexp"
13 "time"
14
15 "appengine"
16 "appengine/taskqueue"
17 "appengine_internal"
18 dbpb "appengine_internal/datastore"
19 pb "appengine_internal/taskqueue"
20
21 "infra/gae/libs/wrapper"
22 )
23
24 /////////////////////////////// public functions ///////////////////////////////
25
26 // UseTQ adds a wrapper.TaskQueue implementation to context, accessible
27 // by wrapper.GetTQ(c)
28 func UseTQ(c context.Context) context.Context {
29 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e {
30 tqd := cur(ic).Get("TQ")
31 var ret interface {
32 wrapper.TQTestable
33 wrapper.TaskQueue
34 }
35 switch x := tqd.(type) {
36 case *taskQueueData:
37 ret = &taskqueueImpl{
38 wrapper.DummyTQ(),
39 x,
40 curGID(ic).namespace,
41 func() time.Time { return wrapper.GetTimeNow(ic) },
42 wrapper.GetMathRand(ic),
43 }
44
45 case *txnTaskQueueData:
46 ret = &taskqueueTxnImpl{
47 wrapper.DummyTQ(),
48 x,
49 curGID(ic).namespace,
50 func() time.Time { return wrapper.GetTimeNow(ic) },
51 wrapper.GetMathRand(ic),
52 }
53
54 default:
55 panic(fmt.Errorf("TQ: bad type: %v", tqd))
56 }
57 return ret
58 })
59 }
60
61 //////////////////////////////// taskqueueImpl /////////////////////////////////
62
63 type taskqueueImpl struct {
64 wrapper.TaskQueue
65 *taskQueueData
66
67 ns string
68 timeNow func() time.Time
69 mathRand *rand.Rand
70 }
71
72 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) {
73 toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow() , t.mathRand)
74 if err != nil {
75 return nil, err
76 }
77
78 if _, ok := t.archived[queueName][toSched.Name]; ok {
79 // SDK converts TOMBSTONE -> already added too
80 return nil, taskqueue.ErrTaskAlreadyAdded
81 } else if _, ok := t.named[queueName][toSched.Name]; ok {
82 return nil, taskqueue.ErrTaskAlreadyAdded
83 } else {
84 t.named[queueName][toSched.Name] = toSched
85 }
86
87 return dupTask(toSched), nil
88 }
89
90 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue. Task, error) {
91 err := t.IsBroken()
92 if err != nil {
93 return nil, err
94 }
95
96 t.Lock()
97 defer t.Unlock()
98
99 return t.addLocked(task, queueName)
100 }
101
102 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err or {
103 queueName, err := t.getQueueName(queueName)
104 if err != nil {
105 return err
106 }
107
108 if _, ok := t.archived[queueName][task.Name]; ok {
109 return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK)
110 }
111
112 if _, ok := t.named[queueName][task.Name]; !ok {
113 return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK)
114 }
115
116 t.archived[queueName][task.Name] = t.named[queueName][task.Name]
117 delete(t.named[queueName], task.Name)
118
119 return nil
120 }
121
122 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error {
123 err := t.IsBroken()
124 if err != nil {
125 return err
126 }
127
128 t.Lock()
129 defer t.Unlock()
130
131 return t.deleteLocked(task, queueName)
132 }
133
134 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]* taskqueue.Task, error) {
135 err := t.IsBroken()
136 if err != nil {
137 return nil, err
138 }
139
140 return multi(tasks, queueName, t.addLocked)
141 }
142
143 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e rror {
144 err := t.IsBroken()
145 if err != nil {
146 return err
147 }
148
149 t.Lock()
150 defer t.Unlock()
151
152 _, err = multi(tasks, queueName,
153 func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) {
154 return nil, t.deleteLocked(tsk, qn)
155 })
156 return err
157 }
158
159 /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
160
161 type taskqueueTxnImpl struct {
162 wrapper.TaskQueue
163 *txnTaskQueueData
164
165 ns string
166 timeNow func() time.Time
167 mathRand *rand.Rand
M-A Ruel 2015/05/25 18:21:10 Why not always create its own rand with Seet(0) ?
iannucci 2015/05/27 19:33:32 So that the test writer can control it
168 }
169
170 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) {
171 toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti meNow(), t.mathRand)
172 if err != nil {
173 return nil, err
174 }
175
176 numTasks := 0
177 for _, vs := range t.anony {
178 numTasks += len(vs)
179 }
180 if numTasks+1 > 5 {
181 // transactional tasks are actually implemented 'for real' as Ac tions which
182 // ride on the datastore. The current datastore implementation o nly allows
183 // a maximum of 5 Actions per transaction, and more than that re sult in a
184 // BAD_REQUEST.
185 return nil, newDSError(dbpb.Error_BAD_REQUEST)
186 }
187
188 t.anony[queueName] = append(t.anony[queueName], toSched)
189
190 // the fact that we have generated a unique name for this task queue ite m is
191 // an implementation detail.
192 // TODO(riannucci): now that I think about this... it may not actually b e true.
193 // We should verify that the .Name for a task added in a tr ansaction is
194 // meaningless. Maybe names generated in a transaction are somehow
195 // guaranteed to be meaningful?
196 toRet := dupTask(toSched)
197 toRet.Name = ""
198
199 return toRet, nil
200 }
201
202 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque ue.Task, error) {
203 err := t.IsBroken()
204 if err != nil {
205 return nil, err
206 }
207
208 t.Lock()
209 defer t.Unlock()
210
211 return t.addLocked(task, queueName)
212 }
213
214 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ( []*taskqueue.Task, error) {
215 err := t.IsBroken()
216 if err != nil {
217 return nil, err
218 }
219
220 t.Lock()
221 defer t.Unlock()
222
223 return multi(tasks, queueName, t.addLocked)
224 }
225
226 ////////////////////////////// private functions ///////////////////////////////
227
228 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
229
230 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_"
231
232 func mkName(rnd *rand.Rand, cur string, queue map[string]*taskqueue.Task) string {
233 _, ok := queue[cur]
234 for !ok && cur == "" {
235 name := [500]byte{}
236 for i := 0; i < 500; i++ {
M-A Ruel 2015/05/25 18:21:10 Always 500 is excessive IMHO
iannucci 2015/05/27 19:33:32 That's the actual name limit.
237 name[i] = validTaskChars[rnd.Intn(len(validTaskChars))]
238 }
239 cur = string(name[:])
240 _, ok = queue[cur]
241 }
242 return cur
243 }
244
245 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API Error {
246 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co de)}
247 }
248
249 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) {
250 ret := []*taskqueue.Task(nil)
251 me := appengine.MultiError(nil)
252 foundErr := false
253 for _, task := range tasks {
254 rt, err := f(task, queueName)
255 ret = append(ret, rt)
256 me = append(me, err)
257 if err != nil {
258 foundErr = true
259 }
260 }
261 if !foundErr {
262 me = nil
263 }
264 return ret, me
265 }
266
267 func dupTask(t *taskqueue.Task) *taskqueue.Task {
268 ret := &taskqueue.Task{}
269 *ret = *t
270
271 if t.Header != nil {
272 ret.Header = make(http.Header, len(t.Header))
273 for k, vs := range t.Header {
274 newVs := make([]string, len(vs))
275 copy(newVs, vs)
276 ret.Header[k] = newVs
277 }
278 }
279
280 if t.Payload != nil {
281 ret.Payload = make([]byte, len(t.Payload))
282 copy(ret.Payload, t.Payload)
283 }
284
285 if t.RetryOptions != nil {
286 ret.RetryOptions = &taskqueue.RetryOptions{}
287 *ret.RetryOptions = *t.RetryOptions
288 }
289
290 return ret
291 }
292
293 func dupQueue(q wrapper.QueueData) wrapper.QueueData {
294 r := make(wrapper.QueueData, len(q))
295 for k, q := range q {
296 r[k] = make(map[string]*taskqueue.Task, len(q))
297 for tn, t := range q {
298 r[k][tn] = dupTask(t)
299 }
300 }
301 return r
302 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698