Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(428)

Side by Side Diff: impl/memory/taskqueue.go

Issue 1243323002: Refactor a bit. (Closed) Base URL: https://github.com/luci/gae.git@master
Patch Set: fix golint Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/raw_datastore_test.go ('k') | impl/memory/taskqueue_data.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "net/http" 9 "net/http"
10 "regexp" 10 "regexp"
11 11
12 "golang.org/x/net/context" 12 "golang.org/x/net/context"
13 13
14 "github.com/luci/gae" 14 "github.com/luci/gae"
15 » "github.com/luci/gae/dummy" 15 » "github.com/luci/gae/impl/dummy"
16 » tq "github.com/luci/gae/service/taskqueue"
17 » "github.com/luci/luci-go/common/mathrand"
16 ) 18 )
17 19
18 /////////////////////////////// public functions /////////////////////////////// 20 /////////////////////////////// public functions ///////////////////////////////
19 21
20 func useTQ(c context.Context) context.Context { 22 func useTQ(c context.Context) context.Context {
21 » return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue { 23 » return tq.SetFactory(c, func(ic context.Context) tq.Interface {
22 tqd := cur(ic).Get(memContextTQIdx) 24 tqd := cur(ic).Get(memContextTQIdx)
23 if x, ok := tqd.(*taskQueueData); ok { 25 if x, ok := tqd.(*taskQueueData); ok {
24 return &taskqueueImpl{ 26 return &taskqueueImpl{
25 » » » » dummy.TQ(), 27 » » » » dummy.TaskQueue(),
26 x, 28 x,
27 ic, 29 ic,
28 curGID(ic).namespace, 30 curGID(ic).namespace,
29 } 31 }
30 } 32 }
31 return &taskqueueTxnImpl{ 33 return &taskqueueTxnImpl{
32 » » » dummy.TQ(), 34 » » » dummy.TaskQueue(),
33 tqd.(*txnTaskQueueData), 35 tqd.(*txnTaskQueueData),
34 ic, 36 ic,
35 curGID(ic).namespace, 37 curGID(ic).namespace,
36 } 38 }
37 }) 39 })
38 } 40 }
39 41
40 //////////////////////////////// taskqueueImpl ///////////////////////////////// 42 //////////////////////////////// taskqueueImpl /////////////////////////////////
41 43
42 type taskqueueImpl struct { 44 type taskqueueImpl struct {
43 » gae.TaskQueue 45 » tq.Interface
44 *taskQueueData 46 *taskQueueData
45 47
46 ctx context.Context 48 ctx context.Context
47 ns string 49 ns string
48 } 50 }
49 51
50 var ( 52 var (
51 » _ = gae.TaskQueue((*taskqueueImpl)(nil)) 53 » _ = tq.Interface((*taskqueueImpl)(nil))
52 » _ = gae.TQTestable((*taskqueueImpl)(nil)) 54 » _ = tq.Testable((*taskqueueImpl)(nil))
53 ) 55 )
54 56
55 func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa sk, error) { 57 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er ror) {
56 toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) 58 toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName)
57 if err != nil { 59 if err != nil {
58 return nil, err 60 return nil, err
59 } 61 }
60 62
61 if _, ok := t.archived[queueName][toSched.Name]; ok { 63 if _, ok := t.archived[queueName][toSched.Name]; ok {
62 // SDK converts TOMBSTONE -> already added too 64 // SDK converts TOMBSTONE -> already added too
63 » » return nil, gae.ErrTQTaskAlreadyAdded 65 » » return nil, tq.ErrTaskAlreadyAdded
64 } else if _, ok := t.named[queueName][toSched.Name]; ok { 66 } else if _, ok := t.named[queueName][toSched.Name]; ok {
65 » » return nil, gae.ErrTQTaskAlreadyAdded 67 » » return nil, tq.ErrTaskAlreadyAdded
66 } else { 68 } else {
67 t.named[queueName][toSched.Name] = toSched 69 t.named[queueName][toSched.Name] = toSched
68 } 70 }
69 71
70 return dupTask(toSched), nil 72 return dupTask(toSched), nil
71 } 73 }
72 74
73 func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, er ror) { 75 func (t *taskqueueImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) {
74 t.Lock() 76 t.Lock()
75 defer t.Unlock() 77 defer t.Unlock()
76 return t.addLocked(task, queueName) 78 return t.addLocked(task, queueName)
77 } 79 }
78 80
79 func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error { 81 func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error {
80 queueName, err := t.getQueueName(queueName) 82 queueName, err := t.getQueueName(queueName)
81 if err != nil { 83 if err != nil {
82 return err 84 return err
83 } 85 }
84 86
85 if _, ok := t.archived[queueName][task.Name]; ok { 87 if _, ok := t.archived[queueName][task.Name]; ok {
86 return errors.New("TOMBSTONED_TASK") 88 return errors.New("TOMBSTONED_TASK")
87 } 89 }
88 90
89 if _, ok := t.named[queueName][task.Name]; !ok { 91 if _, ok := t.named[queueName][task.Name]; !ok {
90 return errors.New("UNKNOWN_TASK") 92 return errors.New("UNKNOWN_TASK")
91 } 93 }
92 94
93 t.archived[queueName][task.Name] = t.named[queueName][task.Name] 95 t.archived[queueName][task.Name] = t.named[queueName][task.Name]
94 delete(t.named[queueName], task.Name) 96 delete(t.named[queueName], task.Name)
95 97
96 return nil 98 return nil
97 } 99 }
98 100
99 func (t *taskqueueImpl) Delete(task *gae.TQTask, queueName string) error { 101 func (t *taskqueueImpl) Delete(task *tq.Task, queueName string) error {
100 t.Lock() 102 t.Lock()
101 defer t.Unlock() 103 defer t.Unlock()
102 return t.deleteLocked(task, queueName) 104 return t.deleteLocked(task, queueName)
103 } 105 }
104 106
105 func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae. TQTask, error) { 107 func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task , error) {
106 t.Lock() 108 t.Lock()
107 defer t.Unlock() 109 defer t.Unlock()
108 return multi(tasks, queueName, t.addLocked) 110 return multi(tasks, queueName, t.addLocked)
109 } 111 }
110 112
111 func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error { 113 func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string) error {
112 t.Lock() 114 t.Lock()
113 defer t.Unlock() 115 defer t.Unlock()
114 116
115 _, err := multi(tasks, queueName, 117 _, err := multi(tasks, queueName,
116 » » func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) { 118 » » func(tsk *tq.Task, qn string) (*tq.Task, error) {
117 return nil, t.deleteLocked(tsk, qn) 119 return nil, t.deleteLocked(tsk, qn)
118 }) 120 })
119 return err 121 return err
120 } 122 }
121 123
122 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// 124 /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
123 125
124 type taskqueueTxnImpl struct { 126 type taskqueueTxnImpl struct {
125 » gae.TaskQueue 127 » tq.Interface
126 *txnTaskQueueData 128 *txnTaskQueueData
127 129
128 ctx context.Context 130 ctx context.Context
129 ns string 131 ns string
130 } 132 }
131 133
132 var _ interface { 134 var _ interface {
133 » gae.TaskQueue 135 » tq.Interface
134 » gae.TQTestable 136 » tq.Testable
135 } = (*taskqueueTxnImpl)(nil) 137 } = (*taskqueueTxnImpl)(nil)
136 138
137 func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T QTask, error) { 139 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) {
138 toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam e) 140 toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam e)
139 if err != nil { 141 if err != nil {
140 return nil, err 142 return nil, err
141 } 143 }
142 144
143 numTasks := 0 145 numTasks := 0
144 for _, vs := range t.anony { 146 for _, vs := range t.anony {
145 numTasks += len(vs) 147 numTasks += len(vs)
146 } 148 }
147 if numTasks+1 > 5 { 149 if numTasks+1 > 5 {
(...skipping 11 matching lines...) Expand all
159 // TODO(riannucci): now that I think about this... it may not actually b e true. 161 // TODO(riannucci): now that I think about this... it may not actually b e true.
160 // We should verify that the .Name for a task added in a tr ansaction is 162 // We should verify that the .Name for a task added in a tr ansaction is
161 // meaningless. Maybe names generated in a transaction are somehow 163 // meaningless. Maybe names generated in a transaction are somehow
162 // guaranteed to be meaningful? 164 // guaranteed to be meaningful?
163 toRet := dupTask(toSched) 165 toRet := dupTask(toSched)
164 toRet.Name = "" 166 toRet.Name = ""
165 167
166 return toRet, nil 168 return toRet, nil
167 } 169 }
168 170
169 func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae .TQTask, err error) { 171 func (t *taskqueueTxnImpl) Add(task *tq.Task, queueName string) (retTask *tq.Tas k, err error) {
170 err = t.run(func() (err error) { 172 err = t.run(func() (err error) {
171 t.Lock() 173 t.Lock()
172 defer t.Unlock() 174 defer t.Unlock()
173 retTask, err = t.addLocked(task, queueName) 175 retTask, err = t.addLocked(task, queueName)
174 return 176 return
175 }) 177 })
176 return 178 return
177 } 179 }
178 180
179 func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retT asks []*gae.TQTask, err error) { 181 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string) (retTask s []*tq.Task, err error) {
180 err = t.run(func() (err error) { 182 err = t.run(func() (err error) {
181 t.Lock() 183 t.Lock()
182 defer t.Unlock() 184 defer t.Unlock()
183 retTasks, err = multi(tasks, queueName, t.addLocked) 185 retTasks, err = multi(tasks, queueName, t.addLocked)
184 return 186 return
185 }) 187 })
186 return 188 return
187 } 189 }
188 190
189 ////////////////////////////// private functions /////////////////////////////// 191 ////////////////////////////// private functions ///////////////////////////////
190 192
191 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") 193 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
192 194
193 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_" 195 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_"
194 196
195 func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string { 197 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string {
196 _, ok := queue[cur] 198 _, ok := queue[cur]
197 for !ok && cur == "" { 199 for !ok && cur == "" {
198 name := [500]byte{} 200 name := [500]byte{}
199 for i := 0; i < 500; i++ { 201 for i := 0; i < 500; i++ {
200 » » » name[i] = validTaskChars[gae.GetMathRand(c).Intn(len(val idTaskChars))] 202 » » » name[i] = validTaskChars[mathrand.Get(c).Intn(len(validT askChars))]
201 } 203 }
202 cur = string(name[:]) 204 cur = string(name[:])
203 _, ok = queue[cur] 205 _, ok = queue[cur]
204 } 206 }
205 return cur 207 return cur
206 } 208 }
207 209
208 func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (* gae.TQTask, error)) ([]*gae.TQTask, error) { 210 func multi(tasks []*tq.Task, queueName string, f func(*tq.Task, string) (*tq.Tas k, error)) ([]*tq.Task, error) {
209 » ret := []*gae.TQTask(nil) 211 » ret := []*tq.Task(nil)
210 lme := gae.LazyMultiError{Size: len(tasks)} 212 lme := gae.LazyMultiError{Size: len(tasks)}
211 for i, task := range tasks { 213 for i, task := range tasks {
212 rt, err := f(task, queueName) 214 rt, err := f(task, queueName)
213 ret = append(ret, rt) 215 ret = append(ret, rt)
214 lme.Assign(i, err) 216 lme.Assign(i, err)
215 } 217 }
216 return ret, lme.Get() 218 return ret, lme.Get()
217 } 219 }
218 220
219 func dupTask(t *gae.TQTask) *gae.TQTask { 221 func dupTask(t *tq.Task) *tq.Task {
220 » ret := &gae.TQTask{} 222 » ret := &tq.Task{}
221 *ret = *t 223 *ret = *t
222 224
223 if t.Header != nil { 225 if t.Header != nil {
224 ret.Header = make(http.Header, len(t.Header)) 226 ret.Header = make(http.Header, len(t.Header))
225 for k, vs := range t.Header { 227 for k, vs := range t.Header {
226 newVs := make([]string, len(vs)) 228 newVs := make([]string, len(vs))
227 copy(newVs, vs) 229 copy(newVs, vs)
228 ret.Header[k] = newVs 230 ret.Header[k] = newVs
229 } 231 }
230 } 232 }
231 233
232 if t.Payload != nil { 234 if t.Payload != nil {
233 ret.Payload = make([]byte, len(t.Payload)) 235 ret.Payload = make([]byte, len(t.Payload))
234 copy(ret.Payload, t.Payload) 236 copy(ret.Payload, t.Payload)
235 } 237 }
236 238
237 if t.RetryOptions != nil { 239 if t.RetryOptions != nil {
238 » » ret.RetryOptions = &gae.TQRetryOptions{} 240 » » ret.RetryOptions = &tq.RetryOptions{}
239 *ret.RetryOptions = *t.RetryOptions 241 *ret.RetryOptions = *t.RetryOptions
240 } 242 }
241 243
242 return ret 244 return ret
243 } 245 }
244 246
245 func dupQueue(q gae.QueueData) gae.QueueData { 247 func dupQueue(q tq.QueueData) tq.QueueData {
246 » r := make(gae.QueueData, len(q)) 248 » r := make(tq.QueueData, len(q))
247 for k, q := range q { 249 for k, q := range q {
248 » » r[k] = make(map[string]*gae.TQTask, len(q)) 250 » » r[k] = make(map[string]*tq.Task, len(q))
249 for tn, t := range q { 251 for tn, t := range q {
250 r[k][tn] = dupTask(t) 252 r[k][tn] = dupTask(t)
251 } 253 }
252 } 254 }
253 return r 255 return r
254 } 256 }
OLDNEW
« no previous file with comments | « impl/memory/raw_datastore_test.go ('k') | impl/memory/taskqueue_data.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698