Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "infra/gae/libs/wrapper" | 9 "infra/gae/libs/wrapper" |
| 10 "infra/libs/clock" | |
| 10 "math/rand" | 11 "math/rand" |
| 11 "net/http" | 12 "net/http" |
| 12 "regexp" | 13 "regexp" |
| 13 "time" | |
| 14 | 14 |
| 15 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 16 | 16 |
| 17 "appengine" | 17 "appengine" |
| 18 "appengine/taskqueue" | 18 "appengine/taskqueue" |
| 19 "appengine_internal" | 19 "appengine_internal" |
| 20 dbpb "appengine_internal/datastore" | 20 dbpb "appengine_internal/datastore" |
| 21 pb "appengine_internal/taskqueue" | 21 pb "appengine_internal/taskqueue" |
| 22 ) | 22 ) |
| 23 | 23 |
| 24 /////////////////////////////// public functions /////////////////////////////// | 24 /////////////////////////////// public functions /////////////////////////////// |
| 25 | 25 |
| 26 func useTQ(c context.Context) context.Context { | 26 func useTQ(c context.Context) context.Context { |
| 27 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e { | 27 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e { |
| 28 tqd := cur(ic).Get(memContextTQIdx) | 28 tqd := cur(ic).Get(memContextTQIdx) |
| 29 var ret interface { | 29 var ret interface { |
| 30 wrapper.TQTestable | 30 wrapper.TQTestable |
| 31 wrapper.TaskQueue | 31 wrapper.TaskQueue |
| 32 } | 32 } |
| 33 switch x := tqd.(type) { | 33 switch x := tqd.(type) { |
| 34 case *taskQueueData: | 34 case *taskQueueData: |
| 35 ret = &taskqueueImpl{ | 35 ret = &taskqueueImpl{ |
| 36 wrapper.DummyTQ(), | 36 wrapper.DummyTQ(), |
| 37 x, | 37 x, |
| 38 ic, | |
| 38 curGID(ic).namespace, | 39 curGID(ic).namespace, |
| 39 func() time.Time { return wrapper.GetTimeNow(ic) }, | |
| 40 wrapper.GetMathRand(ic), | 40 wrapper.GetMathRand(ic), |
| 41 } | 41 } |
| 42 | 42 |
| 43 case *txnTaskQueueData: | 43 case *txnTaskQueueData: |
| 44 ret = &taskqueueTxnImpl{ | 44 ret = &taskqueueTxnImpl{ |
| 45 wrapper.DummyTQ(), | 45 wrapper.DummyTQ(), |
| 46 x, | 46 x, |
| 47 ic, | |
| 47 curGID(ic).namespace, | 48 curGID(ic).namespace, |
| 48 func() time.Time { return wrapper.GetTimeNow(ic) }, | |
| 49 wrapper.GetMathRand(ic), | 49 wrapper.GetMathRand(ic), |
| 50 } | 50 } |
| 51 | 51 |
| 52 default: | 52 default: |
| 53 panic(fmt.Errorf("TQ: bad type: %v", tqd)) | 53 panic(fmt.Errorf("TQ: bad type: %v", tqd)) |
| 54 } | 54 } |
| 55 return ret | 55 return ret |
| 56 }) | 56 }) |
| 57 } | 57 } |
| 58 | 58 |
| 59 //////////////////////////////// taskqueueImpl ///////////////////////////////// | 59 //////////////////////////////// taskqueueImpl ///////////////////////////////// |
| 60 | 60 |
| 61 type taskqueueImpl struct { | 61 type taskqueueImpl struct { |
| 62 wrapper.TaskQueue | 62 wrapper.TaskQueue |
| 63 *taskQueueData | 63 *taskQueueData |
| 64 | 64 |
| 65 ctx context.Context | |
| 65 ns string | 66 ns string |
| 66 timeNow func() time.Time | |
| 67 mathRand *rand.Rand | 67 mathRand *rand.Rand |
|
iannucci
2015/06/03 17:37:19
could we pull out mathRand the same way too? (e.g.
dnj
2015/06/03 18:21:26
Done.
| |
| 68 } | 68 } |
| 69 | 69 |
| 70 var ( | 70 var ( |
| 71 _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) | 71 _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) |
| 72 _ = wrapper.TQTestable((*taskqueueImpl)(nil)) | 72 _ = wrapper.TQTestable((*taskqueueImpl)(nil)) |
| 73 ) | 73 ) |
| 74 | 74 |
| 75 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) { | 75 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) { |
| 76 » toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow() , t.mathRand) | 76 » toSched, queueName, err := t.prepTask(t.ns, task, queueName, clock.Now(t .ctx), t.mathRand) |
| 77 if err != nil { | 77 if err != nil { |
| 78 return nil, err | 78 return nil, err |
| 79 } | 79 } |
| 80 | 80 |
| 81 if _, ok := t.archived[queueName][toSched.Name]; ok { | 81 if _, ok := t.archived[queueName][toSched.Name]; ok { |
| 82 // SDK converts TOMBSTONE -> already added too | 82 // SDK converts TOMBSTONE -> already added too |
| 83 return nil, taskqueue.ErrTaskAlreadyAdded | 83 return nil, taskqueue.ErrTaskAlreadyAdded |
| 84 } else if _, ok := t.named[queueName][toSched.Name]; ok { | 84 } else if _, ok := t.named[queueName][toSched.Name]; ok { |
| 85 return nil, taskqueue.ErrTaskAlreadyAdded | 85 return nil, taskqueue.ErrTaskAlreadyAdded |
| 86 } else { | 86 } else { |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 157 }) | 157 }) |
| 158 return err | 158 return err |
| 159 } | 159 } |
| 160 | 160 |
| 161 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | 161 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
| 162 | 162 |
| 163 type taskqueueTxnImpl struct { | 163 type taskqueueTxnImpl struct { |
| 164 wrapper.TaskQueue | 164 wrapper.TaskQueue |
| 165 *txnTaskQueueData | 165 *txnTaskQueueData |
| 166 | 166 |
| 167 ctx context.Context | |
| 167 ns string | 168 ns string |
| 168 timeNow func() time.Time | |
| 169 mathRand *rand.Rand | 169 mathRand *rand.Rand |
| 170 } | 170 } |
| 171 | 171 |
| 172 var ( | 172 var ( |
| 173 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) | 173 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) |
| 174 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) | 174 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) |
| 175 ) | 175 ) |
| 176 | 176 |
| 177 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) { | 177 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) { |
| 178 » toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti meNow(), t.mathRand) | 178 » toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, cloc k.Now(t.ctx), t.mathRand) |
| 179 if err != nil { | 179 if err != nil { |
| 180 return nil, err | 180 return nil, err |
| 181 } | 181 } |
| 182 | 182 |
| 183 numTasks := 0 | 183 numTasks := 0 |
| 184 for _, vs := range t.anony { | 184 for _, vs := range t.anony { |
| 185 numTasks += len(vs) | 185 numTasks += len(vs) |
| 186 } | 186 } |
| 187 if numTasks+1 > 5 { | 187 if numTasks+1 > 5 { |
| 188 // transactional tasks are actually implemented 'for real' as Ac tions which | 188 // transactional tasks are actually implemented 'for real' as Ac tions which |
| (...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 298 func dupQueue(q wrapper.QueueData) wrapper.QueueData { | 298 func dupQueue(q wrapper.QueueData) wrapper.QueueData { |
| 299 r := make(wrapper.QueueData, len(q)) | 299 r := make(wrapper.QueueData, len(q)) |
| 300 for k, q := range q { | 300 for k, q := range q { |
| 301 r[k] = make(map[string]*taskqueue.Task, len(q)) | 301 r[k] = make(map[string]*taskqueue.Task, len(q)) |
| 302 for tn, t := range q { | 302 for tn, t := range q { |
| 303 r[k][tn] = dupTask(t) | 303 r[k][tn] = dupTask(t) |
| 304 } | 304 } |
| 305 } | 305 } |
| 306 return r | 306 return r |
| 307 } | 307 } |
| OLD | NEW |