Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(201)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue.go

Issue 1230303003: Revert "Refactor current GAE abstraction library to be free of the SDK*" (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "errors"
9 "fmt" 8 "fmt"
9 "infra/gae/libs/wrapper"
10 "net/http" 10 "net/http"
11 "regexp" 11 "regexp"
12 12
13 "golang.org/x/net/context" 13 "golang.org/x/net/context"
14 14
15 » "infra/gae/libs/gae" 15 » "appengine"
16 » "appengine/taskqueue"
17 » "appengine_internal"
18 » dbpb "appengine_internal/datastore"
19 » pb "appengine_internal/taskqueue"
16 ) 20 )
17 21
18 /////////////////////////////// public functions /////////////////////////////// 22 /////////////////////////////// public functions ///////////////////////////////
19 23
20 func useTQ(c context.Context) context.Context { 24 func useTQ(c context.Context) context.Context {
21 » return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue { 25 » return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e {
22 tqd := cur(ic).Get(memContextTQIdx) 26 tqd := cur(ic).Get(memContextTQIdx)
23 var ret interface { 27 var ret interface {
24 » » » gae.TQTestable 28 » » » wrapper.TQTestable
25 » » » gae.TaskQueue 29 » » » wrapper.TaskQueue
26 } 30 }
27 switch x := tqd.(type) { 31 switch x := tqd.(type) {
28 case *taskQueueData: 32 case *taskQueueData:
29 ret = &taskqueueImpl{ 33 ret = &taskqueueImpl{
30 » » » » gae.DummyTQ(), 34 » » » » wrapper.DummyTQ(),
31 x, 35 x,
32 ic, 36 ic,
33 curGID(ic).namespace, 37 curGID(ic).namespace,
34 } 38 }
35 39
36 case *txnTaskQueueData: 40 case *txnTaskQueueData:
37 ret = &taskqueueTxnImpl{ 41 ret = &taskqueueTxnImpl{
38 » » » » gae.DummyTQ(), 42 » » » » wrapper.DummyTQ(),
39 x, 43 x,
40 ic, 44 ic,
41 curGID(ic).namespace, 45 curGID(ic).namespace,
42 } 46 }
43 47
44 default: 48 default:
45 panic(fmt.Errorf("TQ: bad type: %v", tqd)) 49 panic(fmt.Errorf("TQ: bad type: %v", tqd))
46 } 50 }
47 return ret 51 return ret
48 }) 52 })
49 } 53 }
50 54
51 //////////////////////////////// taskqueueImpl ///////////////////////////////// 55 //////////////////////////////// taskqueueImpl /////////////////////////////////
52 56
53 type taskqueueImpl struct { 57 type taskqueueImpl struct {
54 » gae.TaskQueue 58 » wrapper.TaskQueue
55 *taskQueueData 59 *taskQueueData
56 60
57 ctx context.Context 61 ctx context.Context
58 ns string 62 ns string
59 } 63 }
60 64
61 var ( 65 var (
62 » _ = gae.TaskQueue((*taskqueueImpl)(nil)) 66 » _ = wrapper.TaskQueue((*taskqueueImpl)(nil))
63 » _ = gae.TQTestable((*taskqueueImpl)(nil)) 67 » _ = wrapper.TQTestable((*taskqueueImpl)(nil))
64 ) 68 )
65 69
66 func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa sk, error) { 70 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) {
67 toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) 71 toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName)
68 if err != nil { 72 if err != nil {
69 return nil, err 73 return nil, err
70 } 74 }
71 75
72 if _, ok := t.archived[queueName][toSched.Name]; ok { 76 if _, ok := t.archived[queueName][toSched.Name]; ok {
73 // SDK converts TOMBSTONE -> already added too 77 // SDK converts TOMBSTONE -> already added too
74 » » return nil, gae.ErrTQTaskAlreadyAdded 78 » » return nil, taskqueue.ErrTaskAlreadyAdded
75 } else if _, ok := t.named[queueName][toSched.Name]; ok { 79 } else if _, ok := t.named[queueName][toSched.Name]; ok {
76 » » return nil, gae.ErrTQTaskAlreadyAdded 80 » » return nil, taskqueue.ErrTaskAlreadyAdded
77 } else { 81 } else {
78 t.named[queueName][toSched.Name] = toSched 82 t.named[queueName][toSched.Name] = toSched
79 } 83 }
80 84
81 return dupTask(toSched), nil 85 return dupTask(toSched), nil
82 } 86 }
83 87
84 func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQ Task, err error) { 88 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue. Task, error) {
85 » err = t.RunIfNotBroken(func() (err error) { 89 » if err := t.IsBroken(); err != nil {
86 » » t.Lock() 90 » » return nil, err
87 » » defer t.Unlock() 91 » }
88 » » retTask, err = t.addLocked(task, queueName) 92
89 » » return 93 » t.Lock()
90 » }) 94 » defer t.Unlock()
91 » return 95
96 » return t.addLocked(task, queueName)
92 } 97 }
93 98
94 func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error { 99 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err or {
95 queueName, err := t.getQueueName(queueName) 100 queueName, err := t.getQueueName(queueName)
96 if err != nil { 101 if err != nil {
97 return err 102 return err
98 } 103 }
99 104
100 if _, ok := t.archived[queueName][task.Name]; ok { 105 if _, ok := t.archived[queueName][task.Name]; ok {
101 » » return errors.New("TOMBSTONED_TASK") 106 » » return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK)
102 } 107 }
103 108
104 if _, ok := t.named[queueName][task.Name]; !ok { 109 if _, ok := t.named[queueName][task.Name]; !ok {
105 » » return errors.New("UNKNOWN_TASK") 110 » » return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK)
106 } 111 }
107 112
108 t.archived[queueName][task.Name] = t.named[queueName][task.Name] 113 t.archived[queueName][task.Name] = t.named[queueName][task.Name]
109 delete(t.named[queueName], task.Name) 114 delete(t.named[queueName], task.Name)
110 115
111 return nil 116 return nil
112 } 117 }
113 118
114 func (t *taskqueueImpl) Delete(task *gae.TQTask, queueName string) error { 119 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error {
115 » return t.RunIfNotBroken(func() error { 120 » if err := t.IsBroken(); err != nil {
116 » » t.Lock() 121 » » return err
117 » » defer t.Unlock() 122 » }
118 » » return t.deleteLocked(task, queueName) 123
119 » }) 124 » t.Lock()
125 » defer t.Unlock()
126
127 » return t.deleteLocked(task, queueName)
120 } 128 }
121 129
122 func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTask s []*gae.TQTask, err error) { 130 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]* taskqueue.Task, error) {
123 » err = t.RunIfNotBroken(func() (err error) { 131 » if err := t.IsBroken(); err != nil {
124 » » t.Lock() 132 » » return nil, err
125 » » defer t.Unlock() 133 » }
126 » » retTasks, err = multi(tasks, queueName, t.addLocked) 134
127 » » return 135 » t.Lock()
128 » }) 136 » defer t.Unlock()
129 » return 137
138 » return multi(tasks, queueName, t.addLocked)
130 } 139 }
131 140
132 func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error { 141 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e rror {
133 » return t.RunIfNotBroken(func() error { 142 » if err := t.IsBroken(); err != nil {
134 » » t.Lock() 143 » » return err
135 » » defer t.Unlock() 144 » }
136 145
137 » » _, err := multi(tasks, queueName, 146 » t.Lock()
138 » » » func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) { 147 » defer t.Unlock()
139 » » » » return nil, t.deleteLocked(tsk, qn) 148
140 » » » }) 149 » _, err := multi(tasks, queueName,
141 » » return err 150 » » func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) {
142 » }) 151 » » » return nil, t.deleteLocked(tsk, qn)
152 » » })
153 » return err
143 } 154 }
144 155
145 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// 156 /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
146 157
147 type taskqueueTxnImpl struct { 158 type taskqueueTxnImpl struct {
148 » gae.TaskQueue 159 » wrapper.TaskQueue
149 *txnTaskQueueData 160 *txnTaskQueueData
150 161
151 ctx context.Context 162 ctx context.Context
152 ns string 163 ns string
153 } 164 }
154 165
155 var ( 166 var (
156 » _ = gae.TaskQueue((*taskqueueTxnImpl)(nil)) 167 » _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil))
157 » _ = gae.TQTestable((*taskqueueTxnImpl)(nil)) 168 » _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil))
158 ) 169 )
159 170
160 func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T QTask, error) { 171 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) {
161 toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam e) 172 toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam e)
162 if err != nil { 173 if err != nil {
163 return nil, err 174 return nil, err
164 } 175 }
165 176
166 numTasks := 0 177 numTasks := 0
167 for _, vs := range t.anony { 178 for _, vs := range t.anony {
168 numTasks += len(vs) 179 numTasks += len(vs)
169 } 180 }
170 if numTasks+1 > 5 { 181 if numTasks+1 > 5 {
171 // transactional tasks are actually implemented 'for real' as Ac tions which 182 // transactional tasks are actually implemented 'for real' as Ac tions which
172 // ride on the datastore. The current datastore implementation o nly allows 183 // ride on the datastore. The current datastore implementation o nly allows
173 // a maximum of 5 Actions per transaction, and more than that re sult in a 184 // a maximum of 5 Actions per transaction, and more than that re sult in a
174 // BAD_REQUEST. 185 // BAD_REQUEST.
175 » » return nil, errors.New("BAD_REQUEST") 186 » » return nil, newDSError(dbpb.Error_BAD_REQUEST)
176 } 187 }
177 188
178 t.anony[queueName] = append(t.anony[queueName], toSched) 189 t.anony[queueName] = append(t.anony[queueName], toSched)
179 190
180 // the fact that we have generated a unique name for this task queue ite m is 191 // the fact that we have generated a unique name for this task queue ite m is
181 // an implementation detail. 192 // an implementation detail.
182 // TODO(riannucci): now that I think about this... it may not actually b e true. 193 // TODO(riannucci): now that I think about this... it may not actually b e true.
183 // We should verify that the .Name for a task added in a tr ansaction is 194 // We should verify that the .Name for a task added in a tr ansaction is
184 // meaningless. Maybe names generated in a transaction are somehow 195 // meaningless. Maybe names generated in a transaction are somehow
185 // guaranteed to be meaningful? 196 // guaranteed to be meaningful?
186 toRet := dupTask(toSched) 197 toRet := dupTask(toSched)
187 toRet.Name = "" 198 toRet.Name = ""
188 199
189 return toRet, nil 200 return toRet, nil
190 } 201 }
191 202
192 func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae .TQTask, err error) { 203 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque ue.Task, error) {
193 » err = t.RunIfNotBroken(func() (err error) { 204 » if err := t.IsBroken(); err != nil {
194 » » t.Lock() 205 » » return nil, err
195 » » defer t.Unlock() 206 » }
196 » » retTask, err = t.addLocked(task, queueName) 207
197 » » return 208 » t.Lock()
198 » }) 209 » defer t.Unlock()
199 » return 210
211 » return t.addLocked(task, queueName)
200 } 212 }
201 213
202 func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retT asks []*gae.TQTask, err error) { 214 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ( []*taskqueue.Task, error) {
203 » err = t.RunIfNotBroken(func() (err error) { 215 » if err := t.IsBroken(); err != nil {
204 » » t.Lock() 216 » » return nil, err
205 » » defer t.Unlock() 217 » }
206 » » retTasks, err = multi(tasks, queueName, t.addLocked) 218
207 » » return 219 » t.Lock()
208 » }) 220 » defer t.Unlock()
209 » return 221
222 » return multi(tasks, queueName, t.addLocked)
210 } 223 }
211 224
212 ////////////////////////////// private functions /////////////////////////////// 225 ////////////////////////////// private functions ///////////////////////////////
213 226
214 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") 227 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
215 228
216 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_" 229 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_"
217 230
218 func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string { 231 func mkName(c context.Context, cur string, queue map[string]*taskqueue.Task) str ing {
219 _, ok := queue[cur] 232 _, ok := queue[cur]
220 for !ok && cur == "" { 233 for !ok && cur == "" {
221 name := [500]byte{} 234 name := [500]byte{}
222 for i := 0; i < 500; i++ { 235 for i := 0; i < 500; i++ {
223 » » » name[i] = validTaskChars[gae.GetMathRand(c).Intn(len(val idTaskChars))] 236 » » » name[i] = validTaskChars[wrapper.GetMathRand(c).Intn(len (validTaskChars))]
224 } 237 }
225 cur = string(name[:]) 238 cur = string(name[:])
226 _, ok = queue[cur] 239 _, ok = queue[cur]
227 } 240 }
228 return cur 241 return cur
229 } 242 }
230 243
231 func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (* gae.TQTask, error)) ([]*gae.TQTask, error) { 244 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API Error {
232 » ret := []*gae.TQTask(nil) 245 » return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co de)}
233 » me := gae.MultiError(nil) 246 }
247
248 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) {
249 » ret := []*taskqueue.Task(nil)
250 » me := appengine.MultiError(nil)
234 foundErr := false 251 foundErr := false
235 for _, task := range tasks { 252 for _, task := range tasks {
236 rt, err := f(task, queueName) 253 rt, err := f(task, queueName)
237 ret = append(ret, rt) 254 ret = append(ret, rt)
238 me = append(me, err) 255 me = append(me, err)
239 if err != nil { 256 if err != nil {
240 foundErr = true 257 foundErr = true
241 } 258 }
242 } 259 }
243 if !foundErr { 260 if !foundErr {
244 me = nil 261 me = nil
245 } 262 }
246 return ret, me 263 return ret, me
247 } 264 }
248 265
249 func dupTask(t *gae.TQTask) *gae.TQTask { 266 func dupTask(t *taskqueue.Task) *taskqueue.Task {
250 » ret := &gae.TQTask{} 267 » ret := &taskqueue.Task{}
251 *ret = *t 268 *ret = *t
252 269
253 if t.Header != nil { 270 if t.Header != nil {
254 ret.Header = make(http.Header, len(t.Header)) 271 ret.Header = make(http.Header, len(t.Header))
255 for k, vs := range t.Header { 272 for k, vs := range t.Header {
256 newVs := make([]string, len(vs)) 273 newVs := make([]string, len(vs))
257 copy(newVs, vs) 274 copy(newVs, vs)
258 ret.Header[k] = newVs 275 ret.Header[k] = newVs
259 } 276 }
260 } 277 }
261 278
262 if t.Payload != nil { 279 if t.Payload != nil {
263 ret.Payload = make([]byte, len(t.Payload)) 280 ret.Payload = make([]byte, len(t.Payload))
264 copy(ret.Payload, t.Payload) 281 copy(ret.Payload, t.Payload)
265 } 282 }
266 283
267 if t.RetryOptions != nil { 284 if t.RetryOptions != nil {
268 » » ret.RetryOptions = &gae.TQRetryOptions{} 285 » » ret.RetryOptions = &taskqueue.RetryOptions{}
269 *ret.RetryOptions = *t.RetryOptions 286 *ret.RetryOptions = *t.RetryOptions
270 } 287 }
271 288
272 return ret 289 return ret
273 } 290 }
274 291
275 func dupQueue(q gae.QueueData) gae.QueueData { 292 func dupQueue(q wrapper.QueueData) wrapper.QueueData {
276 » r := make(gae.QueueData, len(q)) 293 » r := make(wrapper.QueueData, len(q))
277 for k, q := range q { 294 for k, q := range q {
278 » » r[k] = make(map[string]*gae.TQTask, len(q)) 295 » » r[k] = make(map[string]*taskqueue.Task, len(q))
279 for tn, t := range q { 296 for tn, t := range q {
280 r[k][tn] = dupTask(t) 297 r[k][tn] = dupTask(t)
281 } 298 }
282 } 299 }
283 return r 300 return r
284 } 301 }
OLDNEW
« no previous file with comments | « go/src/infra/gae/libs/wrapper/memory/plist_test.go ('k') | go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698