OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "errors" | |
9 "fmt" | |
10 "net/http" | |
11 "sync" | |
12 "sync/atomic" | |
13 | |
14 "golang.org/x/net/context" | |
15 | |
16 "github.com/luci/gae" | |
17 "github.com/luci/luci-go/common/clock" | |
18 ) | |
19 | |
20 var ( | |
21 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac
e") | |
22 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac
e") | |
23 ) | |
24 | |
25 //////////////////////////////// taskQueueData ///////////////////////////////// | |
26 | |
27 type taskQueueData struct { | |
28 sync.Mutex | |
29 | |
30 named gae.QueueData | |
31 archived gae.QueueData | |
32 } | |
33 | |
34 var ( | |
35 _ = memContextObj((*taskQueueData)(nil)) | |
36 _ = gae.TQTestable((*taskQueueData)(nil)) | |
37 ) | |
38 | |
39 func newTaskQueueData() memContextObj { | |
40 return &taskQueueData{ | |
41 named: gae.QueueData{"default": {}}, | |
42 archived: gae.QueueData{"default": {}}, | |
43 } | |
44 } | |
45 | |
46 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true } | |
47 func (t *taskQueueData) endTxn() {} | |
48 func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) { | |
49 txn := obj.(*txnTaskQueueData) | |
50 for qn, tasks := range txn.anony { | |
51 for _, tsk := range tasks { | |
52 tsk.Name = mkName(c, tsk.Name, t.named[qn]) | |
53 t.named[qn][tsk.Name] = tsk | |
54 } | |
55 } | |
56 txn.anony = nil | |
57 } | |
58 func (t *taskQueueData) mkTxn(*gae.DSTransactionOptions) memContextObj { | |
59 return &txnTaskQueueData{ | |
60 parent: t, | |
61 anony: gae.AnonymousQueueData{}, | |
62 } | |
63 } | |
64 | |
65 func (t *taskQueueData) GetTransactionTasks() gae.AnonymousQueueData { | |
66 return nil | |
67 } | |
68 | |
69 func (t *taskQueueData) CreateQueue(queueName string) { | |
70 t.Lock() | |
71 defer t.Unlock() | |
72 | |
73 if _, ok := t.named[queueName]; ok { | |
74 panic(fmt.Errorf("memory/taskqueue: cannot add the same queue tw
ice! %q", queueName)) | |
75 } | |
76 t.named[queueName] = map[string]*gae.TQTask{} | |
77 t.archived[queueName] = map[string]*gae.TQTask{} | |
78 } | |
79 | |
80 func (t *taskQueueData) GetScheduledTasks() gae.QueueData { | |
81 t.Lock() | |
82 defer t.Unlock() | |
83 | |
84 return dupQueue(t.named) | |
85 } | |
86 | |
87 func (t *taskQueueData) GetTombstonedTasks() gae.QueueData { | |
88 t.Lock() | |
89 defer t.Unlock() | |
90 | |
91 return dupQueue(t.archived) | |
92 } | |
93 | |
94 func (t *taskQueueData) resetTasksWithLock() { | |
95 for queueName := range t.named { | |
96 t.named[queueName] = map[string]*gae.TQTask{} | |
97 t.archived[queueName] = map[string]*gae.TQTask{} | |
98 } | |
99 } | |
100 | |
101 func (t *taskQueueData) ResetTasks() { | |
102 t.Lock() | |
103 defer t.Unlock() | |
104 | |
105 t.resetTasksWithLock() | |
106 } | |
107 | |
108 func (t *taskQueueData) getQueueName(queueName string) (string, error) { | |
109 if queueName == "" { | |
110 queueName = "default" | |
111 } | |
112 if _, ok := t.named[queueName]; !ok { | |
113 return "", errors.New("UNKNOWN_QUEUE") | |
114 } | |
115 return queueName, nil | |
116 } | |
117 | |
118 var tqOkMethods = map[string]struct{}{ | |
119 "GET": {}, | |
120 "POST": {}, | |
121 "HEAD": {}, | |
122 "PUT": {}, | |
123 "DELETE": {}, | |
124 } | |
125 | |
126 func (t *taskQueueData) prepTask(c context.Context, ns string, task *gae.TQTask,
queueName string) (*gae.TQTask, string, error) { | |
127 queueName, err := t.getQueueName(queueName) | |
128 if err != nil { | |
129 return nil, "", err | |
130 } | |
131 | |
132 toSched := dupTask(task) | |
133 | |
134 if toSched.Path == "" { | |
135 toSched.Path = "/_ah/queue/" + queueName | |
136 } | |
137 | |
138 if toSched.ETA.IsZero() { | |
139 toSched.ETA = clock.Now(c).Add(toSched.Delay) | |
140 } else if toSched.Delay != 0 { | |
141 panic("taskqueue: both Delay and ETA are set") | |
142 } | |
143 toSched.Delay = 0 | |
144 | |
145 if toSched.Method == "" { | |
146 toSched.Method = "POST" | |
147 } | |
148 if _, ok := tqOkMethods[toSched.Method]; !ok { | |
149 return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M
ethod) | |
150 } | |
151 if toSched.Method != "POST" && toSched.Method != "PUT" { | |
152 toSched.Payload = nil | |
153 } | |
154 | |
155 if _, ok := toSched.Header[currentNamespace]; !ok { | |
156 if ns != "" { | |
157 if toSched.Header == nil { | |
158 toSched.Header = http.Header{} | |
159 } | |
160 toSched.Header[currentNamespace] = []string{ns} | |
161 } | |
162 } | |
163 // TODO(riannucci): implement DefaultNamespace | |
164 | |
165 if toSched.Name == "" { | |
166 toSched.Name = mkName(c, "", t.named[queueName]) | |
167 } else { | |
168 if !validTaskName.MatchString(toSched.Name) { | |
169 return nil, "", errors.New("INVALID_TASK_NAME") | |
170 } | |
171 } | |
172 | |
173 return toSched, queueName, nil | |
174 } | |
175 | |
176 /////////////////////////////// txnTaskQueueData /////////////////////////////// | |
177 | |
178 type txnTaskQueueData struct { | |
179 lock sync.Mutex | |
180 | |
181 // boolean 0 or 1, use atomic.*Int32 to access. | |
182 closed int32 | |
183 anony gae.AnonymousQueueData | |
184 parent *taskQueueData | |
185 } | |
186 | |
187 var ( | |
188 _ = memContextObj((*txnTaskQueueData)(nil)) | |
189 _ = gae.TQTestable((*txnTaskQueueData)(nil)) | |
190 ) | |
191 | |
192 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { retu
rn false } | |
193 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { pani
c("impossible") } | |
194 func (t *txnTaskQueueData) mkTxn(*gae.DSTransactionOptions) memContextObj { pani
c("impossible") } | |
195 | |
196 func (t *txnTaskQueueData) endTxn() { | |
197 if atomic.LoadInt32(&t.closed) == 1 { | |
198 panic("cannot end transaction twice") | |
199 } | |
200 atomic.StoreInt32(&t.closed, 1) | |
201 } | |
202 | |
203 func (t *txnTaskQueueData) run(f func() error) error { | |
204 // Slightly different from the SDK... datastore and taskqueue each imple
ment | |
205 // this here, where in the SDK only datastore.transaction.Call does. | |
206 if atomic.LoadInt32(&t.closed) == 1 { | |
207 return fmt.Errorf("taskqueue: transaction context has expired") | |
208 } | |
209 return f() | |
210 } | |
211 | |
212 func (t *txnTaskQueueData) ResetTasks() { | |
213 t.Lock() | |
214 defer t.Unlock() | |
215 | |
216 for queuename := range t.anony { | |
217 t.anony[queuename] = nil | |
218 } | |
219 t.parent.resetTasksWithLock() | |
220 } | |
221 | |
222 func (t *txnTaskQueueData) Lock() { | |
223 t.lock.Lock() | |
224 t.parent.Lock() | |
225 } | |
226 func (t *txnTaskQueueData) Unlock() { | |
227 t.parent.Unlock() | |
228 t.lock.Unlock() | |
229 } | |
230 | |
231 func (t *txnTaskQueueData) GetTransactionTasks() gae.AnonymousQueueData { | |
232 t.Lock() | |
233 defer t.Unlock() | |
234 | |
235 ret := make(gae.AnonymousQueueData, len(t.anony)) | |
236 for k, vs := range t.anony { | |
237 ret[k] = make([]*gae.TQTask, len(vs)) | |
238 for i, v := range vs { | |
239 tsk := dupTask(v) | |
240 tsk.Name = "" | |
241 ret[k][i] = tsk | |
242 } | |
243 } | |
244 | |
245 return ret | |
246 } | |
247 | |
248 func (t *txnTaskQueueData) GetTombstonedTasks() gae.QueueData { | |
249 return t.parent.GetTombstonedTasks() | |
250 } | |
251 | |
252 func (t *txnTaskQueueData) GetScheduledTasks() gae.QueueData { | |
253 return t.parent.GetScheduledTasks() | |
254 } | |
255 | |
256 func (t *txnTaskQueueData) CreateQueue(queueName string) { | |
257 t.parent.CreateQueue(queueName) | |
258 } | |
OLD | NEW |