Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(254)

Side by Side Diff: impl/memory/taskqueue_data.go

Issue 1243323002: Refactor a bit. (Closed) Base URL: https://github.com/luci/gae.git@master
Patch Set: fix golint Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/taskqueue.go ('k') | impl/memory/taskqueue_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "fmt" 9 "fmt"
10 "net/http" 10 "net/http"
11 "sync" 11 "sync"
12 "sync/atomic" 12 "sync/atomic"
13 13
14 "golang.org/x/net/context" 14 "golang.org/x/net/context"
15 15
16 » "github.com/luci/gae" 16 » rds "github.com/luci/gae/service/rawdatastore"
17 » tq "github.com/luci/gae/service/taskqueue"
17 "github.com/luci/luci-go/common/clock" 18 "github.com/luci/luci-go/common/clock"
18 ) 19 )
19 20
20 var ( 21 var (
21 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac e") 22 currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespac e")
22 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac e") 23 defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespac e")
23 ) 24 )
24 25
25 //////////////////////////////// taskQueueData ///////////////////////////////// 26 //////////////////////////////// taskQueueData /////////////////////////////////
26 27
27 type taskQueueData struct { 28 type taskQueueData struct {
28 sync.Mutex 29 sync.Mutex
29 30
30 » named gae.QueueData 31 » named tq.QueueData
31 » archived gae.QueueData 32 » archived tq.QueueData
32 } 33 }
33 34
34 var ( 35 var (
35 _ = memContextObj((*taskQueueData)(nil)) 36 _ = memContextObj((*taskQueueData)(nil))
36 » _ = gae.TQTestable((*taskQueueData)(nil)) 37 » _ = tq.Testable((*taskQueueData)(nil))
37 ) 38 )
38 39
39 func newTaskQueueData() memContextObj { 40 func newTaskQueueData() memContextObj {
40 return &taskQueueData{ 41 return &taskQueueData{
41 » » named: gae.QueueData{"default": {}}, 42 » » named: tq.QueueData{"default": {}},
42 » » archived: gae.QueueData{"default": {}}, 43 » » archived: tq.QueueData{"default": {}},
43 } 44 }
44 } 45 }
45 46
46 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true } 47 func (t *taskQueueData) canApplyTxn(obj memContextObj) bool { return true }
47 func (t *taskQueueData) endTxn() {} 48 func (t *taskQueueData) endTxn() {}
48 func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) { 49 func (t *taskQueueData) applyTxn(c context.Context, obj memContextObj) {
49 txn := obj.(*txnTaskQueueData) 50 txn := obj.(*txnTaskQueueData)
50 for qn, tasks := range txn.anony { 51 for qn, tasks := range txn.anony {
51 for _, tsk := range tasks { 52 for _, tsk := range tasks {
52 tsk.Name = mkName(c, tsk.Name, t.named[qn]) 53 tsk.Name = mkName(c, tsk.Name, t.named[qn])
53 t.named[qn][tsk.Name] = tsk 54 t.named[qn][tsk.Name] = tsk
54 } 55 }
55 } 56 }
56 txn.anony = nil 57 txn.anony = nil
57 } 58 }
58 func (t *taskQueueData) mkTxn(*gae.DSTransactionOptions) memContextObj { 59 func (t *taskQueueData) mkTxn(*rds.TransactionOptions) memContextObj {
59 return &txnTaskQueueData{ 60 return &txnTaskQueueData{
60 parent: t, 61 parent: t,
61 » » anony: gae.AnonymousQueueData{}, 62 » » anony: tq.AnonymousQueueData{},
62 } 63 }
63 } 64 }
64 65
65 func (t *taskQueueData) GetTransactionTasks() gae.AnonymousQueueData { 66 func (t *taskQueueData) GetTransactionTasks() tq.AnonymousQueueData {
66 return nil 67 return nil
67 } 68 }
68 69
69 func (t *taskQueueData) CreateQueue(queueName string) { 70 func (t *taskQueueData) CreateQueue(queueName string) {
70 t.Lock() 71 t.Lock()
71 defer t.Unlock() 72 defer t.Unlock()
72 73
73 if _, ok := t.named[queueName]; ok { 74 if _, ok := t.named[queueName]; ok {
74 panic(fmt.Errorf("memory/taskqueue: cannot add the same queue tw ice! %q", queueName)) 75 panic(fmt.Errorf("memory/taskqueue: cannot add the same queue tw ice! %q", queueName))
75 } 76 }
76 » t.named[queueName] = map[string]*gae.TQTask{} 77 » t.named[queueName] = map[string]*tq.Task{}
77 » t.archived[queueName] = map[string]*gae.TQTask{} 78 » t.archived[queueName] = map[string]*tq.Task{}
78 } 79 }
79 80
80 func (t *taskQueueData) GetScheduledTasks() gae.QueueData { 81 func (t *taskQueueData) GetScheduledTasks() tq.QueueData {
81 t.Lock() 82 t.Lock()
82 defer t.Unlock() 83 defer t.Unlock()
83 84
84 return dupQueue(t.named) 85 return dupQueue(t.named)
85 } 86 }
86 87
87 func (t *taskQueueData) GetTombstonedTasks() gae.QueueData { 88 func (t *taskQueueData) GetTombstonedTasks() tq.QueueData {
88 t.Lock() 89 t.Lock()
89 defer t.Unlock() 90 defer t.Unlock()
90 91
91 return dupQueue(t.archived) 92 return dupQueue(t.archived)
92 } 93 }
93 94
94 func (t *taskQueueData) resetTasksWithLock() { 95 func (t *taskQueueData) resetTasksWithLock() {
95 for queueName := range t.named { 96 for queueName := range t.named {
96 » » t.named[queueName] = map[string]*gae.TQTask{} 97 » » t.named[queueName] = map[string]*tq.Task{}
97 » » t.archived[queueName] = map[string]*gae.TQTask{} 98 » » t.archived[queueName] = map[string]*tq.Task{}
98 } 99 }
99 } 100 }
100 101
101 func (t *taskQueueData) ResetTasks() { 102 func (t *taskQueueData) ResetTasks() {
102 t.Lock() 103 t.Lock()
103 defer t.Unlock() 104 defer t.Unlock()
104 105
105 t.resetTasksWithLock() 106 t.resetTasksWithLock()
106 } 107 }
107 108
108 func (t *taskQueueData) getQueueName(queueName string) (string, error) { 109 func (t *taskQueueData) getQueueName(queueName string) (string, error) {
109 if queueName == "" { 110 if queueName == "" {
110 queueName = "default" 111 queueName = "default"
111 } 112 }
112 if _, ok := t.named[queueName]; !ok { 113 if _, ok := t.named[queueName]; !ok {
113 return "", errors.New("UNKNOWN_QUEUE") 114 return "", errors.New("UNKNOWN_QUEUE")
114 } 115 }
115 return queueName, nil 116 return queueName, nil
116 } 117 }
117 118
118 var tqOkMethods = map[string]struct{}{ 119 var tqOkMethods = map[string]struct{}{
119 "GET": {}, 120 "GET": {},
120 "POST": {}, 121 "POST": {},
121 "HEAD": {}, 122 "HEAD": {},
122 "PUT": {}, 123 "PUT": {},
123 "DELETE": {}, 124 "DELETE": {},
124 } 125 }
125 126
126 func (t *taskQueueData) prepTask(c context.Context, ns string, task *gae.TQTask, queueName string) (*gae.TQTask, string, error) { 127 func (t *taskQueueData) prepTask(c context.Context, ns string, task *tq.Task, qu eueName string) (*tq.Task, string, error) {
127 queueName, err := t.getQueueName(queueName) 128 queueName, err := t.getQueueName(queueName)
128 if err != nil { 129 if err != nil {
129 return nil, "", err 130 return nil, "", err
130 } 131 }
131 132
132 toSched := dupTask(task) 133 toSched := dupTask(task)
133 134
134 if toSched.Path == "" { 135 if toSched.Path == "" {
135 toSched.Path = "/_ah/queue/" + queueName 136 toSched.Path = "/_ah/queue/" + queueName
136 } 137 }
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
173 return toSched, queueName, nil 174 return toSched, queueName, nil
174 } 175 }
175 176
176 /////////////////////////////// txnTaskQueueData /////////////////////////////// 177 /////////////////////////////// txnTaskQueueData ///////////////////////////////
177 178
178 type txnTaskQueueData struct { 179 type txnTaskQueueData struct {
179 lock sync.Mutex 180 lock sync.Mutex
180 181
181 // boolean 0 or 1, use atomic.*Int32 to access. 182 // boolean 0 or 1, use atomic.*Int32 to access.
182 closed int32 183 closed int32
183 » anony gae.AnonymousQueueData 184 » anony tq.AnonymousQueueData
184 parent *taskQueueData 185 parent *taskQueueData
185 } 186 }
186 187
187 var ( 188 var (
188 _ = memContextObj((*txnTaskQueueData)(nil)) 189 _ = memContextObj((*txnTaskQueueData)(nil))
189 » _ = gae.TQTestable((*txnTaskQueueData)(nil)) 190 » _ = tq.Testable((*txnTaskQueueData)(nil))
190 ) 191 )
191 192
192 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { retu rn false } 193 func (t *txnTaskQueueData) canApplyTxn(obj memContextObj) bool { return false }
193 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { pani c("impossible") } 194 func (t *txnTaskQueueData) applyTxn(context.Context, memContextObj) { panic( "impossible") }
194 func (t *txnTaskQueueData) mkTxn(*gae.DSTransactionOptions) memContextObj { pani c("impossible") } 195 func (t *txnTaskQueueData) mkTxn(*rds.TransactionOptions) memContextObj { panic( "impossible") }
195 196
196 func (t *txnTaskQueueData) endTxn() { 197 func (t *txnTaskQueueData) endTxn() {
197 if atomic.LoadInt32(&t.closed) == 1 { 198 if atomic.LoadInt32(&t.closed) == 1 {
198 panic("cannot end transaction twice") 199 panic("cannot end transaction twice")
199 } 200 }
200 atomic.StoreInt32(&t.closed, 1) 201 atomic.StoreInt32(&t.closed, 1)
201 } 202 }
202 203
203 func (t *txnTaskQueueData) run(f func() error) error { 204 func (t *txnTaskQueueData) run(f func() error) error {
204 // Slightly different from the SDK... datastore and taskqueue each imple ment 205 // Slightly different from the SDK... datastore and taskqueue each imple ment
(...skipping 16 matching lines...) Expand all
221 222
222 func (t *txnTaskQueueData) Lock() { 223 func (t *txnTaskQueueData) Lock() {
223 t.lock.Lock() 224 t.lock.Lock()
224 t.parent.Lock() 225 t.parent.Lock()
225 } 226 }
226 func (t *txnTaskQueueData) Unlock() { 227 func (t *txnTaskQueueData) Unlock() {
227 t.parent.Unlock() 228 t.parent.Unlock()
228 t.lock.Unlock() 229 t.lock.Unlock()
229 } 230 }
230 231
231 func (t *txnTaskQueueData) GetTransactionTasks() gae.AnonymousQueueData { 232 func (t *txnTaskQueueData) GetTransactionTasks() tq.AnonymousQueueData {
232 t.Lock() 233 t.Lock()
233 defer t.Unlock() 234 defer t.Unlock()
234 235
235 » ret := make(gae.AnonymousQueueData, len(t.anony)) 236 » ret := make(tq.AnonymousQueueData, len(t.anony))
236 for k, vs := range t.anony { 237 for k, vs := range t.anony {
237 » » ret[k] = make([]*gae.TQTask, len(vs)) 238 » » ret[k] = make([]*tq.Task, len(vs))
238 for i, v := range vs { 239 for i, v := range vs {
239 tsk := dupTask(v) 240 tsk := dupTask(v)
240 tsk.Name = "" 241 tsk.Name = ""
241 ret[k][i] = tsk 242 ret[k][i] = tsk
242 } 243 }
243 } 244 }
244 245
245 return ret 246 return ret
246 } 247 }
247 248
248 func (t *txnTaskQueueData) GetTombstonedTasks() gae.QueueData { 249 func (t *txnTaskQueueData) GetTombstonedTasks() tq.QueueData {
249 return t.parent.GetTombstonedTasks() 250 return t.parent.GetTombstonedTasks()
250 } 251 }
251 252
252 func (t *txnTaskQueueData) GetScheduledTasks() gae.QueueData { 253 func (t *txnTaskQueueData) GetScheduledTasks() tq.QueueData {
253 return t.parent.GetScheduledTasks() 254 return t.parent.GetScheduledTasks()
254 } 255 }
255 256
256 func (t *txnTaskQueueData) CreateQueue(queueName string) { 257 func (t *txnTaskQueueData) CreateQueue(queueName string) {
257 t.parent.CreateQueue(queueName) 258 t.parent.CreateQueue(queueName)
258 } 259 }
OLDNEW
« no previous file with comments | « impl/memory/taskqueue.go ('k') | impl/memory/taskqueue_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698