Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(148)

Side by Side Diff: memory/taskqueue.go

Issue 1243323002: Refactor a bit. (Closed) Base URL: https://github.com/luci/gae.git@master
Patch Set: fix golint Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « memory/raw_datastore_test.go ('k') | memory/taskqueue_data.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "errors"
9 "net/http"
10 "regexp"
11
12 "golang.org/x/net/context"
13
14 "github.com/luci/gae"
15 "github.com/luci/gae/dummy"
16 )
17
18 /////////////////////////////// public functions ///////////////////////////////
19
20 func useTQ(c context.Context) context.Context {
21 return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue {
22 tqd := cur(ic).Get(memContextTQIdx)
23 if x, ok := tqd.(*taskQueueData); ok {
24 return &taskqueueImpl{
25 dummy.TQ(),
26 x,
27 ic,
28 curGID(ic).namespace,
29 }
30 }
31 return &taskqueueTxnImpl{
32 dummy.TQ(),
33 tqd.(*txnTaskQueueData),
34 ic,
35 curGID(ic).namespace,
36 }
37 })
38 }
39
40 //////////////////////////////// taskqueueImpl /////////////////////////////////
41
42 type taskqueueImpl struct {
43 gae.TaskQueue
44 *taskQueueData
45
46 ctx context.Context
47 ns string
48 }
49
50 var (
51 _ = gae.TaskQueue((*taskqueueImpl)(nil))
52 _ = gae.TQTestable((*taskqueueImpl)(nil))
53 )
54
55 func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa sk, error) {
56 toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName)
57 if err != nil {
58 return nil, err
59 }
60
61 if _, ok := t.archived[queueName][toSched.Name]; ok {
62 // SDK converts TOMBSTONE -> already added too
63 return nil, gae.ErrTQTaskAlreadyAdded
64 } else if _, ok := t.named[queueName][toSched.Name]; ok {
65 return nil, gae.ErrTQTaskAlreadyAdded
66 } else {
67 t.named[queueName][toSched.Name] = toSched
68 }
69
70 return dupTask(toSched), nil
71 }
72
73 func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, er ror) {
74 t.Lock()
75 defer t.Unlock()
76 return t.addLocked(task, queueName)
77 }
78
79 func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error {
80 queueName, err := t.getQueueName(queueName)
81 if err != nil {
82 return err
83 }
84
85 if _, ok := t.archived[queueName][task.Name]; ok {
86 return errors.New("TOMBSTONED_TASK")
87 }
88
89 if _, ok := t.named[queueName][task.Name]; !ok {
90 return errors.New("UNKNOWN_TASK")
91 }
92
93 t.archived[queueName][task.Name] = t.named[queueName][task.Name]
94 delete(t.named[queueName], task.Name)
95
96 return nil
97 }
98
99 func (t *taskqueueImpl) Delete(task *gae.TQTask, queueName string) error {
100 t.Lock()
101 defer t.Unlock()
102 return t.deleteLocked(task, queueName)
103 }
104
105 func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae. TQTask, error) {
106 t.Lock()
107 defer t.Unlock()
108 return multi(tasks, queueName, t.addLocked)
109 }
110
111 func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error {
112 t.Lock()
113 defer t.Unlock()
114
115 _, err := multi(tasks, queueName,
116 func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) {
117 return nil, t.deleteLocked(tsk, qn)
118 })
119 return err
120 }
121
122 /////////////////////////////// taskqueueTxnImpl ///////////////////////////////
123
124 type taskqueueTxnImpl struct {
125 gae.TaskQueue
126 *txnTaskQueueData
127
128 ctx context.Context
129 ns string
130 }
131
132 var _ interface {
133 gae.TaskQueue
134 gae.TQTestable
135 } = (*taskqueueTxnImpl)(nil)
136
137 func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T QTask, error) {
138 toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam e)
139 if err != nil {
140 return nil, err
141 }
142
143 numTasks := 0
144 for _, vs := range t.anony {
145 numTasks += len(vs)
146 }
147 if numTasks+1 > 5 {
148 // transactional tasks are actually implemented 'for real' as Ac tions which
149 // ride on the datastore. The current datastore implementation o nly allows
150 // a maximum of 5 Actions per transaction, and more than that re sult in a
151 // BAD_REQUEST.
152 return nil, errors.New("BAD_REQUEST")
153 }
154
155 t.anony[queueName] = append(t.anony[queueName], toSched)
156
157 // the fact that we have generated a unique name for this task queue ite m is
158 // an implementation detail.
159 // TODO(riannucci): now that I think about this... it may not actually b e true.
160 // We should verify that the .Name for a task added in a tr ansaction is
161 // meaningless. Maybe names generated in a transaction are somehow
162 // guaranteed to be meaningful?
163 toRet := dupTask(toSched)
164 toRet.Name = ""
165
166 return toRet, nil
167 }
168
169 func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae .TQTask, err error) {
170 err = t.run(func() (err error) {
171 t.Lock()
172 defer t.Unlock()
173 retTask, err = t.addLocked(task, queueName)
174 return
175 })
176 return
177 }
178
179 func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retT asks []*gae.TQTask, err error) {
180 err = t.run(func() (err error) {
181 t.Lock()
182 defer t.Unlock()
183 retTasks, err = multi(tasks, queueName, t.addLocked)
184 return
185 })
186 return
187 }
188
189 ////////////////////////////// private functions ///////////////////////////////
190
191 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$")
192
193 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_"
194
195 func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string {
196 _, ok := queue[cur]
197 for !ok && cur == "" {
198 name := [500]byte{}
199 for i := 0; i < 500; i++ {
200 name[i] = validTaskChars[gae.GetMathRand(c).Intn(len(val idTaskChars))]
201 }
202 cur = string(name[:])
203 _, ok = queue[cur]
204 }
205 return cur
206 }
207
208 func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (* gae.TQTask, error)) ([]*gae.TQTask, error) {
209 ret := []*gae.TQTask(nil)
210 lme := gae.LazyMultiError{Size: len(tasks)}
211 for i, task := range tasks {
212 rt, err := f(task, queueName)
213 ret = append(ret, rt)
214 lme.Assign(i, err)
215 }
216 return ret, lme.Get()
217 }
218
219 func dupTask(t *gae.TQTask) *gae.TQTask {
220 ret := &gae.TQTask{}
221 *ret = *t
222
223 if t.Header != nil {
224 ret.Header = make(http.Header, len(t.Header))
225 for k, vs := range t.Header {
226 newVs := make([]string, len(vs))
227 copy(newVs, vs)
228 ret.Header[k] = newVs
229 }
230 }
231
232 if t.Payload != nil {
233 ret.Payload = make([]byte, len(t.Payload))
234 copy(ret.Payload, t.Payload)
235 }
236
237 if t.RetryOptions != nil {
238 ret.RetryOptions = &gae.TQRetryOptions{}
239 *ret.RetryOptions = *t.RetryOptions
240 }
241
242 return ret
243 }
244
245 func dupQueue(q gae.QueueData) gae.QueueData {
246 r := make(gae.QueueData, len(q))
247 for k, q := range q {
248 r[k] = make(map[string]*gae.TQTask, len(q))
249 for tn, t := range q {
250 r[k][tn] = dupTask(t)
251 }
252 }
253 return r
254 }
OLDNEW
« no previous file with comments | « memory/raw_datastore_test.go ('k') | memory/taskqueue_data.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698