OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "errors" | |
9 "fmt" | |
10 "infra/gae/libs/wrapper" | |
11 "math/rand" | |
12 "net/http" | |
13 "sync" | |
14 "sync/atomic" | |
15 "time" | |
16 | |
17 "appengine/datastore" | |
18 "appengine/taskqueue" | |
19 pb "appengine_internal/taskqueue" | |
20 ) | |
21 | |
22 var ( | |
23 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac e") | |
24 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac e") | |
25 ) | |
26 | |
27 //////////////////////////////// taskQueueData ///////////////////////////////// | |
28 | |
29 type taskQueueData struct { | |
30 sync.Mutex | |
31 *wrapper.BrokenFeatures | |
M-A Ruel
2015/05/27 20:14:48
Doesn't need to be a pointer.
iannucci
2015/05/27 21:36:22
done
| |
32 | |
33 named wrapper.QueueData | |
34 archived wrapper.QueueData | |
35 } | |
36 | |
37 ////////////////////////////// New(taskQueueData) ////////////////////////////// | |
38 | |
39 func newTaskQueueData() memContextObj { | |
40 return &taskQueueData{ | |
41 BrokenFeatures: &wrapper.BrokenFeatures{ | |
42 DefaultError: newTQError(pb.TaskQueueServiceError_TRANSI ENT_ERROR)}, | |
43 named: wrapper.QueueData{"default": {}}, | |
44 archived: wrapper.QueueData{"default": {}}, | |
45 } | |
46 } | |
47 | |
48 ///////////////////////// memContextObj(taskQueueData) ///////////////////////// | |
49 | |
50 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true } | |
51 func (t *taskQueueData) endTxn() {} | |
52 func (t *taskQueueData) applyTxn(rnd *rand.Rand, obj memContextObj) { | |
53 txn := obj.(*txnTaskQueueData) | |
54 for qn, tasks := range txn.anony { | |
55 for _, tsk := range tasks { | |
56 tsk.Name = mkName(rnd, tsk.Name, t.named[qn]) | |
57 t.named[qn][tsk.Name] = tsk | |
58 } | |
59 } | |
60 txn.anony = nil | |
61 } | |
62 func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, err or) { | |
63 return &txnTaskQueueData{ | |
64 BrokenFeatures: t.BrokenFeatures, | |
65 parent: t, | |
66 anony: wrapper.AnonymousQueueData{}, | |
67 }, nil | |
68 } | |
69 | |
70 ////////////////////// wrapper.TQTestable(taskQueueData) /////////////////////// | |
71 | |
72 func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData { | |
73 return nil | |
74 } | |
75 | |
76 func (t *taskQueueData) CreateQueue(queueName string) { | |
77 t.Lock() | |
78 defer t.Unlock() | |
79 | |
80 if _, ok := t.named[queueName]; ok { | |
81 panic(fmt.Errorf("memory/taskqueue: cannot add the same queue tw ice! %q", queueName)) | |
82 } | |
83 t.named[queueName] = map[string]*taskqueue.Task{} | |
84 t.archived[queueName] = map[string]*taskqueue.Task{} | |
85 } | |
86 | |
87 func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData { | |
88 t.Lock() | |
89 defer t.Unlock() | |
90 | |
91 return dupQueue(t.named) | |
92 } | |
93 | |
94 func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData { | |
95 t.Lock() | |
96 defer t.Unlock() | |
97 | |
98 return dupQueue(t.archived) | |
99 } | |
100 | |
101 func (t *taskQueueData) resetTasksWithLock() { | |
102 for queuename := range t.named { | |
103 t.named[queuename] = map[string]*taskqueue.Task{} | |
104 t.archived[queuename] = map[string]*taskqueue.Task{} | |
105 } | |
106 } | |
107 | |
108 func (t *taskQueueData) ResetTasks() { | |
109 t.Lock() | |
110 defer t.Unlock() | |
111 | |
112 t.resetTasksWithLock() | |
113 } | |
114 | |
115 /////////////////////////// taskQueueData (private) //////////////////////////// | |
116 | |
117 func (t *taskQueueData) getQueueName(queueName string) (string, error) { | |
118 if queueName == "" { | |
119 queueName = "default" | |
120 } | |
121 if _, ok := t.named[queueName]; !ok { | |
122 return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE) | |
123 } | |
124 return queueName, nil | |
125 } | |
126 | |
127 func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName stri ng, now time.Time, rnd *rand.Rand) (*taskqueue.Task, string, error) { | |
128 queueName, err := t.getQueueName(queueName) | |
129 if err != nil { | |
130 return nil, "", err | |
131 } | |
132 | |
133 toSched := dupTask(task) | |
134 | |
135 if toSched.Path == "" { | |
136 return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL) | |
137 } | |
138 | |
139 if toSched.ETA.IsZero() { | |
140 toSched.ETA = now.Add(toSched.Delay) | |
141 } else if toSched.Delay != 0 { | |
142 panic("taskqueue: both Delay and ETA are set") | |
143 } | |
144 toSched.Delay = 0 | |
145 | |
146 if toSched.Method == "" { | |
147 toSched.Method = "POST" | |
148 } | |
149 if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok { | |
150 return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M ethod) | |
151 } | |
152 if toSched.Method != "POST" && toSched.Method != "PUT" { | |
153 toSched.Payload = nil | |
154 } | |
155 | |
156 if _, ok := toSched.Header[currentNamespace]; !ok { | |
157 if ns != "" { | |
158 if toSched.Header == nil { | |
159 toSched.Header = http.Header{} | |
160 } | |
161 toSched.Header[currentNamespace] = []string{ns} | |
162 } | |
163 } | |
164 // TODO(riannucci): implement DefaultNamespace | |
165 | |
166 if toSched.Name == "" { | |
167 toSched.Name = mkName(rnd, "", t.named[queueName]) | |
168 } else { | |
169 if !validTaskName.MatchString(toSched.Name) { | |
170 return nil, "", newTQError(pb.TaskQueueServiceError_INVA LID_TASK_NAME) | |
171 } | |
172 } | |
173 | |
174 return toSched, queueName, nil | |
175 } | |
176 | |
177 /////////////////////////////// txnTaskQueueData /////////////////////////////// | |
178 | |
179 type txnTaskQueueData struct { | |
180 *wrapper.BrokenFeatures | |
181 | |
182 lock sync.Mutex | |
183 | |
184 // boolean 0 or 1, use atomic.*Int32 to access. | |
185 closed int32 | |
186 anony wrapper.AnonymousQueueData | |
187 parent *taskQueueData | |
188 } | |
189 | |
190 /////////////////////// memContextObj(txnTaskQueueData) //////////////////////// | |
191 | |
192 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false } | |
193 | |
194 func (t *txnTaskQueueData) applyTxn(*rand.Rand, memContextObj) { | |
195 panic(errors.New("txnTaskQueueData.applyTxn is not implemented")) | |
196 } | |
197 | |
198 func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) { | |
199 return nil, errors.New("txnTaskQueueData.mkTxn is not implemented") | |
200 } | |
201 | |
202 func (t *txnTaskQueueData) endTxn() { | |
203 if atomic.LoadInt32(&t.closed) == 1 { | |
204 panic("cannot end transaction twice") | |
205 } | |
206 atomic.StoreInt32(&t.closed, 1) | |
207 } | |
208 | |
209 /////////////////// wrapper.BrokenFeatures(txnTaskQueueData) /////////////////// | |
210 | |
211 func (t *txnTaskQueueData) IsBroken() error { | |
212 // Slightly different from the SDK... datastore and taskqueue each imple ment | |
213 // this here, where in the SDK only datastore.transaction.Call does. | |
214 if atomic.LoadInt32(&t.closed) == 1 { | |
215 return fmt.Errorf("taskqueue: transaction context has expired") | |
216 } | |
217 return t.parent.IsBroken() | |
218 } | |
219 | |
220 ///////////////////// wrapper.TQTestable(txnTaskQueueData) ///////////////////// | |
221 | |
222 func (t *txnTaskQueueData) ResetTasks() { | |
223 t.Lock() | |
224 defer t.Unlock() | |
225 | |
226 for queuename := range t.anony { | |
227 t.anony[queuename] = nil | |
228 } | |
229 t.parent.resetTasksWithLock() | |
230 } | |
231 | |
232 func (t *txnTaskQueueData) Lock() { | |
233 t.lock.Lock() | |
234 t.parent.Lock() | |
235 } | |
236 func (t *txnTaskQueueData) Unlock() { | |
237 t.parent.Unlock() | |
238 t.lock.Unlock() | |
239 } | |
240 | |
241 func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData { | |
242 t.Lock() | |
243 defer t.Unlock() | |
244 | |
245 ret := make(wrapper.AnonymousQueueData, len(t.anony)) | |
246 for k, vs := range t.anony { | |
247 ret[k] = make([]*taskqueue.Task, len(vs)) | |
248 for i, v := range vs { | |
249 tsk := dupTask(v) | |
250 tsk.Name = "" | |
251 ret[k][i] = tsk | |
252 } | |
253 } | |
254 | |
255 return ret | |
256 } | |
257 | |
258 func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData { | |
259 return t.parent.GetTombstonedTasks() | |
260 } | |
261 | |
262 func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData { | |
263 return t.parent.GetScheduledTasks() | |
264 } | |
265 | |
266 func (t *txnTaskQueueData) CreateQueue(queueName string) { | |
267 t.parent.CreateQueue(queueName) | |
268 } | |
OLD | NEW |