Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(14)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go

Issue 1152383003: Simple memory testing for gae/wrapper (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@better_context_lite
Patch Set: fix capitalization Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "errors"
9 "fmt"
10 "infra/gae/libs/wrapper"
11 "math/rand"
12 "net/http"
13 "sync"
14 "sync/atomic"
15 "time"
16
17 "appengine/datastore"
18 "appengine/taskqueue"
19 pb "appengine_internal/taskqueue"
20 )
21
22 var (
23 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac e")
24 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac e")
25 )
26
27 //////////////////////////////// taskQueueData /////////////////////////////////
28
29 type taskQueueData struct {
30 sync.Mutex
31 *wrapper.BrokenFeatures
M-A Ruel 2015/05/27 20:14:48 Doesn't need to be a pointer.
iannucci 2015/05/27 21:36:22 done
32
33 named wrapper.QueueData
34 archived wrapper.QueueData
35 }
36
37 ////////////////////////////// New(taskQueueData) //////////////////////////////
38
39 func newTaskQueueData() memContextObj {
40 return &taskQueueData{
41 BrokenFeatures: &wrapper.BrokenFeatures{
42 DefaultError: newTQError(pb.TaskQueueServiceError_TRANSI ENT_ERROR)},
43 named: wrapper.QueueData{"default": {}},
44 archived: wrapper.QueueData{"default": {}},
45 }
46 }
47
48 ///////////////////////// memContextObj(taskQueueData) /////////////////////////
49
50 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true }
51 func (t *taskQueueData) endTxn() {}
52 func (t *taskQueueData) applyTxn(rnd *rand.Rand, obj memContextObj) {
53 txn := obj.(*txnTaskQueueData)
54 for qn, tasks := range txn.anony {
55 for _, tsk := range tasks {
56 tsk.Name = mkName(rnd, tsk.Name, t.named[qn])
57 t.named[qn][tsk.Name] = tsk
58 }
59 }
60 txn.anony = nil
61 }
62 func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, err or) {
63 return &txnTaskQueueData{
64 BrokenFeatures: t.BrokenFeatures,
65 parent: t,
66 anony: wrapper.AnonymousQueueData{},
67 }, nil
68 }
69
70 ////////////////////// wrapper.TQTestable(taskQueueData) ///////////////////////
71
72 func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
73 return nil
74 }
75
76 func (t *taskQueueData) CreateQueue(queueName string) {
77 t.Lock()
78 defer t.Unlock()
79
80 if _, ok := t.named[queueName]; ok {
81 panic(fmt.Errorf("memory/taskqueue: cannot add the same queue tw ice! %q", queueName))
82 }
83 t.named[queueName] = map[string]*taskqueue.Task{}
84 t.archived[queueName] = map[string]*taskqueue.Task{}
85 }
86
87 func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData {
88 t.Lock()
89 defer t.Unlock()
90
91 return dupQueue(t.named)
92 }
93
94 func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData {
95 t.Lock()
96 defer t.Unlock()
97
98 return dupQueue(t.archived)
99 }
100
101 func (t *taskQueueData) resetTasksWithLock() {
102 for queuename := range t.named {
103 t.named[queuename] = map[string]*taskqueue.Task{}
104 t.archived[queuename] = map[string]*taskqueue.Task{}
105 }
106 }
107
108 func (t *taskQueueData) ResetTasks() {
109 t.Lock()
110 defer t.Unlock()
111
112 t.resetTasksWithLock()
113 }
114
115 /////////////////////////// taskQueueData (private) ////////////////////////////
116
117 func (t *taskQueueData) getQueueName(queueName string) (string, error) {
118 if queueName == "" {
119 queueName = "default"
120 }
121 if _, ok := t.named[queueName]; !ok {
122 return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE)
123 }
124 return queueName, nil
125 }
126
127 func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName stri ng, now time.Time, rnd *rand.Rand) (*taskqueue.Task, string, error) {
128 queueName, err := t.getQueueName(queueName)
129 if err != nil {
130 return nil, "", err
131 }
132
133 toSched := dupTask(task)
134
135 if toSched.Path == "" {
136 return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL)
137 }
138
139 if toSched.ETA.IsZero() {
140 toSched.ETA = now.Add(toSched.Delay)
141 } else if toSched.Delay != 0 {
142 panic("taskqueue: both Delay and ETA are set")
143 }
144 toSched.Delay = 0
145
146 if toSched.Method == "" {
147 toSched.Method = "POST"
148 }
149 if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok {
150 return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M ethod)
151 }
152 if toSched.Method != "POST" && toSched.Method != "PUT" {
153 toSched.Payload = nil
154 }
155
156 if _, ok := toSched.Header[currentNamespace]; !ok {
157 if ns != "" {
158 if toSched.Header == nil {
159 toSched.Header = http.Header{}
160 }
161 toSched.Header[currentNamespace] = []string{ns}
162 }
163 }
164 // TODO(riannucci): implement DefaultNamespace
165
166 if toSched.Name == "" {
167 toSched.Name = mkName(rnd, "", t.named[queueName])
168 } else {
169 if !validTaskName.MatchString(toSched.Name) {
170 return nil, "", newTQError(pb.TaskQueueServiceError_INVA LID_TASK_NAME)
171 }
172 }
173
174 return toSched, queueName, nil
175 }
176
177 /////////////////////////////// txnTaskQueueData ///////////////////////////////
178
179 type txnTaskQueueData struct {
180 *wrapper.BrokenFeatures
181
182 lock sync.Mutex
183
184 // boolean 0 or 1, use atomic.*Int32 to access.
185 closed int32
186 anony wrapper.AnonymousQueueData
187 parent *taskQueueData
188 }
189
190 /////////////////////// memContextObj(txnTaskQueueData) ////////////////////////
191
192 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
193
194 func (t *txnTaskQueueData) applyTxn(*rand.Rand, memContextObj) {
195 panic(errors.New("txnTaskQueueData.applyTxn is not implemented"))
196 }
197
198 func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
199 return nil, errors.New("txnTaskQueueData.mkTxn is not implemented")
200 }
201
202 func (t *txnTaskQueueData) endTxn() {
203 if atomic.LoadInt32(&t.closed) == 1 {
204 panic("cannot end transaction twice")
205 }
206 atomic.StoreInt32(&t.closed, 1)
207 }
208
209 /////////////////// wrapper.BrokenFeatures(txnTaskQueueData) ///////////////////
210
211 func (t *txnTaskQueueData) IsBroken() error {
212 // Slightly different from the SDK... datastore and taskqueue each imple ment
213 // this here, where in the SDK only datastore.transaction.Call does.
214 if atomic.LoadInt32(&t.closed) == 1 {
215 return fmt.Errorf("taskqueue: transaction context has expired")
216 }
217 return t.parent.IsBroken()
218 }
219
220 ///////////////////// wrapper.TQTestable(txnTaskQueueData) /////////////////////
221
222 func (t *txnTaskQueueData) ResetTasks() {
223 t.Lock()
224 defer t.Unlock()
225
226 for queuename := range t.anony {
227 t.anony[queuename] = nil
228 }
229 t.parent.resetTasksWithLock()
230 }
231
232 func (t *txnTaskQueueData) Lock() {
233 t.lock.Lock()
234 t.parent.Lock()
235 }
236 func (t *txnTaskQueueData) Unlock() {
237 t.parent.Unlock()
238 t.lock.Unlock()
239 }
240
241 func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
242 t.Lock()
243 defer t.Unlock()
244
245 ret := make(wrapper.AnonymousQueueData, len(t.anony))
246 for k, vs := range t.anony {
247 ret[k] = make([]*taskqueue.Task, len(vs))
248 for i, v := range vs {
249 tsk := dupTask(v)
250 tsk.Name = ""
251 ret[k][i] = tsk
252 }
253 }
254
255 return ret
256 }
257
258 func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData {
259 return t.parent.GetTombstonedTasks()
260 }
261
262 func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData {
263 return t.parent.GetScheduledTasks()
264 }
265
266 func (t *txnTaskQueueData) CreateQueue(queueName string) {
267 t.parent.CreateQueue(queueName)
268 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698