Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(937)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue.go

Issue 1240573002: Reland: Refactor current GAE abstraction library to be free of the SDK* (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: expand coverage range to fit 32bit test expectations Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "fmt"
9 "infra/gae/libs/wrapper"
10 "net/http"
11 "regexp"
12
13 "golang.org/x/net/context"
14
15 "appengine"
16 "appengine/taskqueue"
17 "appengine_internal"
18 dbpb "appengine_internal/datastore"
19 pb "appengine_internal/taskqueue"
20 )
21
22 /////////////////////////////// public functions ///////////////////////////////
23
24 func useTQ(c context.Context) context.Context {
25 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e {
26 tqd := cur(ic).Get(memContextTQIdx)
27 var ret interface {
28 wrapper.TQTestable
29 wrapper.TaskQueue
30 }
31 switch x := tqd.(type) {
32 case *taskQueueData:
33 ret = &taskqueueImpl{
34 wrapper.DummyTQ(),
35 x,
36 ic,
37 curGID(ic).namespace,
38 }
39
40 case *txnTaskQueueData:
41 ret = &taskqueueTxnImpl{
42 wrapper.DummyTQ(),
43 x,
44 ic,
45 curGID(ic).namespace,
46 }
47
48 default:
49 panic(fmt.Errorf("TQ: bad type: %v", tqd))
50 }
51 return ret
52 })
53 }
54
55 //////////////////////////////// taskqueueImpl /////////////////////////////////
56
57 type taskqueueImpl struct {
58 wrapper.TaskQueue
59 *taskQueueData
60
61 ctx context.Context
62 ns string
63 }
64
65 var (
66 _ = wrapper.TaskQueue((*taskqueueImpl)(nil))
67 _ = wrapper.TQTestable((*taskqueueImpl)(nil))
68 )
69
70 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) {
71 toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName)
72 if err != nil {
73 return nil, err
74 }
75
76 if _, ok := t.archived[queueName][toSched.Name]; ok {
77 // SDK converts TOMBSTONE -> already added too
78 return nil, taskqueue.ErrTaskAlreadyAdded
79 } else if _, ok := t.named[queueName][toSched.Name]; ok {
80 return nil, taskqueue.ErrTaskAlreadyAdded
81 } else {
82 t.named[queueName][toSched.Name] = toSched
83 }
84
85 return dupTask(toSched), nil
86 }
87
88 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue. Task, error) {
89 if err := t.IsBroken(); err != nil {
90 return nil, err
91 }
92
93 t.Lock()
94 defer t.Unlock()
95
96 return t.addLocked(task, queueName)
97 }
98
99 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err or {
100 queueName, err := t.getQueueName(queueName)
101 if err != nil {
102 return err
103 }
104
105 if _, ok := t.archived[queueName][task.Name]; ok {
106 return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK)
107 }
108
109 if _, ok := t.named[queueName][task.Name]; !ok {
110 return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK)
111 }
112
113 t.archived[queueName][task.Name] = t.named[queueName][task.Name]
114 delete(t.named[queueName], task.Name)
115
116 return nil
117 }
118
119 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error {
120 if err := t.IsBroken(); err != nil {
121 return err
122 }
123
124 t.Lock()
125 defer t.Unlock()
126
127 return t.deleteLocked(task, queueName)
128 }
129
130 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]* taskqueue.Task, error) {
131 if err := t.IsBroken(); err != nil {
132 return nil, err
133 }
134
135 t.Lock()
136 defer t.Unlock()
137
138 return multi(tasks, queueName, t.addLocked)
139 }
140
141 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e rror {
142 if err := t.IsBroken(); err != nil {
143 return err
144 }
145
146 t.Lock()
147 defer t.Unlock()
148
149 _, err := multi(tasks, queueName,
150 func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) {
151 return nil, t.deleteLocked(tsk, qn)
152 })
153 return err
154 }
155
156 /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
157
158 type taskqueueTxnImpl struct {
159 wrapper.TaskQueue
160 *txnTaskQueueData
161
162 ctx context.Context
163 ns string
164 }
165
166 var (
167 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil))
168 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil))
169 )
170
171 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) {
172 toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam e)
173 if err != nil {
174 return nil, err
175 }
176
177 numTasks := 0
178 for _, vs := range t.anony {
179 numTasks += len(vs)
180 }
181 if numTasks+1 > 5 {
182 // transactional tasks are actually implemented 'for real' as Ac tions which
183 // ride on the datastore. The current datastore implementation o nly allows
184 // a maximum of 5 Actions per transaction, and more than that re sult in a
185 // BAD_REQUEST.
186 return nil, newDSError(dbpb.Error_BAD_REQUEST)
187 }
188
189 t.anony[queueName] = append(t.anony[queueName], toSched)
190
191 // the fact that we have generated a unique name for this task queue ite m is
192 // an implementation detail.
193 // TODO(riannucci): now that I think about this... it may not actually b e true.
194 // We should verify that the .Name for a task added in a tr ansaction is
195 // meaningless. Maybe names generated in a transaction are somehow
196 // guaranteed to be meaningful?
197 toRet := dupTask(toSched)
198 toRet.Name = ""
199
200 return toRet, nil
201 }
202
203 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque ue.Task, error) {
204 if err := t.IsBroken(); err != nil {
205 return nil, err
206 }
207
208 t.Lock()
209 defer t.Unlock()
210
211 return t.addLocked(task, queueName)
212 }
213
214 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ( []*taskqueue.Task, error) {
215 if err := t.IsBroken(); err != nil {
216 return nil, err
217 }
218
219 t.Lock()
220 defer t.Unlock()
221
222 return multi(tasks, queueName, t.addLocked)
223 }
224
225 ////////////////////////////// private functions ///////////////////////////////
226
227 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
228
229 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_"
230
231 func mkName(c context.Context, cur string, queue map[string]*taskqueue.Task) str ing {
232 _, ok := queue[cur]
233 for !ok && cur == "" {
234 name := [500]byte{}
235 for i := 0; i < 500; i++ {
236 name[i] = validTaskChars[wrapper.GetMathRand(c).Intn(len (validTaskChars))]
237 }
238 cur = string(name[:])
239 _, ok = queue[cur]
240 }
241 return cur
242 }
243
244 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API Error {
245 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co de)}
246 }
247
248 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) {
249 ret := []*taskqueue.Task(nil)
250 me := appengine.MultiError(nil)
251 foundErr := false
252 for _, task := range tasks {
253 rt, err := f(task, queueName)
254 ret = append(ret, rt)
255 me = append(me, err)
256 if err != nil {
257 foundErr = true
258 }
259 }
260 if !foundErr {
261 me = nil
262 }
263 return ret, me
264 }
265
266 func dupTask(t *taskqueue.Task) *taskqueue.Task {
267 ret := &taskqueue.Task{}
268 *ret = *t
269
270 if t.Header != nil {
271 ret.Header = make(http.Header, len(t.Header))
272 for k, vs := range t.Header {
273 newVs := make([]string, len(vs))
274 copy(newVs, vs)
275 ret.Header[k] = newVs
276 }
277 }
278
279 if t.Payload != nil {
280 ret.Payload = make([]byte, len(t.Payload))
281 copy(ret.Payload, t.Payload)
282 }
283
284 if t.RetryOptions != nil {
285 ret.RetryOptions = &taskqueue.RetryOptions{}
286 *ret.RetryOptions = *t.RetryOptions
287 }
288
289 return ret
290 }
291
292 func dupQueue(q wrapper.QueueData) wrapper.QueueData {
293 r := make(wrapper.QueueData, len(q))
294 for k, q := range q {
295 r[k] = make(map[string]*taskqueue.Task, len(q))
296 for tn, t := range q {
297 r[k][tn] = dupTask(t)
298 }
299 }
300 return r
301 }
OLDNEW
« no previous file with comments | « go/src/infra/gae/libs/wrapper/memory/plist_test.go ('k') | go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698