Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(154)

Side by Side Diff: impl/memory/taskqueue.go

Issue 1270063003: Make the rest of the services have a similar raw/user interface structure. (Closed) Base URL: https://github.com/luci/gae.git@add_datastore
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "net/http"
9 "regexp" 8 "regexp"
9 "sync/atomic"
10 10
11 "golang.org/x/net/context" 11 "golang.org/x/net/context"
12 12
13 "github.com/luci/gae/impl/dummy"
14 tq "github.com/luci/gae/service/taskqueue" 13 tq "github.com/luci/gae/service/taskqueue"
15 "github.com/luci/luci-go/common/errors" 14 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/mathrand" 15 "github.com/luci/luci-go/common/mathrand"
17 ) 16 )
18 17
19 /////////////////////////////// public functions /////////////////////////////// 18 /////////////////////////////// public functions ///////////////////////////////
20 19
21 func useTQ(c context.Context) context.Context { 20 func useTQ(c context.Context) context.Context {
22 » return tq.SetFactory(c, func(ic context.Context) tq.Interface { 21 » return tq.SetRawFactory(c, func(ic context.Context) tq.RawInterface {
23 tqd := cur(ic).Get(memContextTQIdx) 22 tqd := cur(ic).Get(memContextTQIdx)
24 if x, ok := tqd.(*taskQueueData); ok { 23 if x, ok := tqd.(*taskQueueData); ok {
25 return &taskqueueImpl{ 24 return &taskqueueImpl{
26 dummy.TaskQueue(),
27 x, 25 x,
28 ic, 26 ic,
29 curGID(ic).namespace, 27 curGID(ic).namespace,
30 } 28 }
31 } 29 }
32 return &taskqueueTxnImpl{ 30 return &taskqueueTxnImpl{
33 dummy.TaskQueue(),
34 tqd.(*txnTaskQueueData), 31 tqd.(*txnTaskQueueData),
35 ic, 32 ic,
36 curGID(ic).namespace, 33 curGID(ic).namespace,
37 } 34 }
38 }) 35 })
39 } 36 }
40 37
41 //////////////////////////////// taskqueueImpl ///////////////////////////////// 38 //////////////////////////////// taskqueueImpl /////////////////////////////////
42 39
43 type taskqueueImpl struct { 40 type taskqueueImpl struct {
44 tq.Interface
45 *taskQueueData 41 *taskQueueData
46 42
47 ctx context.Context 43 ctx context.Context
48 ns string 44 ns string
49 } 45 }
50 46
51 var ( 47 var (
52 » _ = tq.Interface((*taskqueueImpl)(nil)) 48 » _ = tq.RawInterface((*taskqueueImpl)(nil))
53 _ = tq.Testable((*taskqueueImpl)(nil)) 49 _ = tq.Testable((*taskqueueImpl)(nil))
54 ) 50 )
55 51
56 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er ror) { 52 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er ror) {
57 » toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) 53 » toSched, err := t.prepTask(t.ctx, t.ns, task, queueName)
58 if err != nil { 54 if err != nil {
59 return nil, err 55 return nil, err
60 } 56 }
61 57
62 if _, ok := t.archived[queueName][toSched.Name]; ok { 58 if _, ok := t.archived[queueName][toSched.Name]; ok {
63 // SDK converts TOMBSTONE -> already added too 59 // SDK converts TOMBSTONE -> already added too
64 return nil, tq.ErrTaskAlreadyAdded 60 return nil, tq.ErrTaskAlreadyAdded
65 } else if _, ok := t.named[queueName][toSched.Name]; ok { 61 } else if _, ok := t.named[queueName][toSched.Name]; ok {
66 return nil, tq.ErrTaskAlreadyAdded 62 return nil, tq.ErrTaskAlreadyAdded
67 } else { 63 } else {
68 t.named[queueName][toSched.Name] = toSched 64 t.named[queueName][toSched.Name] = toSched
69 } 65 }
70 66
71 » return dupTask(toSched), nil 67 » return toSched.Duplicate(), nil
72 }
73
74 func (t *taskqueueImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) {
75 » t.Lock()
76 » defer t.Unlock()
77 » return t.addLocked(task, queueName)
78 } 68 }
79 69
80 func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error { 70 func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error {
81 queueName, err := t.getQueueName(queueName)
82 if err != nil {
83 return err
84 }
85
86 if _, ok := t.archived[queueName][task.Name]; ok { 71 if _, ok := t.archived[queueName][task.Name]; ok {
87 return errors.New("TOMBSTONED_TASK") 72 return errors.New("TOMBSTONED_TASK")
88 } 73 }
89 74
90 if _, ok := t.named[queueName][task.Name]; !ok { 75 if _, ok := t.named[queueName][task.Name]; !ok {
91 return errors.New("UNKNOWN_TASK") 76 return errors.New("UNKNOWN_TASK")
92 } 77 }
93 78
94 t.archived[queueName][task.Name] = t.named[queueName][task.Name] 79 t.archived[queueName][task.Name] = t.named[queueName][task.Name]
95 delete(t.named[queueName], task.Name) 80 delete(t.named[queueName], task.Name)
96 81
97 return nil 82 return nil
98 } 83 }
99 84
100 func (t *taskqueueImpl) Delete(task *tq.Task, queueName string) error { 85 func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTa skCB) error {
101 » t.Lock()
102 » defer t.Unlock()
103 » return t.deleteLocked(task, queueName)
104 }
105
106 func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task , error) {
107 » t.Lock()
108 » defer t.Unlock()
109 » return multi(tasks, queueName, t.addLocked)
110 }
111
112 func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string) error {
113 t.Lock() 86 t.Lock()
114 defer t.Unlock() 87 defer t.Unlock()
115 88
116 » _, err := multi(tasks, queueName, 89 » queueName, err := t.getQueueName(queueName)
117 » » func(tsk *tq.Task, qn string) (*tq.Task, error) { 90 » if err != nil {
118 » » » return nil, t.deleteLocked(tsk, qn) 91 » » return err
119 » » }) 92 » }
120 » return err 93
94 » for _, task := range tasks {
95 » » cb(t.addLocked(task, queueName))
96 » }
97 » return nil
98 }
99
100 func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.Ra wCB) error {
101 » t.Lock()
102 » defer t.Unlock()
103
104 » queueName, err := t.getQueueName(queueName)
105 » if err != nil {
106 » » return err
107 » }
108
109 » for _, task := range tasks {
110 » » cb(t.deleteLocked(task, queueName))
dnj 2015/08/03 22:37:25 Consider calling "deleteLocked" in the loop, aggre
iannucci 2015/08/04 01:21:21 yeah mumble. I ended up doing a thing for all of t
111 » }
112 » return nil
113 }
114
115 func (t *taskqueueImpl) Purge(queueName string) error {
116 » t.Lock()
117 » defer t.Unlock()
118
119 » queueName, err := t.getQueueName(queueName)
120 » if err != nil {
121 » » return err
122 » }
123
124 » t.named[queueName] = map[string]*tq.Task{}
dnj 2015/08/03 22:37:25 Consider making this a "taskQueueData" method: res
iannucci 2015/08/04 01:21:21 Done.
125 » t.archived[queueName] = map[string]*tq.Task{}
126 » return nil
127 }
128
129 func (t *taskqueueImpl) Stats(queueNames []string, cb tq.RawStatsCB) error {
130 » t.Lock()
131 » defer t.Unlock()
132
133 » for _, qn := range queueNames {
134 » » qn, err := t.getQueueName(qn)
135 » » if err != nil {
136 » » » cb(nil, err)
137 » » } else {
138 » » » s := tq.Statistics{
139 » » » » Tasks: len(t.named[qn]),
140 » » » }
141 » » » for _, t := range t.named[qn] {
142 » » » » if s.OldestETA.IsZero() {
143 » » » » » s.OldestETA = t.ETA
144 » » » » } else if t.ETA.Before(s.OldestETA) {
145 » » » » » s.OldestETA = t.ETA
146 » » » » }
147 » » » }
148 » » » cb(&s, nil)
149 » » }
150 » }
151
152 » return nil
121 } 153 }
122 154
123 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// 155 /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
124 156
125 type taskqueueTxnImpl struct { 157 type taskqueueTxnImpl struct {
126 tq.Interface
127 *txnTaskQueueData 158 *txnTaskQueueData
128 159
129 ctx context.Context 160 ctx context.Context
130 ns string 161 ns string
131 } 162 }
132 163
133 var _ interface { 164 var _ interface {
134 » tq.Interface 165 » tq.RawInterface
135 tq.Testable 166 tq.Testable
136 } = (*taskqueueTxnImpl)(nil) 167 } = (*taskqueueTxnImpl)(nil)
137 168
138 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) { 169 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) {
139 » toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam e) 170 » toSched, err := t.parent.prepTask(t.ctx, t.ns, task, queueName)
140 if err != nil { 171 if err != nil {
141 return nil, err 172 return nil, err
142 } 173 }
143 174
144 numTasks := 0 175 numTasks := 0
145 for _, vs := range t.anony { 176 for _, vs := range t.anony {
146 numTasks += len(vs) 177 numTasks += len(vs)
147 } 178 }
148 if numTasks+1 > 5 { 179 if numTasks+1 > 5 {
149 // transactional tasks are actually implemented 'for real' as Ac tions which 180 // transactional tasks are actually implemented 'for real' as Ac tions which
150 // ride on the datastore. The current datastore implementation o nly allows 181 // ride on the datastore. The current datastore implementation o nly allows
151 // a maximum of 5 Actions per transaction, and more than that re sult in a 182 // a maximum of 5 Actions per transaction, and more than that re sult in a
152 // BAD_REQUEST. 183 // BAD_REQUEST.
153 return nil, errors.New("BAD_REQUEST") 184 return nil, errors.New("BAD_REQUEST")
154 } 185 }
155 186
156 t.anony[queueName] = append(t.anony[queueName], toSched) 187 t.anony[queueName] = append(t.anony[queueName], toSched)
157 188
158 // the fact that we have generated a unique name for this task queue ite m is 189 // the fact that we have generated a unique name for this task queue ite m is
159 // an implementation detail. 190 // an implementation detail.
160 // TODO(riannucci): now that I think about this... it may not actually b e true. 191 // TODO(riannucci): now that I think about this... it may not actually b e true.
161 // We should verify that the .Name for a task added in a tr ansaction is 192 // We should verify that the .Name for a task added in a tr ansaction is
162 // meaningless. Maybe names generated in a transaction are somehow 193 // meaningless. Maybe names generated in a transaction are somehow
163 // guaranteed to be meaningful? 194 // guaranteed to be meaningful?
164 » toRet := dupTask(toSched) 195 » toRet := toSched.Duplicate()
165 toRet.Name = "" 196 toRet.Name = ""
166 197
167 return toRet, nil 198 return toRet, nil
168 } 199 }
169 200
170 func (t *taskqueueTxnImpl) Add(task *tq.Task, queueName string) (retTask *tq.Tas k, err error) { 201 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.Ra wTaskCB) error {
171 » err = t.run(func() (err error) { 202 » if atomic.LoadInt32(&t.closed) == 1 {
172 » » t.Lock() 203 » » return errors.New("taskqueue: transaction context has expired")
173 » » defer t.Unlock() 204 » }
174 » » retTask, err = t.addLocked(task, queueName) 205
175 » » return 206 » t.Lock()
176 » }) 207 » defer t.Unlock()
177 » return 208
209 » queueName, err := t.parent.getQueueName(queueName)
210 » if err != nil {
211 » » return err
212 » }
213
214 » for _, task := range tasks {
215 » » cb(t.addLocked(task, queueName))
216 » }
217 » return nil
178 } 218 }
179 219
180 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string) (retTask s []*tq.Task, err error) { 220 func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error {
181 » err = t.run(func() (err error) { 221 » return errors.New("taskqueue: cannot DeleteMulti from a transaction")
182 » » t.Lock() 222 }
183 » » defer t.Unlock() 223
184 » » retTasks, err = multi(tasks, queueName, t.addLocked) 224 func (t *taskqueueTxnImpl) Purge(string) error {
185 » » return 225 » return errors.New("taskqueue: cannot Purge from a transaction")
186 » }) 226 }
187 » return 227
228 func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error {
229 » return errors.New("taskqueue: cannot Stats from a transaction")
188 } 230 }
189 231
190 ////////////////////////////// private functions /////////////////////////////// 232 ////////////////////////////// private functions ///////////////////////////////
191 233
192 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") 234 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
193 235
194 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_" 236 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_"
195 237
196 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { 238 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string {
197 _, ok := queue[cur] 239 _, ok := queue[cur]
198 for !ok && cur == "" { 240 for !ok && cur == "" {
199 name := [500]byte{} 241 name := [500]byte{}
200 for i := 0; i < 500; i++ { 242 for i := 0; i < 500; i++ {
201 name[i] = validTaskChars[mathrand.Get(c).Intn(len(validT askChars))] 243 name[i] = validTaskChars[mathrand.Get(c).Intn(len(validT askChars))]
202 } 244 }
203 cur = string(name[:]) 245 cur = string(name[:])
204 _, ok = queue[cur] 246 _, ok = queue[cur]
205 } 247 }
206 return cur 248 return cur
207 } 249 }
208 250
209 func multi(tasks []*tq.Task, queueName string, f func(*tq.Task, string) (*tq.Tas k, error)) ([]*tq.Task, error) {
210 ret := []*tq.Task(nil)
211 lme := errors.LazyMultiError{Size: len(tasks)}
212 for i, task := range tasks {
213 rt, err := f(task, queueName)
214 ret = append(ret, rt)
215 lme.Assign(i, err)
216 }
217 return ret, lme.Get()
218 }
219
220 func dupTask(t *tq.Task) *tq.Task {
221 ret := &tq.Task{}
222 *ret = *t
223
224 if t.Header != nil {
225 ret.Header = make(http.Header, len(t.Header))
226 for k, vs := range t.Header {
227 newVs := make([]string, len(vs))
228 copy(newVs, vs)
229 ret.Header[k] = newVs
230 }
231 }
232
233 if t.Payload != nil {
234 ret.Payload = make([]byte, len(t.Payload))
235 copy(ret.Payload, t.Payload)
236 }
237
238 if t.RetryOptions != nil {
239 ret.RetryOptions = &tq.RetryOptions{}
240 *ret.RetryOptions = *t.RetryOptions
241 }
242
243 return ret
244 }
245
246 func dupQueue(q tq.QueueData) tq.QueueData { 251 func dupQueue(q tq.QueueData) tq.QueueData {
247 r := make(tq.QueueData, len(q)) 252 r := make(tq.QueueData, len(q))
248 for k, q := range q { 253 for k, q := range q {
249 r[k] = make(map[string]*tq.Task, len(q)) 254 r[k] = make(map[string]*tq.Task, len(q))
250 for tn, t := range q { 255 for tn, t := range q {
251 » » » r[k][tn] = dupTask(t) 256 » » » r[k][tn] = t.Duplicate()
252 } 257 }
253 } 258 }
254 return r 259 return r
255 } 260 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698