Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(58)

Side by Side Diff: impl/memory/taskqueue.go

Issue 1270063003: Make the rest of the services have a similar raw/user interface structure. (Closed) Base URL: https://github.com/luci/gae.git@add_datastore
Patch Set: address comments Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/memcache_test.go ('k') | impl/memory/taskqueue_data.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "net/http"
9 "regexp" 8 "regexp"
9 "sync/atomic"
10 10
11 "golang.org/x/net/context" 11 "golang.org/x/net/context"
12 12
13 "github.com/luci/gae/impl/dummy"
14 tq "github.com/luci/gae/service/taskqueue" 13 tq "github.com/luci/gae/service/taskqueue"
15 "github.com/luci/luci-go/common/errors" 14 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/mathrand" 15 "github.com/luci/luci-go/common/mathrand"
17 ) 16 )
18 17
19 /////////////////////////////// public functions /////////////////////////////// 18 /////////////////////////////// public functions ///////////////////////////////
20 19
21 func useTQ(c context.Context) context.Context { 20 func useTQ(c context.Context) context.Context {
22 » return tq.SetFactory(c, func(ic context.Context) tq.Interface { 21 » return tq.SetRawFactory(c, func(ic context.Context) tq.RawInterface {
23 tqd := cur(ic).Get(memContextTQIdx) 22 tqd := cur(ic).Get(memContextTQIdx)
24 if x, ok := tqd.(*taskQueueData); ok { 23 if x, ok := tqd.(*taskQueueData); ok {
25 return &taskqueueImpl{ 24 return &taskqueueImpl{
26 dummy.TaskQueue(),
27 x, 25 x,
28 ic, 26 ic,
29 curGID(ic).namespace, 27 curGID(ic).namespace,
30 } 28 }
31 } 29 }
32 return &taskqueueTxnImpl{ 30 return &taskqueueTxnImpl{
33 dummy.TaskQueue(),
34 tqd.(*txnTaskQueueData), 31 tqd.(*txnTaskQueueData),
35 ic, 32 ic,
36 curGID(ic).namespace, 33 curGID(ic).namespace,
37 } 34 }
38 }) 35 })
39 } 36 }
40 37
41 //////////////////////////////// taskqueueImpl ///////////////////////////////// 38 //////////////////////////////// taskqueueImpl /////////////////////////////////
42 39
43 type taskqueueImpl struct { 40 type taskqueueImpl struct {
44 tq.Interface
45 *taskQueueData 41 *taskQueueData
46 42
47 ctx context.Context 43 ctx context.Context
48 ns string 44 ns string
49 } 45 }
50 46
51 var ( 47 var (
52 » _ = tq.Interface((*taskqueueImpl)(nil)) 48 » _ = tq.RawInterface((*taskqueueImpl)(nil))
53 _ = tq.Testable((*taskqueueImpl)(nil)) 49 _ = tq.Testable((*taskqueueImpl)(nil))
54 ) 50 )
55 51
56 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er ror) { 52 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er ror) {
57 » toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) 53 » toSched, err := t.prepTask(t.ctx, t.ns, task, queueName)
58 if err != nil { 54 if err != nil {
59 return nil, err 55 return nil, err
60 } 56 }
61 57
62 if _, ok := t.archived[queueName][toSched.Name]; ok { 58 if _, ok := t.archived[queueName][toSched.Name]; ok {
63 // SDK converts TOMBSTONE -> already added too 59 // SDK converts TOMBSTONE -> already added too
64 return nil, tq.ErrTaskAlreadyAdded 60 return nil, tq.ErrTaskAlreadyAdded
65 } else if _, ok := t.named[queueName][toSched.Name]; ok { 61 } else if _, ok := t.named[queueName][toSched.Name]; ok {
66 return nil, tq.ErrTaskAlreadyAdded 62 return nil, tq.ErrTaskAlreadyAdded
67 } else { 63 } else {
68 t.named[queueName][toSched.Name] = toSched 64 t.named[queueName][toSched.Name] = toSched
69 } 65 }
70 66
71 » return dupTask(toSched), nil 67 » return toSched.Duplicate(), nil
72 }
73
74 func (t *taskqueueImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) {
75 » t.Lock()
76 » defer t.Unlock()
77 » return t.addLocked(task, queueName)
78 } 68 }
79 69
80 func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error { 70 func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error {
81 queueName, err := t.getQueueName(queueName)
82 if err != nil {
83 return err
84 }
85
86 if _, ok := t.archived[queueName][task.Name]; ok { 71 if _, ok := t.archived[queueName][task.Name]; ok {
87 return errors.New("TOMBSTONED_TASK") 72 return errors.New("TOMBSTONED_TASK")
88 } 73 }
89 74
90 if _, ok := t.named[queueName][task.Name]; !ok { 75 if _, ok := t.named[queueName][task.Name]; !ok {
91 return errors.New("UNKNOWN_TASK") 76 return errors.New("UNKNOWN_TASK")
92 } 77 }
93 78
94 t.archived[queueName][task.Name] = t.named[queueName][task.Name] 79 t.archived[queueName][task.Name] = t.named[queueName][task.Name]
95 delete(t.named[queueName], task.Name) 80 delete(t.named[queueName], task.Name)
96 81
97 return nil 82 return nil
98 } 83 }
99 84
100 func (t *taskqueueImpl) Delete(task *tq.Task, queueName string) error { 85 func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTa skCB) error {
101 » t.Lock()
102 » defer t.Unlock()
103 » return t.deleteLocked(task, queueName)
104 }
105
106 func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task , error) {
107 » t.Lock()
108 » defer t.Unlock()
109 » return multi(tasks, queueName, t.addLocked)
110 }
111
112 func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string) error {
113 t.Lock() 86 t.Lock()
114 defer t.Unlock() 87 defer t.Unlock()
115 88
116 » _, err := multi(tasks, queueName, 89 » queueName, err := t.getQueueNameLocked(queueName)
117 » » func(tsk *tq.Task, qn string) (*tq.Task, error) { 90 » if err != nil {
118 » » » return nil, t.deleteLocked(tsk, qn) 91 » » return err
119 » » }) 92 » }
120 » return err 93
94 » for _, task := range tasks {
95 » » cb(t.addLocked(task, queueName))
96 » }
97 » return nil
98 }
99
100 func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.Ra wCB) error {
101 » t.Lock()
102 » defer t.Unlock()
103
104 » queueName, err := t.getQueueNameLocked(queueName)
105 » if err != nil {
106 » » return err
107 » }
108
109 » for _, task := range tasks {
110 » » cb(t.deleteLocked(task, queueName))
111 » }
112 » return nil
113 }
114
115 func (t *taskqueueImpl) Purge(queueName string) error {
116 » t.Lock()
117 » defer t.Unlock()
118
119 » return t.purgeLocked(queueName)
120 }
121
122 func (t *taskqueueImpl) Stats(queueNames []string, cb tq.RawStatsCB) error {
123 » t.Lock()
124 » defer t.Unlock()
125
126 » for _, qn := range queueNames {
127 » » qn, err := t.getQueueNameLocked(qn)
128 » » if err != nil {
129 » » » cb(nil, err)
130 » » } else {
131 » » » s := tq.Statistics{
132 » » » » Tasks: len(t.named[qn]),
133 » » » }
134 » » » for _, t := range t.named[qn] {
135 » » » » if s.OldestETA.IsZero() {
136 » » » » » s.OldestETA = t.ETA
137 » » » » } else if t.ETA.Before(s.OldestETA) {
138 » » » » » s.OldestETA = t.ETA
139 » » » » }
140 » » » }
141 » » » cb(&s, nil)
142 » » }
143 » }
144
145 » return nil
121 } 146 }
122 147
123 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// 148 /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
124 149
125 type taskqueueTxnImpl struct { 150 type taskqueueTxnImpl struct {
126 tq.Interface
127 *txnTaskQueueData 151 *txnTaskQueueData
128 152
129 ctx context.Context 153 ctx context.Context
130 ns string 154 ns string
131 } 155 }
132 156
133 var _ interface { 157 var _ interface {
134 » tq.Interface 158 » tq.RawInterface
135 tq.Testable 159 tq.Testable
136 } = (*taskqueueTxnImpl)(nil) 160 } = (*taskqueueTxnImpl)(nil)
137 161
138 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) { 162 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) {
139 » toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam e) 163 » toSched, err := t.parent.prepTask(t.ctx, t.ns, task, queueName)
140 if err != nil { 164 if err != nil {
141 return nil, err 165 return nil, err
142 } 166 }
143 167
144 numTasks := 0 168 numTasks := 0
145 for _, vs := range t.anony { 169 for _, vs := range t.anony {
146 numTasks += len(vs) 170 numTasks += len(vs)
147 } 171 }
148 if numTasks+1 > 5 { 172 if numTasks+1 > 5 {
149 // transactional tasks are actually implemented 'for real' as Ac tions which 173 // transactional tasks are actually implemented 'for real' as Ac tions which
150 // ride on the datastore. The current datastore implementation o nly allows 174 // ride on the datastore. The current datastore implementation o nly allows
151 // a maximum of 5 Actions per transaction, and more than that re sult in a 175 // a maximum of 5 Actions per transaction, and more than that re sult in a
152 // BAD_REQUEST. 176 // BAD_REQUEST.
153 return nil, errors.New("BAD_REQUEST") 177 return nil, errors.New("BAD_REQUEST")
154 } 178 }
155 179
156 t.anony[queueName] = append(t.anony[queueName], toSched) 180 t.anony[queueName] = append(t.anony[queueName], toSched)
157 181
158 // the fact that we have generated a unique name for this task queue ite m is 182 // the fact that we have generated a unique name for this task queue ite m is
159 // an implementation detail. 183 // an implementation detail.
160 // TODO(riannucci): now that I think about this... it may not actually b e true. 184 // TODO(riannucci): now that I think about this... it may not actually b e true.
161 // We should verify that the .Name for a task added in a tr ansaction is 185 // We should verify that the .Name for a task added in a tr ansaction is
162 // meaningless. Maybe names generated in a transaction are somehow 186 // meaningless. Maybe names generated in a transaction are somehow
163 // guaranteed to be meaningful? 187 // guaranteed to be meaningful?
164 » toRet := dupTask(toSched) 188 » toRet := toSched.Duplicate()
165 toRet.Name = "" 189 toRet.Name = ""
166 190
167 return toRet, nil 191 return toRet, nil
168 } 192 }
169 193
170 func (t *taskqueueTxnImpl) Add(task *tq.Task, queueName string) (retTask *tq.Tas k, err error) { 194 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.Ra wTaskCB) error {
171 » err = t.run(func() (err error) { 195 » if atomic.LoadInt32(&t.closed) == 1 {
172 » » t.Lock() 196 » » return errors.New("taskqueue: transaction context has expired")
173 » » defer t.Unlock() 197 » }
174 » » retTask, err = t.addLocked(task, queueName) 198
175 » » return 199 » t.Lock()
176 » }) 200 » defer t.Unlock()
177 » return 201
202 » queueName, err := t.parent.getQueueNameLocked(queueName)
203 » if err != nil {
204 » » return err
205 » }
206
207 » for _, task := range tasks {
208 » » cb(t.addLocked(task, queueName))
209 » }
210 » return nil
178 } 211 }
179 212
180 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string) (retTask s []*tq.Task, err error) { 213 func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error {
181 » err = t.run(func() (err error) { 214 » return errors.New("taskqueue: cannot DeleteMulti from a transaction")
182 » » t.Lock() 215 }
183 » » defer t.Unlock() 216
184 » » retTasks, err = multi(tasks, queueName, t.addLocked) 217 func (t *taskqueueTxnImpl) Purge(string) error {
185 » » return 218 » return errors.New("taskqueue: cannot Purge from a transaction")
186 » }) 219 }
187 » return 220
221 func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error {
222 » return errors.New("taskqueue: cannot Stats from a transaction")
188 } 223 }
189 224
190 ////////////////////////////// private functions /////////////////////////////// 225 ////////////////////////////// private functions ///////////////////////////////
191 226
192 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") 227 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
193 228
194 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_" 229 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_"
195 230
196 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { 231 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string {
197 _, ok := queue[cur] 232 _, ok := queue[cur]
198 for !ok && cur == "" { 233 for !ok && cur == "" {
199 name := [500]byte{} 234 name := [500]byte{}
200 for i := 0; i < 500; i++ { 235 for i := 0; i < 500; i++ {
201 name[i] = validTaskChars[mathrand.Get(c).Intn(len(validT askChars))] 236 name[i] = validTaskChars[mathrand.Get(c).Intn(len(validT askChars))]
202 } 237 }
203 cur = string(name[:]) 238 cur = string(name[:])
204 _, ok = queue[cur] 239 _, ok = queue[cur]
205 } 240 }
206 return cur 241 return cur
207 } 242 }
208 243
209 func multi(tasks []*tq.Task, queueName string, f func(*tq.Task, string) (*tq.Tas k, error)) ([]*tq.Task, error) {
210 ret := []*tq.Task(nil)
211 lme := errors.LazyMultiError{Size: len(tasks)}
212 for i, task := range tasks {
213 rt, err := f(task, queueName)
214 ret = append(ret, rt)
215 lme.Assign(i, err)
216 }
217 return ret, lme.Get()
218 }
219
220 func dupTask(t *tq.Task) *tq.Task {
221 ret := &tq.Task{}
222 *ret = *t
223
224 if t.Header != nil {
225 ret.Header = make(http.Header, len(t.Header))
226 for k, vs := range t.Header {
227 newVs := make([]string, len(vs))
228 copy(newVs, vs)
229 ret.Header[k] = newVs
230 }
231 }
232
233 if t.Payload != nil {
234 ret.Payload = make([]byte, len(t.Payload))
235 copy(ret.Payload, t.Payload)
236 }
237
238 if t.RetryOptions != nil {
239 ret.RetryOptions = &tq.RetryOptions{}
240 *ret.RetryOptions = *t.RetryOptions
241 }
242
243 return ret
244 }
245
246 func dupQueue(q tq.QueueData) tq.QueueData { 244 func dupQueue(q tq.QueueData) tq.QueueData {
247 r := make(tq.QueueData, len(q)) 245 r := make(tq.QueueData, len(q))
248 for k, q := range q { 246 for k, q := range q {
249 r[k] = make(map[string]*tq.Task, len(q)) 247 r[k] = make(map[string]*tq.Task, len(q))
250 for tn, t := range q { 248 for tn, t := range q {
251 » » » r[k][tn] = dupTask(t) 249 » » » r[k][tn] = t.Duplicate()
252 } 250 }
253 } 251 }
254 return r 252 return r
255 } 253 }
OLDNEW
« no previous file with comments | « impl/memory/memcache_test.go ('k') | impl/memory/taskqueue_data.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698