Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(266)

Side by Side Diff: impl/memory/taskqueue_data.go

Issue 1270063003: Make the rest of the services have a similar raw/user interface structure. (Closed) Base URL: https://github.com/luci/gae.git@add_datastore
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "fmt" 9 "fmt"
10 "net/http" 10 "net/http"
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
117 } 117 }
118 118
119 var tqOkMethods = map[string]struct{}{ 119 var tqOkMethods = map[string]struct{}{
120 "GET": {}, 120 "GET": {},
121 "POST": {}, 121 "POST": {},
122 "HEAD": {}, 122 "HEAD": {},
123 "PUT": {}, 123 "PUT": {},
124 "DELETE": {}, 124 "DELETE": {},
125 } 125 }
126 126
127 func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, qu eueName string) (*tq.Task, string, error) { 127 func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, qu eueName string) (*tq.Task, error) {
128 » queueName, err := t.getQueueName(queueName) 128 » toSched := task.Duplicate()
129 » if err != nil {
130 » » return nil, "", err
131 » }
132
133 » toSched := dupTask(task)
134 129
135 if toSched.Path == "" { 130 if toSched.Path == "" {
136 toSched.Path = "/_ah/queue/" + queueName 131 toSched.Path = "/_ah/queue/" + queueName
137 } 132 }
138 133
139 if toSched.ETA.IsZero() { 134 if toSched.ETA.IsZero() {
140 toSched.ETA = clock.Now(c).Add(toSched.Delay) 135 toSched.ETA = clock.Now(c).Add(toSched.Delay)
141 } else if toSched.Delay != 0 { 136 } else if toSched.Delay != 0 {
142 panic("taskqueue: both Delay and ETA are set") 137 panic("taskqueue: both Delay and ETA are set")
143 } 138 }
144 toSched.Delay = 0 139 toSched.Delay = 0
145 140
146 if toSched.Method == "" { 141 if toSched.Method == "" {
147 toSched.Method = "POST" 142 toSched.Method = "POST"
148 } 143 }
149 if _, ok := tqOkMethods[toSched.Method]; !ok { 144 if _, ok := tqOkMethods[toSched.Method]; !ok {
150 » » return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M ethod) 145 » » return nil, fmt.Errorf("taskqueue: bad method %q", toSched.Metho d)
151 } 146 }
152 if toSched.Method != "POST" && toSched.Method != "PUT" { 147 if toSched.Method != "POST" && toSched.Method != "PUT" {
153 toSched.Payload = nil 148 toSched.Payload = nil
154 } 149 }
155 150
156 if _, ok := toSched.Header[currentNamespace]; !ok { 151 if _, ok := toSched.Header[currentNamespace]; !ok {
157 if ns != "" { 152 if ns != "" {
158 if toSched.Header == nil { 153 if toSched.Header == nil {
159 toSched.Header = http.Header{} 154 toSched.Header = http.Header{}
160 } 155 }
161 toSched.Header[currentNamespace] = []string{ns} 156 toSched.Header[currentNamespace] = []string{ns}
162 } 157 }
163 } 158 }
164 // TODO(riannucci): implement DefaultNamespace 159 // TODO(riannucci): implement DefaultNamespace
165 160
166 if toSched.Name == "" { 161 if toSched.Name == "" {
167 toSched.Name = mkName(c, "", t.named[queueName]) 162 toSched.Name = mkName(c, "", t.named[queueName])
168 } else { 163 } else {
169 if !validTaskName.MatchString(toSched.Name) { 164 if !validTaskName.MatchString(toSched.Name) {
170 » » » return nil, "", errors.New("INVALID_TASK_NAME") 165 » » » return nil, errors.New("INVALID_TASK_NAME")
171 } 166 }
172 } 167 }
173 168
174 » return toSched, queueName, nil 169 » return toSched, nil
175 } 170 }
176 171
177 /////////////////////////////// txnTaskQueueData /////////////////////////////// 172 /////////////////////////////// txnTaskQueueData ///////////////////////////////
178 173
179 type txnTaskQueueData struct { 174 type txnTaskQueueData struct {
180 lock sync.Mutex 175 lock sync.Mutex
181 176
182 // boolean 0 or 1, use atomic.*Int32 to access. 177 // boolean 0 or 1, use atomic.*Int32 to access.
183 closed int32 178 closed int32
184 anony tq.AnonymousQueueData 179 anony tq.AnonymousQueueData
185 parent *taskQueueData 180 parent *taskQueueData
186 } 181 }
187 182
188 var ( 183 var (
189 _ = memContextObj((*txnTaskQueueData)(nil)) 184 _ = memContextObj((*txnTaskQueueData)(nil))
190 _ = tq.Testable((*txnTaskQueueData)(nil)) 185 _ = tq.Testable((*txnTaskQueueData)(nil))
191 ) 186 )
192 187
193 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false } 188 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
194 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { panic(" impossible") } 189 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { panic(" impossible") }
195 func (t *txnTaskQueueData) mkTxn(*ds.TransactionOptions) memContextObj { panic(" impossible") } 190 func (t *txnTaskQueueData) mkTxn(*ds.TransactionOptions) memContextObj { panic(" impossible") }
196 191
197 func (t *txnTaskQueueData) endTxn() { 192 func (t *txnTaskQueueData) endTxn() {
198 if atomic.LoadInt32(&t.closed) == 1 { 193 if atomic.LoadInt32(&t.closed) == 1 {
199 panic("cannot end transaction twice") 194 panic("cannot end transaction twice")
200 } 195 }
201 atomic.StoreInt32(&t.closed, 1) 196 atomic.StoreInt32(&t.closed, 1)
202 } 197 }
203 198
204 func (t *txnTaskQueueData) run(f func() error) error {
205 // Slightly different from the SDK... datastore and taskqueue each imple ment
206 // this here, where in the SDK only datastore.transaction.Call does.
207 if atomic.LoadInt32(&t.closed) == 1 {
208 return fmt.Errorf("taskqueue: transaction context has expired")
209 }
210 return f()
211 }
212
213 func (t *txnTaskQueueData) ResetTasks() { 199 func (t *txnTaskQueueData) ResetTasks() {
214 t.Lock() 200 t.Lock()
215 defer t.Unlock() 201 defer t.Unlock()
216 202
217 for queuename := range t.anony { 203 for queuename := range t.anony {
218 t.anony[queuename] = nil 204 t.anony[queuename] = nil
219 } 205 }
220 t.parent.resetTasksWithLock() 206 t.parent.resetTasksWithLock()
221 } 207 }
222 208
223 func (t *txnTaskQueueData) Lock() { 209 func (t *txnTaskQueueData) Lock() {
224 t.lock.Lock() 210 t.lock.Lock()
225 t.parent.Lock() 211 t.parent.Lock()
226 } 212 }
227 func (t *txnTaskQueueData) Unlock() { 213 func (t *txnTaskQueueData) Unlock() {
228 t.parent.Unlock() 214 t.parent.Unlock()
229 t.lock.Unlock() 215 t.lock.Unlock()
230 } 216 }
231 217
232 func (t *txnTaskQueueData) GetTransactionTasks() tq.AnonymousQueueData { 218 func (t *txnTaskQueueData) GetTransactionTasks() tq.AnonymousQueueData {
233 t.Lock() 219 t.Lock()
234 defer t.Unlock() 220 defer t.Unlock()
235 221
236 ret := make(tq.AnonymousQueueData, len(t.anony)) 222 ret := make(tq.AnonymousQueueData, len(t.anony))
237 for k, vs := range t.anony { 223 for k, vs := range t.anony {
238 ret[k] = make([]*tq.Task, len(vs)) 224 ret[k] = make([]*tq.Task, len(vs))
239 for i, v := range vs { 225 for i, v := range vs {
240 » » » tsk := dupTask(v) 226 » » » tsk := v.Duplicate()
241 tsk.Name = "" 227 tsk.Name = ""
242 ret[k][i] = tsk 228 ret[k][i] = tsk
243 } 229 }
244 } 230 }
245 231
246 return ret 232 return ret
247 } 233 }
248 234
249 func (t *txnTaskQueueData) GetTombstonedTasks() tq.QueueData { 235 func (t *txnTaskQueueData) GetTombstonedTasks() tq.QueueData {
250 return t.parent.GetTombstonedTasks() 236 return t.parent.GetTombstonedTasks()
251 } 237 }
252 238
253 func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData { 239 func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData {
254 return t.parent.GetScheduledTasks() 240 return t.parent.GetScheduledTasks()
255 } 241 }
256 242
257 func (t *txnTaskQueueData) CreateQueue(queueName string) { 243 func (t *txnTaskQueueData) CreateQueue(queueName string) {
258 t.parent.CreateQueue(queueName) 244 t.parent.CreateQueue(queueName)
259 } 245 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698