Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(104)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go

Issue 1222903002: Refactor current GAE abstraction library to be free of the SDK* (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: more fixes Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "errors"
9 "fmt"
10 "infra/gae/libs/wrapper"
11 "net/http"
12 "sync"
13 "sync/atomic"
14
15 "appengine/datastore"
16 "appengine/taskqueue"
17 pb "appengine_internal/taskqueue"
18 "golang.org/x/net/context"
19 "infra/libs/clock"
20 )
21
22 var (
23 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac e")
24 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac e")
25 )
26
27 //////////////////////////////// taskQueueData /////////////////////////////////
28
29 type taskQueueData struct {
30 sync.Mutex
31 wrapper.BrokenFeatures
32
33 named wrapper.QueueData
34 archived wrapper.QueueData
35 }
36
37 var (
38 _ = memContextObj((*taskQueueData)(nil))
39 _ = wrapper.TQTestable((*taskQueueData)(nil))
40 )
41
42 func newTaskQueueData() memContextObj {
43 return &taskQueueData{
44 BrokenFeatures: wrapper.BrokenFeatures{
45 DefaultError: newTQError(pb.TaskQueueServiceError_TRANSI ENT_ERROR)},
46 named: wrapper.QueueData{"default": {}},
47 archived: wrapper.QueueData{"default": {}},
48 }
49 }
50
51 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true }
52 func (t *taskQueueData) endTxn() {}
53 func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) {
54 txn := obj.(*txnTaskQueueData)
55 for qn, tasks := range txn.anony {
56 for _, tsk := range tasks {
57 tsk.Name = mkName(c, tsk.Name, t.named[qn])
58 t.named[qn][tsk.Name] = tsk
59 }
60 }
61 txn.anony = nil
62 }
63 func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, err or) {
64 return &txnTaskQueueData{
65 BrokenFeatures: &t.BrokenFeatures,
66 parent: t,
67 anony: wrapper.AnonymousQueueData{},
68 }, nil
69 }
70
71 func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
72 return nil
73 }
74
75 func (t *taskQueueData) CreateQueue(queueName string) {
76 t.Lock()
77 defer t.Unlock()
78
79 if _, ok := t.named[queueName]; ok {
80 panic(fmt.Errorf("memory/taskqueue: cannot add the same queue tw ice! %q", queueName))
81 }
82 t.named[queueName] = map[string]*taskqueue.Task{}
83 t.archived[queueName] = map[string]*taskqueue.Task{}
84 }
85
86 func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData {
87 t.Lock()
88 defer t.Unlock()
89
90 return dupQueue(t.named)
91 }
92
93 func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData {
94 t.Lock()
95 defer t.Unlock()
96
97 return dupQueue(t.archived)
98 }
99
100 func (t *taskQueueData) resetTasksWithLock() {
101 for queuename := range t.named {
102 t.named[queuename] = map[string]*taskqueue.Task{}
103 t.archived[queuename] = map[string]*taskqueue.Task{}
104 }
105 }
106
107 func (t *taskQueueData) ResetTasks() {
108 t.Lock()
109 defer t.Unlock()
110
111 t.resetTasksWithLock()
112 }
113
114 func (t *taskQueueData) getQueueName(queueName string) (string, error) {
115 if queueName == "" {
116 queueName = "default"
117 }
118 if _, ok := t.named[queueName]; !ok {
119 return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE)
120 }
121 return queueName, nil
122 }
123
124 func (t *taskQueueData) prepTask(c context.Context, ns string, task *taskqueue.T ask, queueName string) (
125 *taskqueue.Task, string, error) {
126 queueName, err := t.getQueueName(queueName)
127 if err != nil {
128 return nil, "", err
129 }
130
131 toSched := dupTask(task)
132
133 if toSched.Path == "" {
134 return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL)
135 }
136
137 if toSched.ETA.IsZero() {
138 toSched.ETA = clock.Now(c).Add(toSched.Delay)
139 } else if toSched.Delay != 0 {
140 panic("taskqueue: both Delay and ETA are set")
141 }
142 toSched.Delay = 0
143
144 if toSched.Method == "" {
145 toSched.Method = "POST"
146 }
147 if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok {
148 return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M ethod)
149 }
150 if toSched.Method != "POST" && toSched.Method != "PUT" {
151 toSched.Payload = nil
152 }
153
154 if _, ok := toSched.Header[currentNamespace]; !ok {
155 if ns != "" {
156 if toSched.Header == nil {
157 toSched.Header = http.Header{}
158 }
159 toSched.Header[currentNamespace] = []string{ns}
160 }
161 }
162 // TODO(riannucci): implement DefaultNamespace
163
164 if toSched.Name == "" {
165 toSched.Name = mkName(c, "", t.named[queueName])
166 } else {
167 if !validTaskName.MatchString(toSched.Name) {
168 return nil, "", newTQError(pb.TaskQueueServiceError_INVA LID_TASK_NAME)
169 }
170 }
171
172 return toSched, queueName, nil
173 }
174
175 /////////////////////////////// txnTaskQueueData ///////////////////////////////
176
177 type txnTaskQueueData struct {
178 *wrapper.BrokenFeatures
179
180 lock sync.Mutex
181
182 // boolean 0 or 1, use atomic.*Int32 to access.
183 closed int32
184 anony wrapper.AnonymousQueueData
185 parent *taskQueueData
186 }
187
188 var (
189 _ = memContextObj((*txnTaskQueueData)(nil))
190 _ = wrapper.TQTestable((*txnTaskQueueData)(nil))
191 )
192
193 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
194
195 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) {
196 panic(errors.New("txnTaskQueueData.applyTxn is not implemented"))
197 }
198
199 func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
200 return nil, errors.New("txnTaskQueueData.mkTxn is not implemented")
201 }
202
203 func (t *txnTaskQueueData) endTxn() {
204 if atomic.LoadInt32(&t.closed) == 1 {
205 panic("cannot end transaction twice")
206 }
207 atomic.StoreInt32(&t.closed, 1)
208 }
209
210 func (t *txnTaskQueueData) IsBroken() error {
211 // Slightly different from the SDK... datastore and taskqueue each imple ment
212 // this here, where in the SDK only datastore.transaction.Call does.
213 if atomic.LoadInt32(&t.closed) == 1 {
214 return fmt.Errorf("taskqueue: transaction context has expired")
215 }
216 return t.parent.IsBroken()
217 }
218
219 func (t *txnTaskQueueData) ResetTasks() {
220 t.Lock()
221 defer t.Unlock()
222
223 for queuename := range t.anony {
224 t.anony[queuename] = nil
225 }
226 t.parent.resetTasksWithLock()
227 }
228
229 func (t *txnTaskQueueData) Lock() {
230 t.lock.Lock()
231 t.parent.Lock()
232 }
233 func (t *txnTaskQueueData) Unlock() {
234 t.parent.Unlock()
235 t.lock.Unlock()
236 }
237
238 func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
239 t.Lock()
240 defer t.Unlock()
241
242 ret := make(wrapper.AnonymousQueueData, len(t.anony))
243 for k, vs := range t.anony {
244 ret[k] = make([]*taskqueue.Task, len(vs))
245 for i, v := range vs {
246 tsk := dupTask(v)
247 tsk.Name = ""
248 ret[k][i] = tsk
249 }
250 }
251
252 return ret
253 }
254
255 func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData {
256 return t.parent.GetTombstonedTasks()
257 }
258
259 func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData {
260 return t.parent.GetScheduledTasks()
261 }
262
263 func (t *txnTaskQueueData) CreateQueue(queueName string) {
264 t.parent.CreateQueue(queueName)
265 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698