Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(120)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/taskqueue_data.go

Issue 1152383003: Simple memory testing for gae/wrapper (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@better_context_lite
Patch Set: add go-slab dependency Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "errors"
9 "fmt"
10 "infra/gae/libs/wrapper"
11 "math/rand"
12 "net/http"
13 "sync"
14 "sync/atomic"
15 "time"
16
17 "appengine/datastore"
18 "appengine/taskqueue"
19 pb "appengine_internal/taskqueue"
20 )
21
22 var (
23 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac e")
24 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac e")
25 )
26
27 //////////////////////////////// taskQueueData /////////////////////////////////
28
29 type taskQueueData struct {
30 sync.Mutex
31 wrapper.BrokenFeatures
32
33 named wrapper.QueueData
34 archived wrapper.QueueData
35 }
36
37 var (
38 _ = memContextObj((*taskQueueData)(nil))
39 _ = wrapper.TQTestable((*taskQueueData)(nil))
40 )
41
42 func newTaskQueueData() memContextObj {
43 return &taskQueueData{
44 BrokenFeatures: wrapper.BrokenFeatures{
45 DefaultError: newTQError(pb.TaskQueueServiceError_TRANSI ENT_ERROR)},
46 named: wrapper.QueueData{"default": {}},
47 archived: wrapper.QueueData{"default": {}},
48 }
49 }
50
51 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true }
52 func (t *taskQueueData) endTxn() {}
53 func (t *taskQueueData) applyTxn(rnd *rand.Rand, obj memContextObj) {
54 txn := obj.(*txnTaskQueueData)
55 for qn, tasks := range txn.anony {
56 for _, tsk := range tasks {
57 tsk.Name = mkName(rnd, tsk.Name, t.named[qn])
58 t.named[qn][tsk.Name] = tsk
59 }
60 }
61 txn.anony = nil
62 }
63 func (t *taskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, err or) {
64 return &txnTaskQueueData{
65 BrokenFeatures: &t.BrokenFeatures,
66 parent: t,
67 anony: wrapper.AnonymousQueueData{},
68 }, nil
69 }
70
71 func (t *taskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
72 return nil
73 }
74
75 func (t *taskQueueData) CreateQueue(queueName string) {
76 t.Lock()
77 defer t.Unlock()
78
79 if _, ok := t.named[queueName]; ok {
80 panic(fmt.Errorf("memory/taskqueue: cannot add the same queue tw ice! %q", queueName))
81 }
82 t.named[queueName] = map[string]*taskqueue.Task{}
83 t.archived[queueName] = map[string]*taskqueue.Task{}
84 }
85
86 func (t *taskQueueData) GetScheduledTasks() wrapper.QueueData {
87 t.Lock()
88 defer t.Unlock()
89
90 return dupQueue(t.named)
91 }
92
93 func (t *taskQueueData) GetTombstonedTasks() wrapper.QueueData {
94 t.Lock()
95 defer t.Unlock()
96
97 return dupQueue(t.archived)
98 }
99
100 func (t *taskQueueData) resetTasksWithLock() {
101 for queuename := range t.named {
102 t.named[queuename] = map[string]*taskqueue.Task{}
103 t.archived[queuename] = map[string]*taskqueue.Task{}
104 }
105 }
106
107 func (t *taskQueueData) ResetTasks() {
108 t.Lock()
109 defer t.Unlock()
110
111 t.resetTasksWithLock()
112 }
113
114 func (t *taskQueueData) getQueueName(queueName string) (string, error) {
115 if queueName == "" {
116 queueName = "default"
117 }
118 if _, ok := t.named[queueName]; !ok {
119 return "", newTQError(pb.TaskQueueServiceError_UNKNOWN_QUEUE)
120 }
121 return queueName, nil
122 }
123
124 func (t *taskQueueData) prepTask(ns string, task *taskqueue.Task, queueName stri ng, now time.Time, rnd *rand.Rand) (*taskqueue.Task, string, error) {
125 queueName, err := t.getQueueName(queueName)
126 if err != nil {
127 return nil, "", err
128 }
129
130 toSched := dupTask(task)
131
132 if toSched.Path == "" {
133 return nil, "", newTQError(pb.TaskQueueServiceError_INVALID_URL)
134 }
135
136 if toSched.ETA.IsZero() {
137 toSched.ETA = now.Add(toSched.Delay)
138 } else if toSched.Delay != 0 {
139 panic("taskqueue: both Delay and ETA are set")
140 }
141 toSched.Delay = 0
142
143 if toSched.Method == "" {
144 toSched.Method = "POST"
145 }
146 if _, ok := pb.TaskQueueAddRequest_RequestMethod_value[toSched.Method]; !ok {
147 return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M ethod)
148 }
149 if toSched.Method != "POST" && toSched.Method != "PUT" {
150 toSched.Payload = nil
151 }
152
153 if _, ok := toSched.Header[currentNamespace]; !ok {
154 if ns != "" {
155 if toSched.Header == nil {
156 toSched.Header = http.Header{}
157 }
158 toSched.Header[currentNamespace] = []string{ns}
159 }
160 }
161 // TODO(riannucci): implement DefaultNamespace
162
163 if toSched.Name == "" {
164 toSched.Name = mkName(rnd, "", t.named[queueName])
165 } else {
166 if !validTaskName.MatchString(toSched.Name) {
167 return nil, "", newTQError(pb.TaskQueueServiceError_INVA LID_TASK_NAME)
168 }
169 }
170
171 return toSched, queueName, nil
172 }
173
174 /////////////////////////////// txnTaskQueueData ///////////////////////////////
175
176 type txnTaskQueueData struct {
177 *wrapper.BrokenFeatures
178
179 lock sync.Mutex
180
181 // boolean 0 or 1, use atomic.*Int32 to access.
182 closed int32
183 anony wrapper.AnonymousQueueData
184 parent *taskQueueData
185 }
186
187 var (
188 _ = memContextObj((*txnTaskQueueData)(nil))
189 _ = wrapper.TQTestable((*txnTaskQueueData)(nil))
190 )
191
192 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
193
194 func (t *txnTaskQueueData) applyTxn(*rand.Rand, memContextObj) {
195 panic(errors.New("txnTaskQueueData.applyTxn is not implemented"))
196 }
197
198 func (t *txnTaskQueueData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) {
199 return nil, errors.New("txnTaskQueueData.mkTxn is not implemented")
200 }
201
202 func (t *txnTaskQueueData) endTxn() {
203 if atomic.LoadInt32(&t.closed) == 1 {
204 panic("cannot end transaction twice")
205 }
206 atomic.StoreInt32(&t.closed, 1)
207 }
208
209 func (t *txnTaskQueueData) IsBroken() error {
210 // Slightly different from the SDK... datastore and taskqueue each imple ment
211 // this here, where in the SDK only datastore.transaction.Call does.
212 if atomic.LoadInt32(&t.closed) == 1 {
213 return fmt.Errorf("taskqueue: transaction context has expired")
214 }
215 return t.parent.IsBroken()
216 }
217
218 func (t *txnTaskQueueData) ResetTasks() {
219 t.Lock()
220 defer t.Unlock()
221
222 for queuename := range t.anony {
223 t.anony[queuename] = nil
224 }
225 t.parent.resetTasksWithLock()
226 }
227
228 func (t *txnTaskQueueData) Lock() {
229 t.lock.Lock()
230 t.parent.Lock()
231 }
232 func (t *txnTaskQueueData) Unlock() {
233 t.parent.Unlock()
234 t.lock.Unlock()
235 }
236
237 func (t *txnTaskQueueData) GetTransactionTasks() wrapper.AnonymousQueueData {
238 t.Lock()
239 defer t.Unlock()
240
241 ret := make(wrapper.AnonymousQueueData, len(t.anony))
242 for k, vs := range t.anony {
243 ret[k] = make([]*taskqueue.Task, len(vs))
244 for i, v := range vs {
245 tsk := dupTask(v)
246 tsk.Name = ""
247 ret[k][i] = tsk
248 }
249 }
250
251 return ret
252 }
253
254 func (t *txnTaskQueueData) GetTombstonedTasks() wrapper.QueueData {
255 return t.parent.GetTombstonedTasks()
256 }
257
258 func (t *txnTaskQueueData) GetScheduledTasks() wrapper.QueueData {
259 return t.parent.GetScheduledTasks()
260 }
261
262 func (t *txnTaskQueueData) CreateQueue(queueName string) {
263 t.parent.CreateQueue(queueName)
264 }
OLDNEW
« no previous file with comments | « go/src/infra/gae/libs/wrapper/memory/taskqueue.go ('k') | go/src/infra/gae/libs/wrapper/memory/taskqueue_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698