Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(240)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go

Issue 1230303003: Revert "Refactor current GAE abstraction library to be free of the SDK*" (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "fmt" 9 "fmt"
10 » "golang.org/x/net/context" 10 » "infra/gae/libs/wrapper"
11 "net/http" 11 "net/http"
12 "sync" 12 "sync"
13 "sync/atomic" 13 "sync/atomic"
14 14
15 » "infra/gae/libs/gae" 15 » "appengine/datastore"
16 16 » "appengine/taskqueue"
17 » "github.com/luci/luci-go/common/clock" 17 » pb "appengine_internal/taskqueue"
18 » "golang.org/x/net/context"
19 » "infra/libs/clock"
18 ) 20 )
19 21
20 var ( 22 var (
21 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac e") 23 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac e")
22 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac e") 24 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac e")
23 ) 25 )
24 26
25 //////////////////////////////// taskQueueData ///////////////////////////////// 27 //////////////////////////////// taskQueueData /////////////////////////////////
26 28
27 type taskQueueData struct { 29 type taskQueueData struct {
28 sync.Mutex 30 sync.Mutex
29 » gae.BrokenFeatures 31 » wrapper.BrokenFeatures
30 32
31 » named gae.QueueData 33 » named wrapper.QueueData
32 » archived gae.QueueData 34 » archived wrapper.QueueData
33 } 35 }
34 36
35 var ( 37 var (
36 _ = memContextObj((*taskQueueData)(nil)) 38 _ = memContextObj((*taskQueueData)(nil))
37 » _ = gae.TQTestable((*taskQueueData)(nil)) 39 » _ = wrapper.TQTestable((*taskQueueData)(nil))
38 ) 40 )
39 41
40 func newTaskQueueData() memContextObj { 42 func newTaskQueueData() memContextObj {
41 return &taskQueueData{ 43 return &taskQueueData{
42 » » BrokenFeatures: gae.BrokenFeatures{ 44 » » BrokenFeatures: wrapper.BrokenFeatures{
43 » » » DefaultError: errors.New("TRANSIENT_ERROR")}, 45 » » » DefaultError: newTQError(pb.TaskQueueServiceError_TRANSI ENT_ERROR)},
44 » » named: gae.QueueData{"default": {}}, 46 » » named: wrapper.QueueData{"default": {}},
45 » » archived: gae.QueueData{"default": {}}, 47 » » archived: wrapper.QueueData{"default": {}},
46 } 48 }
47 } 49 }
48 50
49 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true } 51 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true }
50 func (t *taskQueueData) endTxn() {} 52 func (t *taskQueueData) endTxn() {}
51 func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) { 53 func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) {
52 txn := obj.(*txnTaskQueueData) 54 txn := obj.(*txnTaskQueueData)
53 for qn, tasks := range txn.anony { 55 for qn, tasks := range txn.anony {
54 for _, tsk := range tasks { 56 for _, tsk := range tasks {
55 tsk.Name = mkName(c, tsk.Name, t.named[qn]) 57 tsk.Name = mkName(c, tsk.Name, t.named[qn])
56 t.named[qn][tsk.Name] = tsk 58 t.named[qn][tsk.Name] = tsk
57 } 59 }
58 } 60 }
59 txn.anony = nil 61 txn.anony = nil
60 } 62 }
61 func (t *taskQueueData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error) { 63 func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, err or) {
62 return &txnTaskQueueData{ 64 return &txnTaskQueueData{
63 BrokenFeatures: &t.BrokenFeatures, 65 BrokenFeatures: &t.BrokenFeatures,
64 parent: t, 66 parent: t,
65 » » anony: gae.AnonymousQueueData{}, 67 » » anony: wrapper.AnonymousQueueData{},
66 }, nil 68 }, nil
67 } 69 }
68 70
69 func (t *taskQueueData) GetTransactionTasks() gae.AnonymousQueueData { 71 func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
70 return nil 72 return nil
71 } 73 }
72 74
73 func (t *taskQueueData) CreateQueue(queueName string) { 75 func (t *taskQueueData) CreateQueue(queueName string) {
74 t.Lock() 76 t.Lock()
75 defer t.Unlock() 77 defer t.Unlock()
76 78
77 if _, ok := t.named[queueName]; ok { 79 if _, ok := t.named[queueName]; ok {
78 panic(fmt.Errorf("memory/taskqueue: cannot add the same queue tw ice! %q", queueName)) 80 panic(fmt.Errorf("memory/taskqueue: cannot add the same queue tw ice! %q", queueName))
79 } 81 }
80 » t.named[queueName] = map[string]*gae.TQTask{} 82 » t.named[queueName] = map[string]*taskqueue.Task{}
81 » t.archived[queueName] = map[string]*gae.TQTask{} 83 » t.archived[queueName] = map[string]*taskqueue.Task{}
82 } 84 }
83 85
84 func (t *taskQueueData) GetScheduledTasks() gae.QueueData { 86 func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData {
85 t.Lock() 87 t.Lock()
86 defer t.Unlock() 88 defer t.Unlock()
87 89
88 return dupQueue(t.named) 90 return dupQueue(t.named)
89 } 91 }
90 92
91 func (t *taskQueueData) GetTombstonedTasks() gae.QueueData { 93 func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData {
92 t.Lock() 94 t.Lock()
93 defer t.Unlock() 95 defer t.Unlock()
94 96
95 return dupQueue(t.archived) 97 return dupQueue(t.archived)
96 } 98 }
97 99
98 func (t *taskQueueData) resetTasksWithLock() { 100 func (t *taskQueueData) resetTasksWithLock() {
99 » for queueName := range t.named { 101 » for queuename := range t.named {
100 » » t.named[queueName] = map[string]*gae.TQTask{} 102 » » t.named[queuename] = map[string]*taskqueue.Task{}
101 » » t.archived[queueName] = map[string]*gae.TQTask{} 103 » » t.archived[queuename] = map[string]*taskqueue.Task{}
102 } 104 }
103 } 105 }
104 106
105 func (t *taskQueueData) ResetTasks() { 107 func (t *taskQueueData) ResetTasks() {
106 t.Lock() 108 t.Lock()
107 defer t.Unlock() 109 defer t.Unlock()
108 110
109 t.resetTasksWithLock() 111 t.resetTasksWithLock()
110 } 112 }
111 113
112 func (t *taskQueueData) getQueueName(queueName string) (string, error) { 114 func (t *taskQueueData) getQueueName(queueName string) (string, error) {
113 if queueName == "" { 115 if queueName == "" {
114 queueName = "default" 116 queueName = "default"
115 } 117 }
116 if _, ok := t.named[queueName]; !ok { 118 if _, ok := t.named[queueName]; !ok {
117 » » return "", errors.New("UNKNOWN_QUEUE") 119 » » return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE)
118 } 120 }
119 return queueName, nil 121 return queueName, nil
120 } 122 }
121 123
122 var tqOkMethods = map[string]struct{}{ 124 func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T ask, queueName string) (
123 » "GET": {}, 125 » *taskqueue.Task, string, error) {
124 » "POST": {},
125 » "HEAD": {},
126 » "PUT": {},
127 » "DELETE": {},
128 }
129
130 func (t *taskQueueData) prepTask(c context.Context, ns string, task *gae.TQTask, queueName string) (*gae.TQTask, string, error) {
131 queueName, err := t.getQueueName(queueName) 126 queueName, err := t.getQueueName(queueName)
132 if err != nil { 127 if err != nil {
133 return nil, "", err 128 return nil, "", err
134 } 129 }
135 130
136 toSched := dupTask(task) 131 toSched := dupTask(task)
137 132
138 if toSched.Path == "" { 133 if toSched.Path == "" {
139 » » return nil, "", errors.New("INVALID_URL") 134 » » return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL)
140 } 135 }
141 136
142 if toSched.ETA.IsZero() { 137 if toSched.ETA.IsZero() {
143 toSched.ETA = clock.Now(c).Add(toSched.Delay) 138 toSched.ETA = clock.Now(c).Add(toSched.Delay)
144 } else if toSched.Delay != 0 { 139 } else if toSched.Delay != 0 {
145 panic("taskqueue: both Delay and ETA are set") 140 panic("taskqueue: both Delay and ETA are set")
146 } 141 }
147 toSched.Delay = 0 142 toSched.Delay = 0
148 143
149 if toSched.Method == "" { 144 if toSched.Method == "" {
150 toSched.Method = "POST" 145 toSched.Method = "POST"
151 } 146 }
152 » if _, ok := tqOkMethods[toSched.Method]; !ok { 147 » if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok {
153 return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M ethod) 148 return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M ethod)
154 } 149 }
155 if toSched.Method != "POST" && toSched.Method != "PUT" { 150 if toSched.Method != "POST" && toSched.Method != "PUT" {
156 toSched.Payload = nil 151 toSched.Payload = nil
157 } 152 }
158 153
159 if _, ok := toSched.Header[currentNamespace]; !ok { 154 if _, ok := toSched.Header[currentNamespace]; !ok {
160 if ns != "" { 155 if ns != "" {
161 if toSched.Header == nil { 156 if toSched.Header == nil {
162 toSched.Header = http.Header{} 157 toSched.Header = http.Header{}
163 } 158 }
164 toSched.Header[currentNamespace] = []string{ns} 159 toSched.Header[currentNamespace] = []string{ns}
165 } 160 }
166 } 161 }
167 // TODO(riannucci): implement DefaultNamespace 162 // TODO(riannucci): implement DefaultNamespace
168 163
169 if toSched.Name == "" { 164 if toSched.Name == "" {
170 toSched.Name = mkName(c, "", t.named[queueName]) 165 toSched.Name = mkName(c, "", t.named[queueName])
171 } else { 166 } else {
172 if !validTaskName.MatchString(toSched.Name) { 167 if !validTaskName.MatchString(toSched.Name) {
173 » » » return nil, "", errors.New("INVALID_TASK_NAME") 168 » » » return nil, "", newTQError(pb.TaskQueueServiceError_INVA LID_TASK_NAME)
174 } 169 }
175 } 170 }
176 171
177 return toSched, queueName, nil 172 return toSched, queueName, nil
178 } 173 }
179 174
180 /////////////////////////////// txnTaskQueueData /////////////////////////////// 175 /////////////////////////////// txnTaskQueueData ///////////////////////////////
181 176
182 type txnTaskQueueData struct { 177 type txnTaskQueueData struct {
183 » *gae.BrokenFeatures 178 » *wrapper.BrokenFeatures
184 179
185 lock sync.Mutex 180 lock sync.Mutex
186 181
187 // boolean 0 or 1, use atomic.*Int32 to access. 182 // boolean 0 or 1, use atomic.*Int32 to access.
188 closed int32 183 closed int32
189 » anony gae.AnonymousQueueData 184 » anony wrapper.AnonymousQueueData
190 parent *taskQueueData 185 parent *taskQueueData
191 } 186 }
192 187
193 var ( 188 var (
194 _ = memContextObj((*txnTaskQueueData)(nil)) 189 _ = memContextObj((*txnTaskQueueData)(nil))
195 » _ = gae.TQTestable((*txnTaskQueueData)(nil)) 190 » _ = wrapper.TQTestable((*txnTaskQueueData)(nil))
196 ) 191 )
197 192
198 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false } 193 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
199 194
200 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { 195 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) {
201 panic(errors.New("txnTaskQueueData.applyTxn is not implemented")) 196 panic(errors.New("txnTaskQueueData.applyTxn is not implemented"))
202 } 197 }
203 198
204 func (t *txnTaskQueueData) mkTxn(*gae.DSTransactionOptions) (memContextObj, erro r) { 199 func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
205 return nil, errors.New("txnTaskQueueData.mkTxn is not implemented") 200 return nil, errors.New("txnTaskQueueData.mkTxn is not implemented")
206 } 201 }
207 202
208 func (t *txnTaskQueueData) endTxn() { 203 func (t *txnTaskQueueData) endTxn() {
209 if atomic.LoadInt32(&t.closed) == 1 { 204 if atomic.LoadInt32(&t.closed) == 1 {
210 panic("cannot end transaction twice") 205 panic("cannot end transaction twice")
211 } 206 }
212 atomic.StoreInt32(&t.closed, 1) 207 atomic.StoreInt32(&t.closed, 1)
213 } 208 }
214 209
215 func (t *txnTaskQueueData) RunIfNotBroken(f func() error) error { 210 func (t *txnTaskQueueData) IsBroken() error {
216 // Slightly different from the SDK... datastore and taskqueue each imple ment 211 // Slightly different from the SDK... datastore and taskqueue each imple ment
217 // this here, where in the SDK only datastore.transaction.Call does. 212 // this here, where in the SDK only datastore.transaction.Call does.
218 if atomic.LoadInt32(&t.closed) == 1 { 213 if atomic.LoadInt32(&t.closed) == 1 {
219 return fmt.Errorf("taskqueue: transaction context has expired") 214 return fmt.Errorf("taskqueue: transaction context has expired")
220 } 215 }
221 » return t.parent.RunIfNotBroken(f) 216 » return t.parent.IsBroken()
222 } 217 }
223 218
224 func (t *txnTaskQueueData) ResetTasks() { 219 func (t *txnTaskQueueData) ResetTasks() {
225 t.Lock() 220 t.Lock()
226 defer t.Unlock() 221 defer t.Unlock()
227 222
228 for queuename := range t.anony { 223 for queuename := range t.anony {
229 t.anony[queuename] = nil 224 t.anony[queuename] = nil
230 } 225 }
231 t.parent.resetTasksWithLock() 226 t.parent.resetTasksWithLock()
232 } 227 }
233 228
234 func (t *txnTaskQueueData) Lock() { 229 func (t *txnTaskQueueData) Lock() {
235 t.lock.Lock() 230 t.lock.Lock()
236 t.parent.Lock() 231 t.parent.Lock()
237 } 232 }
238 func (t *txnTaskQueueData) Unlock() { 233 func (t *txnTaskQueueData) Unlock() {
239 t.parent.Unlock() 234 t.parent.Unlock()
240 t.lock.Unlock() 235 t.lock.Unlock()
241 } 236 }
242 237
243 func (t *txnTaskQueueData) GetTransactionTasks() gae.AnonymousQueueData { 238 func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
244 t.Lock() 239 t.Lock()
245 defer t.Unlock() 240 defer t.Unlock()
246 241
247 » ret := make(gae.AnonymousQueueData, len(t.anony)) 242 » ret := make(wrapper.AnonymousQueueData, len(t.anony))
248 for k, vs := range t.anony { 243 for k, vs := range t.anony {
249 » » ret[k] = make([]*gae.TQTask, len(vs)) 244 » » ret[k] = make([]*taskqueue.Task, len(vs))
250 for i, v := range vs { 245 for i, v := range vs {
251 tsk := dupTask(v) 246 tsk := dupTask(v)
252 tsk.Name = "" 247 tsk.Name = ""
253 ret[k][i] = tsk 248 ret[k][i] = tsk
254 } 249 }
255 } 250 }
256 251
257 return ret 252 return ret
258 } 253 }
259 254
260 func (t *txnTaskQueueData) GetTombstonedTasks() gae.QueueData { 255 func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData {
261 return t.parent.GetTombstonedTasks() 256 return t.parent.GetTombstonedTasks()
262 } 257 }
263 258
264 func (t *txnTaskQueueData) GetScheduledTasks() gae.QueueData { 259 func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData {
265 return t.parent.GetScheduledTasks() 260 return t.parent.GetScheduledTasks()
266 } 261 }
267 262
268 func (t *txnTaskQueueData) CreateQueue(queueName string) { 263 func (t *txnTaskQueueData) CreateQueue(queueName string) {
269 t.parent.CreateQueue(queueName) 264 t.parent.CreateQueue(queueName)
270 } 265 }
OLDNEW
« no previous file with comments | « go/src/infra/gae/libs/wrapper/memory/taskqueue.go ('k') | go/src/infra/gae/libs/wrapper/memory/taskqueue_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698