Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(198)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue.go

Issue 1152383003: Simple memory testing for gae/wrapper (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@better_context_lite
Patch Set: add go-slab dependency Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "fmt"
9 "infra/gae/libs/wrapper"
10 "math/rand"
11 "net/http"
12 "regexp"
13 "time"
14
15 "golang.org/x/net/context"
16
17 "appengine"
18 "appengine/taskqueue"
19 "appengine_internal"
20 dbpb "appengine_internal/datastore"
21 pb "appengine_internal/taskqueue"
22 )
23
24 /////////////////////////////// public functions ///////////////////////////////
25
26 // UseTQ adds a wrapper.TaskQueue implementation to context, accessible
27 // by wrapper.GetTQ(c)
28 func UseTQ(c context.Context) context.Context {
29 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e {
30 tqd := cur(ic).Get(memContextTQIdx)
31 var ret interface {
32 wrapper.TQTestable
33 wrapper.TaskQueue
34 }
35 switch x := tqd.(type) {
36 case *taskQueueData:
37 ret = &taskqueueImpl{
38 wrapper.DummyTQ(),
39 x,
40 curGID(ic).namespace,
41 func() time.Time { return wrapper.GetTimeNow(ic) },
42 wrapper.GetMathRand(ic),
43 }
44
45 case *txnTaskQueueData:
46 ret = &taskqueueTxnImpl{
47 wrapper.DummyTQ(),
48 x,
49 curGID(ic).namespace,
50 func() time.Time { return wrapper.GetTimeNow(ic) },
51 wrapper.GetMathRand(ic),
52 }
53
54 default:
55 panic(fmt.Errorf("TQ: bad type: %v", tqd))
56 }
57 return ret
58 })
59 }
60
61 //////////////////////////////// taskqueueImpl /////////////////////////////////
62
63 type taskqueueImpl struct {
64 wrapper.TaskQueue
65 *taskQueueData
66
67 ns string
68 timeNow func() time.Time
69 mathRand *rand.Rand
70 }
71
72 var (
73 _ = wrapper.TaskQueue((*taskqueueImpl)(nil))
74 _ = wrapper.TQTestable((*taskqueueImpl)(nil))
75 )
76
77 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) {
78 toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow() , t.mathRand)
79 if err != nil {
80 return nil, err
81 }
82
83 if _, ok := t.archived[queueName][toSched.Name]; ok {
84 // SDK converts TOMBSTONE -> already added too
85 return nil, taskqueue.ErrTaskAlreadyAdded
86 } else if _, ok := t.named[queueName][toSched.Name]; ok {
87 return nil, taskqueue.ErrTaskAlreadyAdded
88 } else {
89 t.named[queueName][toSched.Name] = toSched
90 }
91
92 return dupTask(toSched), nil
93 }
94
95 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue. Task, error) {
96 if err := t.IsBroken(); err != nil {
97 return nil, err
98 }
99
100 t.Lock()
101 defer t.Unlock()
102
103 return t.addLocked(task, queueName)
104 }
105
106 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err or {
107 queueName, err := t.getQueueName(queueName)
108 if err != nil {
109 return err
110 }
111
112 if _, ok := t.archived[queueName][task.Name]; ok {
113 return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK)
114 }
115
116 if _, ok := t.named[queueName][task.Name]; !ok {
117 return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK)
118 }
119
120 t.archived[queueName][task.Name] = t.named[queueName][task.Name]
121 delete(t.named[queueName], task.Name)
122
123 return nil
124 }
125
126 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error {
127 if err := t.IsBroken(); err != nil {
128 return err
129 }
130
131 t.Lock()
132 defer t.Unlock()
133
134 return t.deleteLocked(task, queueName)
135 }
136
137 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]* taskqueue.Task, error) {
138 if err := t.IsBroken(); err != nil {
139 return nil, err
140 }
141
142 t.Lock()
143 defer t.Unlock()
144
145 return multi(tasks, queueName, t.addLocked)
146 }
147
148 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e rror {
149 if err := t.IsBroken(); err != nil {
150 return err
151 }
152
153 t.Lock()
154 defer t.Unlock()
155
156 _, err := multi(tasks, queueName,
157 func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) {
158 return nil, t.deleteLocked(tsk, qn)
159 })
160 return err
161 }
162
163 /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
164
165 type taskqueueTxnImpl struct {
166 wrapper.TaskQueue
167 *txnTaskQueueData
168
169 ns string
170 timeNow func() time.Time
171 mathRand *rand.Rand
172 }
173
174 var (
175 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil))
176 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil))
177 )
178
179 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) {
180 toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti meNow(), t.mathRand)
181 if err != nil {
182 return nil, err
183 }
184
185 numTasks := 0
186 for _, vs := range t.anony {
187 numTasks += len(vs)
188 }
189 if numTasks+1 > 5 {
190 // transactional tasks are actually implemented 'for real' as Ac tions which
191 // ride on the datastore. The current datastore implementation o nly allows
192 // a maximum of 5 Actions per transaction, and more than that re sult in a
193 // BAD_REQUEST.
194 return nil, newDSError(dbpb.Error_BAD_REQUEST)
195 }
196
197 t.anony[queueName] = append(t.anony[queueName], toSched)
198
199 // the fact that we have generated a unique name for this task queue ite m is
200 // an implementation detail.
201 // TODO(riannucci): now that I think about this... it may not actually b e true.
202 // We should verify that the .Name for a task added in a tr ansaction is
203 // meaningless. Maybe names generated in a transaction are somehow
204 // guaranteed to be meaningful?
205 toRet := dupTask(toSched)
206 toRet.Name = ""
207
208 return toRet, nil
209 }
210
211 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque ue.Task, error) {
212 if err := t.IsBroken(); err != nil {
213 return nil, err
214 }
215
216 t.Lock()
217 defer t.Unlock()
218
219 return t.addLocked(task, queueName)
220 }
221
222 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ( []*taskqueue.Task, error) {
223 if err := t.IsBroken(); err != nil {
224 return nil, err
225 }
226
227 t.Lock()
228 defer t.Unlock()
229
230 return multi(tasks, queueName, t.addLocked)
231 }
232
233 ////////////////////////////// private functions ///////////////////////////////
234
235 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
236
237 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_"
238
239 func mkName(rnd *rand.Rand, cur string, queue map[string]*taskqueue.Task) string {
240 _, ok := queue[cur]
241 for !ok && cur == "" {
242 name := [500]byte{}
243 for i := 0; i < 500; i++ {
244 name[i] = validTaskChars[rnd.Intn(len(validTaskChars))]
245 }
246 cur = string(name[:])
247 _, ok = queue[cur]
248 }
249 return cur
250 }
251
252 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API Error {
253 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co de)}
254 }
255
256 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) {
257 ret := []*taskqueue.Task(nil)
258 me := appengine.MultiError(nil)
259 foundErr := false
260 for _, task := range tasks {
261 rt, err := f(task, queueName)
262 ret = append(ret, rt)
263 me = append(me, err)
264 if err != nil {
265 foundErr = true
266 }
267 }
268 if !foundErr {
269 me = nil
270 }
271 return ret, me
272 }
273
274 func dupTask(t *taskqueue.Task) *taskqueue.Task {
275 ret := &taskqueue.Task{}
276 *ret = *t
277
278 if t.Header != nil {
279 ret.Header = make(http.Header, len(t.Header))
280 for k, vs := range t.Header {
281 newVs := make([]string, len(vs))
282 copy(newVs, vs)
283 ret.Header[k] = newVs
284 }
285 }
286
287 if t.Payload != nil {
288 ret.Payload = make([]byte, len(t.Payload))
289 copy(ret.Payload, t.Payload)
290 }
291
292 if t.RetryOptions != nil {
293 ret.RetryOptions = &taskqueue.RetryOptions{}
294 *ret.RetryOptions = *t.RetryOptions
295 }
296
297 return ret
298 }
299
300 func dupQueue(q wrapper.QueueData) wrapper.QueueData {
301 r := make(wrapper.QueueData, len(q))
302 for k, q := range q {
303 r[k] = make(map[string]*taskqueue.Task, len(q))
304 for tn, t := range q {
305 r[k][tn] = dupTask(t)
306 }
307 }
308 return r
309 }
OLDNEW
« no previous file with comments | « go/src/infra/gae/libs/wrapper/memory/plist_test.go ('k') | go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698