Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(172)

Side by Side Diff: memory/taskqueue_data.go

Issue 1243323002: Refactor a bit. (Closed) Base URL: https://github.com/luci/gae.git@master
Patch Set: fix golint Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « memory/taskqueue.go ('k') | memory/taskqueue_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "errors"
9 "fmt"
10 "net/http"
11 "sync"
12 "sync/atomic"
13
14 "golang.org/x/net/context"
15
16 "github.com/luci/gae"
17 "github.com/luci/luci-go/common/clock"
18 )
19
20 var (
21 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac e")
22 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac e")
23 )
24
25 //////////////////////////////// taskQueueData /////////////////////////////////
26
27 type taskQueueData struct {
28 sync.Mutex
29
30 named gae.QueueData
31 archived gae.QueueData
32 }
33
34 var (
35 _ = memContextObj((*taskQueueData)(nil))
36 _ = gae.TQTestable((*taskQueueData)(nil))
37 )
38
39 func newTaskQueueData() memContextObj {
40 return &taskQueueData{
41 named: gae.QueueData{"default": {}},
42 archived: gae.QueueData{"default": {}},
43 }
44 }
45
46 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true }
47 func (t *taskQueueData) endTxn() {}
48 func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) {
49 txn := obj.(*txnTaskQueueData)
50 for qn, tasks := range txn.anony {
51 for _, tsk := range tasks {
52 tsk.Name = mkName(c, tsk.Name, t.named[qn])
53 t.named[qn][tsk.Name] = tsk
54 }
55 }
56 txn.anony = nil
57 }
58 func (t *taskQueueData) mkTxn(*gae.DSTransactionOptions) memContextObj {
59 return &txnTaskQueueData{
60 parent: t,
61 anony: gae.AnonymousQueueData{},
62 }
63 }
64
65 func (t *taskQueueData) GetTransactionTasks() gae.AnonymousQueueData {
66 return nil
67 }
68
69 func (t *taskQueueData) CreateQueue(queueName string) {
70 t.Lock()
71 defer t.Unlock()
72
73 if _, ok := t.named[queueName]; ok {
74 panic(fmt.Errorf("memory/taskqueue: cannot add the same queue tw ice! %q", queueName))
75 }
76 t.named[queueName] = map[string]*gae.TQTask{}
77 t.archived[queueName] = map[string]*gae.TQTask{}
78 }
79
80 func (t *taskQueueData) GetScheduledTasks() gae.QueueData {
81 t.Lock()
82 defer t.Unlock()
83
84 return dupQueue(t.named)
85 }
86
87 func (t *taskQueueData) GetTombstonedTasks() gae.QueueData {
88 t.Lock()
89 defer t.Unlock()
90
91 return dupQueue(t.archived)
92 }
93
94 func (t *taskQueueData) resetTasksWithLock() {
95 for queueName := range t.named {
96 t.named[queueName] = map[string]*gae.TQTask{}
97 t.archived[queueName] = map[string]*gae.TQTask{}
98 }
99 }
100
101 func (t *taskQueueData) ResetTasks() {
102 t.Lock()
103 defer t.Unlock()
104
105 t.resetTasksWithLock()
106 }
107
108 func (t *taskQueueData) getQueueName(queueName string) (string, error) {
109 if queueName == "" {
110 queueName = "default"
111 }
112 if _, ok := t.named[queueName]; !ok {
113 return "", errors.New("UNKNOWN_QUEUE")
114 }
115 return queueName, nil
116 }
117
118 var tqOkMethods = map[string]struct{}{
119 "GET": {},
120 "POST": {},
121 "HEAD": {},
122 "PUT": {},
123 "DELETE": {},
124 }
125
126 func (t *taskQueueData) prepTask(c context.Context, ns string, task *gae.TQTask, queueName string) (*gae.TQTask, string, error) {
127 queueName, err := t.getQueueName(queueName)
128 if err != nil {
129 return nil, "", err
130 }
131
132 toSched := dupTask(task)
133
134 if toSched.Path == "" {
135 toSched.Path = "/_ah/queue/" + queueName
136 }
137
138 if toSched.ETA.IsZero() {
139 toSched.ETA = clock.Now(c).Add(toSched.Delay)
140 } else if toSched.Delay != 0 {
141 panic("taskqueue: both Delay and ETA are set")
142 }
143 toSched.Delay = 0
144
145 if toSched.Method == "" {
146 toSched.Method = "POST"
147 }
148 if _, ok := tqOkMethods[toSched.Method]; !ok {
149 return nil, "", fmt.Errorf("taskqueue: bad method %q", toSched.M ethod)
150 }
151 if toSched.Method != "POST" && toSched.Method != "PUT" {
152 toSched.Payload = nil
153 }
154
155 if _, ok := toSched.Header[currentNamespace]; !ok {
156 if ns != "" {
157 if toSched.Header == nil {
158 toSched.Header = http.Header{}
159 }
160 toSched.Header[currentNamespace] = []string{ns}
161 }
162 }
163 // TODO(riannucci): implement DefaultNamespace
164
165 if toSched.Name == "" {
166 toSched.Name = mkName(c, "", t.named[queueName])
167 } else {
168 if !validTaskName.MatchString(toSched.Name) {
169 return nil, "", errors.New("INVALID_TASK_NAME")
170 }
171 }
172
173 return toSched, queueName, nil
174 }
175
176 /////////////////////////////// txnTaskQueueData ///////////////////////////////
177
178 type txnTaskQueueData struct {
179 lock sync.Mutex
180
181 // boolean 0 or 1, use atomic.*Int32 to access.
182 closed int32
183 anony gae.AnonymousQueueData
184 parent *taskQueueData
185 }
186
187 var (
188 _ = memContextObj((*txnTaskQueueData)(nil))
189 _ = gae.TQTestable((*txnTaskQueueData)(nil))
190 )
191
192 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { retu rn false }
193 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { pani c("impossible") }
194 func (t *txnTaskQueueData) mkTxn(*gae.DSTransactionOptions) memContextObj { pani c("impossible") }
195
196 func (t *txnTaskQueueData) endTxn() {
197 if atomic.LoadInt32(&t.closed) == 1 {
198 panic("cannot end transaction twice")
199 }
200 atomic.StoreInt32(&t.closed, 1)
201 }
202
203 func (t *txnTaskQueueData) run(f func() error) error {
204 // Slightly different from the SDK... datastore and taskqueue each imple ment
205 // this here, where in the SDK only datastore.transaction.Call does.
206 if atomic.LoadInt32(&t.closed) == 1 {
207 return fmt.Errorf("taskqueue: transaction context has expired")
208 }
209 return f()
210 }
211
212 func (t *txnTaskQueueData) ResetTasks() {
213 t.Lock()
214 defer t.Unlock()
215
216 for queuename := range t.anony {
217 t.anony[queuename] = nil
218 }
219 t.parent.resetTasksWithLock()
220 }
221
222 func (t *txnTaskQueueData) Lock() {
223 t.lock.Lock()
224 t.parent.Lock()
225 }
226 func (t *txnTaskQueueData) Unlock() {
227 t.parent.Unlock()
228 t.lock.Unlock()
229 }
230
231 func (t *txnTaskQueueData) GetTransactionTasks() gae.AnonymousQueueData {
232 t.Lock()
233 defer t.Unlock()
234
235 ret := make(gae.AnonymousQueueData, len(t.anony))
236 for k, vs := range t.anony {
237 ret[k] = make([]*gae.TQTask, len(vs))
238 for i, v := range vs {
239 tsk := dupTask(v)
240 tsk.Name = ""
241 ret[k][i] = tsk
242 }
243 }
244
245 return ret
246 }
247
248 func (t *txnTaskQueueData) GetTombstonedTasks() gae.QueueData {
249 return t.parent.GetTombstonedTasks()
250 }
251
252 func (t *txnTaskQueueData) GetScheduledTasks() gae.QueueData {
253 return t.parent.GetScheduledTasks()
254 }
255
256 func (t *txnTaskQueueData) CreateQueue(queueName string) {
257 t.parent.CreateQueue(queueName)
258 }
OLDNEW
« no previous file with comments | « memory/taskqueue.go ('k') | memory/taskqueue_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698