OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "errors" | |
9 "fmt" | |
10 "math/rand" | |
11 "net/http" | |
12 "sync" | |
13 "sync/atomic" | |
14 "time" | |
15 | |
16 "appengine/datastore" | |
17 "appengine/taskqueue" | |
18 pb "appengine_internal/taskqueue" | |
19 | |
20 "infra/gae/libs/wrapper" | |
21 ) | |
22 | |
23 var ( | |
24 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac e") | |
25 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac e") | |
26 ) | |
27 | |
28 //////////////////////////////// taskQueueData ///////////////////////////////// | |
29 | |
30 type taskQueueData struct { | |
31 sync.Mutex | |
32 *wrapper.BrokenFeatures | |
33 | |
34 named wrapper.QueueData | |
35 archived wrapper.QueueData | |
36 } | |
37 | |
38 ////////////////////////////// New(taskQueueData) ////////////////////////////// | |
39 | |
40 func newTaskQueueData() memContextObj { | |
41 return &taskQueueData{ | |
42 BrokenFeatures: wrapper.NewBrokenFeatures( | |
43 newTQError(pb.TaskQueueServiceError_TRANSIENT_ERROR)), | |
44 named: wrapper.QueueData{"default": {}}, | |
45 archived: wrapper.QueueData{"default": {}}, | |
46 } | |
47 } | |
48 | |
49 ///////////////////////// memContextObj(taskQueueData) ///////////////////////// | |
50 | |
51 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true } | |
52 func (t *taskQueueData) endTxn() {} | |
53 func (t *taskQueueData) applyTxn(rnd *rand.Rand, obj memContextObj) { | |
54 txn := obj.(*txnTaskQueueData) | |
55 for qn, tasks := range txn.anony { | |
56 for _, tsk := range tasks { | |
57 tsk.Name = mkName(rnd, tsk.Name, t.named[qn]) | |
58 t.named[qn][tsk.Name] = tsk | |
59 } | |
60 } | |
61 txn.anony = nil | |
62 } | |
63 func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, err or) { | |
64 return &txnTaskQueueData{ | |
65 BrokenFeatures: t.BrokenFeatures, | |
66 parent: t, | |
67 anony: wrapper.AnonymousQueueData{}, | |
68 }, nil | |
69 } | |
70 | |
71 ////////////////////// wrapper.TQTestable(taskQueueData) /////////////////////// | |
72 | |
73 func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData { | |
74 return nil | |
75 } | |
76 | |
77 func (t *taskQueueData) AddQueue(queueName string) { | |
78 t.Lock() | |
79 defer t.Unlock() | |
80 | |
81 if _, ok := t.named[queueName]; ok { | |
82 panic(fmt.Errorf("memory/taskqueue: cannot add the same queue tw ice! %q", queueName)) | |
83 } | |
84 t.named[queueName] = map[string]*taskqueue.Task{} | |
85 t.archived[queueName] = map[string]*taskqueue.Task{} | |
86 } | |
87 | |
88 func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData { | |
89 t.Lock() | |
90 defer t.Unlock() | |
91 | |
92 return dupQueue(t.named) | |
93 } | |
94 | |
95 func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData { | |
96 t.Lock() | |
97 defer t.Unlock() | |
98 | |
99 return dupQueue(t.archived) | |
100 } | |
101 | |
102 func (t *taskQueueData) resetTasksWithLock() { | |
103 for queuename := range t.named { | |
104 t.named[queuename] = map[string]*taskqueue.Task{} | |
105 t.archived[queuename] = map[string]*taskqueue.Task{} | |
106 } | |
107 } | |
108 | |
109 func (t *taskQueueData) ResetTasks() { | |
110 t.Lock() | |
111 defer t.Unlock() | |
112 | |
113 t.resetTasksWithLock() | |
114 } | |
115 | |
116 /////////////////////////// taskQueueData (private) //////////////////////////// | |
117 | |
118 func (t *taskQueueData) getQueueName(queueName string) (string, error) { | |
119 if queueName == "" { | |
120 queueName = "default" | |
121 } | |
122 if _, ok := t.named[queueName]; !ok { | |
123 return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE) | |
124 } | |
125 return queueName, nil | |
126 } | |
127 | |
128 func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName stri ng, now time.Time, rnd *rand.Rand) (*taskqueue.Task, string, error) { | |
129 queueName, err := t.getQueueName(queueName) | |
130 if err != nil { | |
131 return nil, "", err | |
132 } | |
133 | |
134 toSched := dupTask(task) | |
135 | |
136 if toSched.Path == "" { | |
137 return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL) | |
138 } | |
139 | |
140 if toSched.ETA.IsZero() { | |
141 toSched.ETA = now.Add(toSched.Delay) | |
142 } else if toSched.Delay != 0 { | |
143 panic("taskqueue: both Delay and ETA are set") | |
144 } | |
145 toSched.Delay = 0 | |
146 | |
147 if toSched.Method == "" { | |
148 toSched.Method = "POST" | |
149 } | |
150 if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok { | |
151 return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M ethod) | |
152 } | |
153 if toSched.Method != "POST" && toSched.Method != "PUT" { | |
154 toSched.Payload = nil | |
155 } | |
156 | |
157 if _, ok := toSched.Header[currentNamespace]; !ok { | |
158 if ns != "" { | |
159 if toSched.Header == nil { | |
160 toSched.Header = http.Header{} | |
161 } | |
162 toSched.Header[currentNamespace] = []string{ns} | |
163 } | |
164 } | |
165 // TODO(riannucci): implement DefaultNamespace | |
166 | |
167 if toSched.Name == "" { | |
168 toSched.Name = mkName(rnd, "", t.named[queueName]) | |
169 } else { | |
170 if !validTaskName.MatchString(toSched.Name) { | |
171 return nil, "", newTQError(pb.TaskQueueServiceError_INVA LID_TASK_NAME) | |
172 } | |
173 } | |
174 | |
175 return toSched, queueName, nil | |
176 } | |
177 | |
178 /////////////////////////////// txnTaskQueueData /////////////////////////////// | |
179 | |
180 type txnTaskQueueData struct { | |
181 *wrapper.BrokenFeatures | |
182 | |
183 lock sync.Mutex | |
184 | |
185 // boolean 0 or 1, use atomic.*Int32 to access. | |
186 closed int32 | |
187 anony wrapper.AnonymousQueueData | |
188 parent *taskQueueData | |
189 } | |
190 | |
191 /////////////////////// memContextObj(txnTaskQueueData) //////////////////////// | |
192 | |
193 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false } | |
194 | |
195 func (t *txnTaskQueueData) applyTxn(*rand.Rand, memContextObj) { | |
196 panic(errors.New("txnTaskQueueData.applyTxn is not implemented")) | |
197 } | |
198 | |
199 func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) { | |
200 return nil, errors.New("txnTaskQueueData.mkTxn is not implemented") | |
201 } | |
202 | |
203 func (t *txnTaskQueueData) endTxn() { | |
204 if atomic.LoadInt32(&t.closed) == 1 { | |
205 panic("cannot end transaction twice") | |
206 } | |
207 atomic.StoreInt32(&t.closed, 1) | |
208 } | |
209 | |
210 /////////////////// wrapper.BrokenFeatures(txnTaskQueueData) /////////////////// | |
211 | |
212 func (t *txnTaskQueueData) IsBroken() error { | |
213 // Slightly different from the SDK... datastore and taskqueue each imple ment | |
214 // this here, where in the SDK only datastore.transaction.Call does. | |
215 if atomic.LoadInt32(&t.closed) == 1 { | |
216 return fmt.Errorf("taskqueue: transaction context has expired") | |
217 } | |
218 return t.parent.IsBroken() | |
219 } | |
220 | |
221 ///////////////////// wrapper.TQTestable(txnTaskQueueData) ///////////////////// | |
222 | |
223 func (t *txnTaskQueueData) ResetTasks() { | |
224 t.Lock() | |
225 defer t.Unlock() | |
226 | |
227 for queuename := range t.anony { | |
228 t.anony[queuename] = []*taskqueue.Task{} | |
M-A Ruel
2015/05/25 18:21:10
you could set it to nil and it'd be fine
iannucci
2015/05/27 19:33:32
done
| |
229 } | |
230 t.parent.resetTasksWithLock() | |
231 } | |
232 | |
233 func (t *txnTaskQueueData) Lock() { | |
234 t.lock.Lock() | |
235 t.parent.Lock() | |
236 } | |
237 func (t *txnTaskQueueData) Unlock() { | |
238 t.parent.Unlock() | |
239 t.lock.Unlock() | |
240 } | |
241 | |
242 func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData { | |
243 t.Lock() | |
244 defer t.Unlock() | |
245 | |
246 ret := make(wrapper.AnonymousQueueData, len(t.anony)) | |
247 for k, vs := range t.anony { | |
248 ret[k] = make([]*taskqueue.Task, len(vs)) | |
249 for i, v := range vs { | |
250 tsk := dupTask(v) | |
251 tsk.Name = "" | |
252 ret[k][i] = tsk | |
253 } | |
254 } | |
255 | |
256 return ret | |
257 } | |
258 | |
259 func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData { | |
260 return t.parent.GetTombstonedTasks() | |
261 } | |
262 | |
263 func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData { | |
264 return t.parent.GetScheduledTasks() | |
265 } | |
266 | |
267 func (t *txnTaskQueueData) AddQueue(queueName string) { | |
268 t.parent.AddQueue(queueName) | |
269 } | |
OLD | NEW |