Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1148)

Side by Side Diff: impl/memory/taskqueue_data.go

Issue 1270063003: Make the rest of the services have a similar raw/user interface structure. (Closed) Base URL: https://github.com/luci/gae.git@add_datastore
Patch Set: address comments Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/taskqueue.go ('k') | impl/memory/taskqueue_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "fmt" 9 "fmt"
10 "net/http" 10 "net/http"
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
99 } 99 }
100 } 100 }
101 101
102 func (t *taskQueueData) ResetTasks() { 102 func (t *taskQueueData) ResetTasks() {
103 t.Lock() 103 t.Lock()
104 defer t.Unlock() 104 defer t.Unlock()
105 105
106 t.resetTasksWithLock() 106 t.resetTasksWithLock()
107 } 107 }
108 108
109 func (t *taskQueueData) getQueueName(queueName string) (string, error) { 109 func (t *taskQueueData) getQueueNameLocked(queueName string) (string, error) {
110 if queueName == "" { 110 if queueName == "" {
111 queueName = "default" 111 queueName = "default"
112 } 112 }
113 if _, ok := t.named[queueName]; !ok { 113 if _, ok := t.named[queueName]; !ok {
114 return "", errors.New("UNKNOWN_QUEUE") 114 return "", errors.New("UNKNOWN_QUEUE")
115 } 115 }
116 return queueName, nil 116 return queueName, nil
117 } 117 }
118 118
119 func (t *taskQueueData) purgeLocked(queueName string) error {
120 queueName, err := t.getQueueNameLocked(queueName)
121 if err != nil {
122 return err
123 }
124
125 t.named[queueName] = map[string]*tq.Task{}
126 t.archived[queueName] = map[string]*tq.Task{}
127 return nil
128 }
129
119 var tqOkMethods = map[string]struct{}{ 130 var tqOkMethods = map[string]struct{}{
120 "GET": {}, 131 "GET": {},
121 "POST": {}, 132 "POST": {},
122 "HEAD": {}, 133 "HEAD": {},
123 "PUT": {}, 134 "PUT": {},
124 "DELETE": {}, 135 "DELETE": {},
125 } 136 }
126 137
127 func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, qu eueName string) (*tq.Task, string, error) { 138 func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, qu eueName string) (*tq.Task, error) {
128 » queueName, err := t.getQueueName(queueName) 139 » toSched := task.Duplicate()
129 » if err != nil {
130 » » return nil, "", err
131 » }
132
133 » toSched := dupTask(task)
134 140
135 if toSched.Path == "" { 141 if toSched.Path == "" {
136 toSched.Path = "/_ah/queue/" + queueName 142 toSched.Path = "/_ah/queue/" + queueName
137 } 143 }
138 144
139 if toSched.ETA.IsZero() { 145 if toSched.ETA.IsZero() {
140 toSched.ETA = clock.Now(c).Add(toSched.Delay) 146 toSched.ETA = clock.Now(c).Add(toSched.Delay)
141 } else if toSched.Delay != 0 { 147 } else if toSched.Delay != 0 {
142 panic("taskqueue: both Delay and ETA are set") 148 panic("taskqueue: both Delay and ETA are set")
143 } 149 }
144 toSched.Delay = 0 150 toSched.Delay = 0
145 151
146 if toSched.Method == "" { 152 if toSched.Method == "" {
147 toSched.Method = "POST" 153 toSched.Method = "POST"
148 } 154 }
149 if _, ok := tqOkMethods[toSched.Method]; !ok { 155 if _, ok := tqOkMethods[toSched.Method]; !ok {
150 » » return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M ethod) 156 » » return nil, fmt.Errorf("taskqueue: bad method %q", toSched.Metho d)
151 } 157 }
152 if toSched.Method != "POST" && toSched.Method != "PUT" { 158 if toSched.Method != "POST" && toSched.Method != "PUT" {
153 toSched.Payload = nil 159 toSched.Payload = nil
154 } 160 }
155 161
156 if _, ok := toSched.Header[currentNamespace]; !ok { 162 if _, ok := toSched.Header[currentNamespace]; !ok {
157 if ns != "" { 163 if ns != "" {
158 if toSched.Header == nil { 164 if toSched.Header == nil {
159 toSched.Header = http.Header{} 165 toSched.Header = http.Header{}
160 } 166 }
161 toSched.Header[currentNamespace] = []string{ns} 167 toSched.Header[currentNamespace] = []string{ns}
162 } 168 }
163 } 169 }
164 // TODO(riannucci): implement DefaultNamespace 170 // TODO(riannucci): implement DefaultNamespace
165 171
166 if toSched.Name == "" { 172 if toSched.Name == "" {
167 toSched.Name = mkName(c, "", t.named[queueName]) 173 toSched.Name = mkName(c, "", t.named[queueName])
168 } else { 174 } else {
169 if !validTaskName.MatchString(toSched.Name) { 175 if !validTaskName.MatchString(toSched.Name) {
170 » » » return nil, "", errors.New("INVALID_TASK_NAME") 176 » » » return nil, errors.New("INVALID_TASK_NAME")
171 } 177 }
172 } 178 }
173 179
174 » return toSched, queueName, nil 180 » return toSched, nil
175 } 181 }
176 182
177 /////////////////////////////// txnTaskQueueData /////////////////////////////// 183 /////////////////////////////// txnTaskQueueData ///////////////////////////////
178 184
179 type txnTaskQueueData struct { 185 type txnTaskQueueData struct {
180 lock sync.Mutex 186 lock sync.Mutex
181 187
182 // boolean 0 or 1, use atomic.*Int32 to access. 188 // boolean 0 or 1, use atomic.*Int32 to access.
183 closed int32 189 closed int32
184 anony tq.AnonymousQueueData 190 anony tq.AnonymousQueueData
185 parent *taskQueueData 191 parent *taskQueueData
186 } 192 }
187 193
188 var ( 194 var (
189 _ = memContextObj((*txnTaskQueueData)(nil)) 195 _ = memContextObj((*txnTaskQueueData)(nil))
190 _ = tq.Testable((*txnTaskQueueData)(nil)) 196 _ = tq.Testable((*txnTaskQueueData)(nil))
191 ) 197 )
192 198
193 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false } 199 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
194 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { panic(" impossible") } 200 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { panic(" impossible") }
195 func (t *txnTaskQueueData) mkTxn(*ds.TransactionOptions) memContextObj { panic(" impossible") } 201 func (t *txnTaskQueueData) mkTxn(*ds.TransactionOptions) memContextObj { panic(" impossible") }
196 202
197 func (t *txnTaskQueueData) endTxn() { 203 func (t *txnTaskQueueData) endTxn() {
198 if atomic.LoadInt32(&t.closed) == 1 { 204 if atomic.LoadInt32(&t.closed) == 1 {
199 panic("cannot end transaction twice") 205 panic("cannot end transaction twice")
200 } 206 }
201 atomic.StoreInt32(&t.closed, 1) 207 atomic.StoreInt32(&t.closed, 1)
202 } 208 }
203 209
204 func (t *txnTaskQueueData) run(f func() error) error {
205 // Slightly different from the SDK... datastore and taskqueue each imple ment
206 // this here, where in the SDK only datastore.transaction.Call does.
207 if atomic.LoadInt32(&t.closed) == 1 {
208 return fmt.Errorf("taskqueue: transaction context has expired")
209 }
210 return f()
211 }
212
213 func (t *txnTaskQueueData) ResetTasks() { 210 func (t *txnTaskQueueData) ResetTasks() {
214 t.Lock() 211 t.Lock()
215 defer t.Unlock() 212 defer t.Unlock()
216 213
217 for queuename := range t.anony { 214 for queuename := range t.anony {
218 t.anony[queuename] = nil 215 t.anony[queuename] = nil
219 } 216 }
220 t.parent.resetTasksWithLock() 217 t.parent.resetTasksWithLock()
221 } 218 }
222 219
223 func (t *txnTaskQueueData) Lock() { 220 func (t *txnTaskQueueData) Lock() {
224 t.lock.Lock() 221 t.lock.Lock()
225 t.parent.Lock() 222 t.parent.Lock()
226 } 223 }
227 func (t *txnTaskQueueData) Unlock() { 224 func (t *txnTaskQueueData) Unlock() {
228 t.parent.Unlock() 225 t.parent.Unlock()
229 t.lock.Unlock() 226 t.lock.Unlock()
230 } 227 }
231 228
232 func (t *txnTaskQueueData) GetTransactionTasks() tq.AnonymousQueueData { 229 func (t *txnTaskQueueData) GetTransactionTasks() tq.AnonymousQueueData {
233 t.Lock() 230 t.Lock()
234 defer t.Unlock() 231 defer t.Unlock()
235 232
236 ret := make(tq.AnonymousQueueData, len(t.anony)) 233 ret := make(tq.AnonymousQueueData, len(t.anony))
237 for k, vs := range t.anony { 234 for k, vs := range t.anony {
238 ret[k] = make([]*tq.Task, len(vs)) 235 ret[k] = make([]*tq.Task, len(vs))
239 for i, v := range vs { 236 for i, v := range vs {
240 » » » tsk := dupTask(v) 237 » » » tsk := v.Duplicate()
241 tsk.Name = "" 238 tsk.Name = ""
242 ret[k][i] = tsk 239 ret[k][i] = tsk
243 } 240 }
244 } 241 }
245 242
246 return ret 243 return ret
247 } 244 }
248 245
249 func (t *txnTaskQueueData) GetTombstonedTasks() tq.QueueData { 246 func (t *txnTaskQueueData) GetTombstonedTasks() tq.QueueData {
250 return t.parent.GetTombstonedTasks() 247 return t.parent.GetTombstonedTasks()
251 } 248 }
252 249
253 func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData { 250 func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData {
254 return t.parent.GetScheduledTasks() 251 return t.parent.GetScheduledTasks()
255 } 252 }
256 253
257 func (t *txnTaskQueueData) CreateQueue(queueName string) { 254 func (t *txnTaskQueueData) CreateQueue(queueName string) {
258 t.parent.CreateQueue(queueName) 255 t.parent.CreateQueue(queueName)
259 } 256 }
OLDNEW
« no previous file with comments | « impl/memory/taskqueue.go ('k') | impl/memory/taskqueue_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698