Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(39)

Side by Side Diff: impl/memory/taskqueue.go

Issue 2302743002: Interface update, per-method Contexts. (Closed)
Patch Set: Lightning talk licenses. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/race_test.go ('k') | impl/memory/taskqueue_data.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "regexp" 8 "regexp"
9 "sync/atomic"
10 9
11 "golang.org/x/net/context" 10 "golang.org/x/net/context"
12 11
13 tq "github.com/luci/gae/service/taskqueue" 12 tq "github.com/luci/gae/service/taskqueue"
13
14 "github.com/luci/luci-go/common/data/rand/mathrand" 14 "github.com/luci/luci-go/common/data/rand/mathrand"
15 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
16 ) 16 )
17 17
18 /////////////////////////////// public functions /////////////////////////////// 18 /////////////////////////////// public functions ///////////////////////////////
19 19
20 func useTQ(c context.Context) context.Context { 20 func useTQ(c context.Context) context.Context {
21 » return tq.SetRawFactory(c, func(ic context.Context, wantTxn bool) tq.Raw Interface { 21 » return tq.SetRawFactory(c, func(ic context.Context) tq.RawInterface {
22 » » ns, _ := curGID(ic).getNamespace() 22 » » memCtx, isTxn := cur(ic)
23 » » var tqd memContextObj 23 » » tqd := memCtx.Get(memContextTQIdx)
24 24
25 » » if !wantTxn { 25 » » if isTxn {
26 » » » tqd = curNoTxn(ic).Get(memContextTQIdx) 26 » » » return &taskqueueTxnImpl{tqd.(*txnTaskQueueData), ic}
27 » » } else {
28 » » » tqd = cur(ic).Get(memContextTQIdx)
29 } 27 }
30 28 » » return &taskqueueImpl{tqd.(*taskQueueData), ic}
31 » » if x, ok := tqd.(*taskQueueData); ok {
32 » » » return &taskqueueImpl{x, ic, ns}
33 » » }
34 » » return &taskqueueTxnImpl{tqd.(*txnTaskQueueData), ic, ns}
35 }) 29 })
36 } 30 }
37 31
38 //////////////////////////////// taskqueueImpl ///////////////////////////////// 32 //////////////////////////////// taskqueueImpl /////////////////////////////////
39 33
40 type taskqueueImpl struct { 34 type taskqueueImpl struct {
41 *taskQueueData 35 *taskQueueData
42 36
43 ctx context.Context 37 ctx context.Context
44 ns string
45 } 38 }
46 39
47 var ( 40 var (
48 _ = tq.RawInterface((*taskqueueImpl)(nil)) 41 _ = tq.RawInterface((*taskqueueImpl)(nil))
49 _ = tq.Testable((*taskqueueImpl)(nil)) 42 _ = tq.Testable((*taskqueueImpl)(nil))
50 ) 43 )
51 44
52 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er ror) { 45 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er ror) {
53 » toSched, err := t.prepTask(t.ctx, t.ns, task, queueName) 46 » toSched, err := t.prepTask(t.ctx, task, queueName)
54 if err != nil { 47 if err != nil {
55 return nil, err 48 return nil, err
56 } 49 }
57 50
58 if _, ok := t.archived[queueName][toSched.Name]; ok { 51 if _, ok := t.archived[queueName][toSched.Name]; ok {
59 // SDK converts TOMBSTONE -> already added too 52 // SDK converts TOMBSTONE -> already added too
60 return nil, tq.ErrTaskAlreadyAdded 53 return nil, tq.ErrTaskAlreadyAdded
61 } else if _, ok := t.named[queueName][toSched.Name]; ok { 54 } else if _, ok := t.named[queueName][toSched.Name]; ok {
62 return nil, tq.ErrTaskAlreadyAdded 55 return nil, tq.ErrTaskAlreadyAdded
63 } else { 56 } else {
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after
138 s.OldestETA = t.ETA 131 s.OldestETA = t.ETA
139 } 132 }
140 } 133 }
141 cb(&s, nil) 134 cb(&s, nil)
142 } 135 }
143 } 136 }
144 137
145 return nil 138 return nil
146 } 139 }
147 140
148 func (t *taskqueueImpl) Testable() tq.Testable { 141 func (t *taskqueueImpl) GetTestable() tq.Testable { return t }
149 » return t
150 }
151 142
152 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// 143 /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
153 144
154 type taskqueueTxnImpl struct { 145 type taskqueueTxnImpl struct {
155 *txnTaskQueueData 146 *txnTaskQueueData
156 147
157 ctx context.Context 148 ctx context.Context
158 ns string
159 } 149 }
160 150
161 var _ interface { 151 var _ interface {
162 tq.RawInterface 152 tq.RawInterface
163 tq.Testable 153 tq.Testable
164 } = (*taskqueueTxnImpl)(nil) 154 } = (*taskqueueTxnImpl)(nil)
165 155
166 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) { 156 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) {
167 » toSched, err := t.parent.prepTask(t.ctx, t.ns, task, queueName) 157 » toSched, err := t.parent.prepTask(t.ctx, task, queueName)
168 if err != nil { 158 if err != nil {
169 return nil, err 159 return nil, err
170 } 160 }
171 161
172 numTasks := 0 162 numTasks := 0
173 for _, vs := range t.anony { 163 for _, vs := range t.anony {
174 numTasks += len(vs) 164 numTasks += len(vs)
175 } 165 }
176 if numTasks+1 > 5 { 166 if numTasks+1 > 5 {
177 // transactional tasks are actually implemented 'for real' as Ac tions which 167 // transactional tasks are actually implemented 'for real' as Ac tions which
(...skipping 11 matching lines...) Expand all
189 // We should verify that the .Name for a task added in a tr ansaction is 179 // We should verify that the .Name for a task added in a tr ansaction is
190 // meaningless. Maybe names generated in a transaction are somehow 180 // meaningless. Maybe names generated in a transaction are somehow
191 // guaranteed to be meaningful? 181 // guaranteed to be meaningful?
192 toRet := toSched.Duplicate() 182 toRet := toSched.Duplicate()
193 toRet.Name = "" 183 toRet.Name = ""
194 184
195 return toRet, nil 185 return toRet, nil
196 } 186 }
197 187
198 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.Ra wTaskCB) error { 188 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.Ra wTaskCB) error {
199 » if atomic.LoadInt32(&t.closed) == 1 { 189 » if err := assertTxnValid(t.ctx); err != nil {
200 » » return errors.New("taskqueue: transaction context has expired") 190 » » return err
201 } 191 }
202 192
203 t.Lock() 193 t.Lock()
204 defer t.Unlock() 194 defer t.Unlock()
205 195
206 queueName, err := t.parent.getQueueNameLocked(queueName) 196 queueName, err := t.parent.getQueueNameLocked(queueName)
207 if err != nil { 197 if err != nil {
208 return err 198 return err
209 } 199 }
210 200
211 for _, task := range tasks { 201 for _, task := range tasks {
212 cb(t.addLocked(task, queueName)) 202 cb(t.addLocked(task, queueName))
213 } 203 }
214 return nil 204 return nil
215 } 205 }
216 206
217 func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error { 207 func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error {
218 return errors.New("taskqueue: cannot DeleteMulti from a transaction") 208 return errors.New("taskqueue: cannot DeleteMulti from a transaction")
219 } 209 }
220 210
221 func (t *taskqueueTxnImpl) Purge(string) error { 211 func (t *taskqueueTxnImpl) Purge(string) error {
222 return errors.New("taskqueue: cannot Purge from a transaction") 212 return errors.New("taskqueue: cannot Purge from a transaction")
223 } 213 }
224 214
225 func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error { 215 func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error {
226 return errors.New("taskqueue: cannot Stats from a transaction") 216 return errors.New("taskqueue: cannot Stats from a transaction")
227 } 217 }
228 218
229 func (t *taskqueueTxnImpl) Testable() tq.Testable { 219 func (t *taskqueueTxnImpl) GetTestable() tq.Testable { return t }
230 » return t
231 }
232 220
233 ////////////////////////////// private functions /////////////////////////////// 221 ////////////////////////////// private functions ///////////////////////////////
234 222
235 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") 223 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
236 224
237 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_" 225 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_"
238 226
239 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { 227 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string {
240 _, ok := queue[cur] 228 _, ok := queue[cur]
241 for !ok && cur == "" { 229 for !ok && cur == "" {
(...skipping 10 matching lines...) Expand all
252 func dupQueue(q tq.QueueData) tq.QueueData { 240 func dupQueue(q tq.QueueData) tq.QueueData {
253 r := make(tq.QueueData, len(q)) 241 r := make(tq.QueueData, len(q))
254 for k, q := range q { 242 for k, q := range q {
255 r[k] = make(map[string]*tq.Task, len(q)) 243 r[k] = make(map[string]*tq.Task, len(q))
256 for tn, t := range q { 244 for tn, t := range q {
257 r[k][tn] = t.Duplicate() 245 r[k][tn] = t.Duplicate()
258 } 246 }
259 } 247 }
260 return r 248 return r
261 } 249 }
OLDNEW
« no previous file with comments | « impl/memory/race_test.go ('k') | impl/memory/taskqueue_data.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698