Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(315)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go

Issue 1152383003: Simple memory testing for gae/wrapper (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@better_context_lite
Patch Set: fixes Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "errors"
9 "fmt"
10 "math/rand"
11 "net/http"
12 "sync"
13 "sync/atomic"
14 "time"
15
16 "appengine/datastore"
17 "appengine/taskqueue"
18 pb "appengine_internal/taskqueue"
19
20 "infra/gae/libs/wrapper"
21 )
22
23 var (
24 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac e")
25 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac e")
26 )
27
28 //////////////////////////////// taskQueueData /////////////////////////////////
29
30 type taskQueueData struct {
31 sync.Mutex
32 *wrapper.BrokenFeatures
33
34 named wrapper.QueueData
35 archived wrapper.QueueData
36 }
37
38 ////////////////////////////// New(taskQueueData) //////////////////////////////
39
40 func newTaskQueueData() memContextObj {
41 return &taskQueueData{
42 BrokenFeatures: wrapper.NewBrokenFeatures(
43 newTQError(pb.TaskQueueServiceError_TRANSIENT_ERROR)),
44 named: wrapper.QueueData{"default": {}},
45 archived: wrapper.QueueData{"default": {}},
46 }
47 }
48
49 ///////////////////////// memContextObj(taskQueueData) /////////////////////////
50
51 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true }
52 func (t *taskQueueData) endTxn() {}
53 func (t *taskQueueData) applyTxn(rnd *rand.Rand, obj memContextObj) {
54 txn := obj.(*txnTaskQueueData)
55 for qn, tasks := range txn.anony {
56 for _, tsk := range tasks {
57 tsk.Name = mkName(rnd, tsk.Name, t.named[qn])
58 t.named[qn][tsk.Name] = tsk
59 }
60 }
61 txn.anony = nil
62 }
63 func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, err or) {
64 return &txnTaskQueueData{
65 BrokenFeatures: t.BrokenFeatures,
66 parent: t,
67 anony: wrapper.AnonymousQueueData{},
68 }, nil
69 }
70
71 ////////////////////// wrapper.TQTestable(taskQueueData) ///////////////////////
72
73 func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
74 return nil
75 }
76
77 func (t *taskQueueData) AddQueue(queueName string) {
78 t.Lock()
79 defer t.Unlock()
80
81 if _, ok := t.named[queueName]; ok {
82 panic(fmt.Errorf("memory/taskqueue: cannot add the same queue tw ice! %q", queueName))
83 }
84 t.named[queueName] = map[string]*taskqueue.Task{}
85 t.archived[queueName] = map[string]*taskqueue.Task{}
86 }
87
88 func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData {
89 t.Lock()
90 defer t.Unlock()
91
92 return dupQueue(t.named)
93 }
94
95 func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData {
96 t.Lock()
97 defer t.Unlock()
98
99 return dupQueue(t.archived)
100 }
101
102 func (t *taskQueueData) resetTasksWithLock() {
103 for queuename := range t.named {
104 t.named[queuename] = map[string]*taskqueue.Task{}
105 t.archived[queuename] = map[string]*taskqueue.Task{}
106 }
107 }
108
109 func (t *taskQueueData) ResetTasks() {
110 t.Lock()
111 defer t.Unlock()
112
113 t.resetTasksWithLock()
114 }
115
116 /////////////////////////// taskQueueData (private) ////////////////////////////
117
118 func (t *taskQueueData) getQueueName(queueName string) (string, error) {
119 if queueName == "" {
120 queueName = "default"
121 }
122 if _, ok := t.named[queueName]; !ok {
123 return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE)
124 }
125 return queueName, nil
126 }
127
128 func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName stri ng, now time.Time, rnd *rand.Rand) (*taskqueue.Task, string, error) {
129 queueName, err := t.getQueueName(queueName)
130 if err != nil {
131 return nil, "", err
132 }
133
134 toSched := dupTask(task)
135
136 if toSched.Path == "" {
137 return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL)
138 }
139
140 if toSched.ETA.IsZero() {
141 toSched.ETA = now.Add(toSched.Delay)
142 } else if toSched.Delay != 0 {
143 panic("taskqueue: both Delay and ETA are set")
144 }
145 toSched.Delay = 0
146
147 if toSched.Method == "" {
148 toSched.Method = "POST"
149 }
150 if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok {
151 return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M ethod)
152 }
153 if toSched.Method != "POST" && toSched.Method != "PUT" {
154 toSched.Payload = nil
155 }
156
157 if _, ok := toSched.Header[currentNamespace]; !ok {
158 if ns != "" {
159 if toSched.Header == nil {
160 toSched.Header = http.Header{}
161 }
162 toSched.Header[currentNamespace] = []string{ns}
163 }
164 }
165 // TODO(riannucci): implement DefaultNamespace
166
167 if toSched.Name == "" {
168 toSched.Name = mkName(rnd, "", t.named[queueName])
169 } else {
170 if !validTaskName.MatchString(toSched.Name) {
171 return nil, "", newTQError(pb.TaskQueueServiceError_INVA LID_TASK_NAME)
172 }
173 }
174
175 return toSched, queueName, nil
176 }
177
178 /////////////////////////////// txnTaskQueueData ///////////////////////////////
179
180 type txnTaskQueueData struct {
181 *wrapper.BrokenFeatures
182
183 lock sync.Mutex
184
185 // boolean 0 or 1, use atomic.*Int32 to access.
186 closed int32
187 anony wrapper.AnonymousQueueData
188 parent *taskQueueData
189 }
190
191 /////////////////////// memContextObj(txnTaskQueueData) ////////////////////////
192
193 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
194
195 func (t *txnTaskQueueData) applyTxn(*rand.Rand, memContextObj) {
196 panic(errors.New("txnTaskQueueData.applyTxn is not implemented"))
197 }
198
199 func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
200 return nil, errors.New("txnTaskQueueData.mkTxn is not implemented")
201 }
202
203 func (t *txnTaskQueueData) endTxn() {
204 if atomic.LoadInt32(&t.closed) == 1 {
205 panic("cannot end transaction twice")
206 }
207 atomic.StoreInt32(&t.closed, 1)
208 }
209
210 /////////////////// wrapper.BrokenFeatures(txnTaskQueueData) ///////////////////
211
212 func (t *txnTaskQueueData) IsBroken() error {
213 // Slightly different from the SDK... datastore and taskqueue each imple ment
214 // this here, where in the SDK only datastore.transaction.Call does.
215 if atomic.LoadInt32(&t.closed) == 1 {
216 return fmt.Errorf("taskqueue: transaction context has expired")
217 }
218 return t.parent.IsBroken()
219 }
220
221 ///////////////////// wrapper.TQTestable(txnTaskQueueData) /////////////////////
222
223 func (t *txnTaskQueueData) ResetTasks() {
224 t.Lock()
225 defer t.Unlock()
226
227 for queuename := range t.anony {
228 t.anony[queuename] = []*taskqueue.Task{}
M-A Ruel 2015/05/25 18:21:10 you could set it to nil and it'd be fine
iannucci 2015/05/27 19:33:32 done
229 }
230 t.parent.resetTasksWithLock()
231 }
232
233 func (t *txnTaskQueueData) Lock() {
234 t.lock.Lock()
235 t.parent.Lock()
236 }
237 func (t *txnTaskQueueData) Unlock() {
238 t.parent.Unlock()
239 t.lock.Unlock()
240 }
241
242 func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
243 t.Lock()
244 defer t.Unlock()
245
246 ret := make(wrapper.AnonymousQueueData, len(t.anony))
247 for k, vs := range t.anony {
248 ret[k] = make([]*taskqueue.Task, len(vs))
249 for i, v := range vs {
250 tsk := dupTask(v)
251 tsk.Name = ""
252 ret[k][i] = tsk
253 }
254 }
255
256 return ret
257 }
258
259 func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData {
260 return t.parent.GetTombstonedTasks()
261 }
262
263 func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData {
264 return t.parent.GetScheduledTasks()
265 }
266
267 func (t *txnTaskQueueData) AddQueue(queueName string) {
268 t.parent.AddQueue(queueName)
269 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698