| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "infra/gae/libs/wrapper" | 9 "infra/gae/libs/wrapper" |
| 10 "math/rand" | |
| 11 "net/http" | 10 "net/http" |
| 12 "regexp" | 11 "regexp" |
| 13 "time" | |
| 14 | 12 |
| 15 "golang.org/x/net/context" | 13 "golang.org/x/net/context" |
| 16 | 14 |
| 17 "appengine" | 15 "appengine" |
| 18 "appengine/taskqueue" | 16 "appengine/taskqueue" |
| 19 "appengine_internal" | 17 "appengine_internal" |
| 20 dbpb "appengine_internal/datastore" | 18 dbpb "appengine_internal/datastore" |
| 21 pb "appengine_internal/taskqueue" | 19 pb "appengine_internal/taskqueue" |
| 22 ) | 20 ) |
| 23 | 21 |
| 24 /////////////////////////////// public functions /////////////////////////////// | 22 /////////////////////////////// public functions /////////////////////////////// |
| 25 | 23 |
| 26 func useTQ(c context.Context) context.Context { | 24 func useTQ(c context.Context) context.Context { |
| 27 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu
e { | 25 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu
e { |
| 28 tqd := cur(ic).Get(memContextTQIdx) | 26 tqd := cur(ic).Get(memContextTQIdx) |
| 29 var ret interface { | 27 var ret interface { |
| 30 wrapper.TQTestable | 28 wrapper.TQTestable |
| 31 wrapper.TaskQueue | 29 wrapper.TaskQueue |
| 32 } | 30 } |
| 33 switch x := tqd.(type) { | 31 switch x := tqd.(type) { |
| 34 case *taskQueueData: | 32 case *taskQueueData: |
| 35 ret = &taskqueueImpl{ | 33 ret = &taskqueueImpl{ |
| 36 wrapper.DummyTQ(), | 34 wrapper.DummyTQ(), |
| 37 x, | 35 x, |
| 36 ic, |
| 38 curGID(ic).namespace, | 37 curGID(ic).namespace, |
| 39 func() time.Time { return wrapper.GetTimeNow(ic)
}, | |
| 40 wrapper.GetMathRand(ic), | |
| 41 } | 38 } |
| 42 | 39 |
| 43 case *txnTaskQueueData: | 40 case *txnTaskQueueData: |
| 44 ret = &taskqueueTxnImpl{ | 41 ret = &taskqueueTxnImpl{ |
| 45 wrapper.DummyTQ(), | 42 wrapper.DummyTQ(), |
| 46 x, | 43 x, |
| 44 ic, |
| 47 curGID(ic).namespace, | 45 curGID(ic).namespace, |
| 48 func() time.Time { return wrapper.GetTimeNow(ic)
}, | |
| 49 wrapper.GetMathRand(ic), | |
| 50 } | 46 } |
| 51 | 47 |
| 52 default: | 48 default: |
| 53 panic(fmt.Errorf("TQ: bad type: %v", tqd)) | 49 panic(fmt.Errorf("TQ: bad type: %v", tqd)) |
| 54 } | 50 } |
| 55 return ret | 51 return ret |
| 56 }) | 52 }) |
| 57 } | 53 } |
| 58 | 54 |
| 59 //////////////////////////////// taskqueueImpl ///////////////////////////////// | 55 //////////////////////////////// taskqueueImpl ///////////////////////////////// |
| 60 | 56 |
| 61 type taskqueueImpl struct { | 57 type taskqueueImpl struct { |
| 62 wrapper.TaskQueue | 58 wrapper.TaskQueue |
| 63 *taskQueueData | 59 *taskQueueData |
| 64 | 60 |
| 65 » ns string | 61 » ctx context.Context |
| 66 » timeNow func() time.Time | 62 » ns string |
| 67 » mathRand *rand.Rand | |
| 68 } | 63 } |
| 69 | 64 |
| 70 var ( | 65 var ( |
| 71 _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) | 66 _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) |
| 72 _ = wrapper.TQTestable((*taskqueueImpl)(nil)) | 67 _ = wrapper.TQTestable((*taskqueueImpl)(nil)) |
| 73 ) | 68 ) |
| 74 | 69 |
| 75 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task
queue.Task, error) { | 70 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task
queue.Task, error) { |
| 76 » toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow()
, t.mathRand) | 71 » toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) |
| 77 if err != nil { | 72 if err != nil { |
| 78 return nil, err | 73 return nil, err |
| 79 } | 74 } |
| 80 | 75 |
| 81 if _, ok := t.archived[queueName][toSched.Name]; ok { | 76 if _, ok := t.archived[queueName][toSched.Name]; ok { |
| 82 // SDK converts TOMBSTONE -> already added too | 77 // SDK converts TOMBSTONE -> already added too |
| 83 return nil, taskqueue.ErrTaskAlreadyAdded | 78 return nil, taskqueue.ErrTaskAlreadyAdded |
| 84 } else if _, ok := t.named[queueName][toSched.Name]; ok { | 79 } else if _, ok := t.named[queueName][toSched.Name]; ok { |
| 85 return nil, taskqueue.ErrTaskAlreadyAdded | 80 return nil, taskqueue.ErrTaskAlreadyAdded |
| 86 } else { | 81 } else { |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 157 }) | 152 }) |
| 158 return err | 153 return err |
| 159 } | 154 } |
| 160 | 155 |
| 161 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | 156 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
| 162 | 157 |
| 163 type taskqueueTxnImpl struct { | 158 type taskqueueTxnImpl struct { |
| 164 wrapper.TaskQueue | 159 wrapper.TaskQueue |
| 165 *txnTaskQueueData | 160 *txnTaskQueueData |
| 166 | 161 |
| 167 » ns string | 162 » ctx context.Context |
| 168 » timeNow func() time.Time | 163 » ns string |
| 169 » mathRand *rand.Rand | |
| 170 } | 164 } |
| 171 | 165 |
| 172 var ( | 166 var ( |
| 173 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) | 167 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) |
| 174 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) | 168 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) |
| 175 ) | 169 ) |
| 176 | 170 |
| 177 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t
askqueue.Task, error) { | 171 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t
askqueue.Task, error) { |
| 178 » toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti
meNow(), t.mathRand) | 172 » toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam
e) |
| 179 if err != nil { | 173 if err != nil { |
| 180 return nil, err | 174 return nil, err |
| 181 } | 175 } |
| 182 | 176 |
| 183 numTasks := 0 | 177 numTasks := 0 |
| 184 for _, vs := range t.anony { | 178 for _, vs := range t.anony { |
| 185 numTasks += len(vs) | 179 numTasks += len(vs) |
| 186 } | 180 } |
| 187 if numTasks+1 > 5 { | 181 if numTasks+1 > 5 { |
| 188 // transactional tasks are actually implemented 'for real' as Ac
tions which | 182 // transactional tasks are actually implemented 'for real' as Ac
tions which |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 227 | 221 |
| 228 return multi(tasks, queueName, t.addLocked) | 222 return multi(tasks, queueName, t.addLocked) |
| 229 } | 223 } |
| 230 | 224 |
| 231 ////////////////////////////// private functions /////////////////////////////// | 225 ////////////////////////////// private functions /////////////////////////////// |
| 232 | 226 |
| 233 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") | 227 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") |
| 234 | 228 |
| 235 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" | 229 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" |
| 236 | 230 |
| 237 func mkName(rnd *rand.Rand, cur string, queue map[string]*taskqueue.Task) string
{ | 231 func mkName(c context.Context, cur string, queue map[string]*taskqueue.Task) str
ing { |
| 238 _, ok := queue[cur] | 232 _, ok := queue[cur] |
| 239 for !ok && cur == "" { | 233 for !ok && cur == "" { |
| 240 name := [500]byte{} | 234 name := [500]byte{} |
| 241 for i := 0; i < 500; i++ { | 235 for i := 0; i < 500; i++ { |
| 242 » » » name[i] = validTaskChars[rnd.Intn(len(validTaskChars))] | 236 » » » name[i] = validTaskChars[wrapper.GetMathRand(c).Intn(len
(validTaskChars))] |
| 243 } | 237 } |
| 244 cur = string(name[:]) | 238 cur = string(name[:]) |
| 245 _, ok = queue[cur] | 239 _, ok = queue[cur] |
| 246 } | 240 } |
| 247 return cur | 241 return cur |
| 248 } | 242 } |
| 249 | 243 |
| 250 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API
Error { | 244 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API
Error { |
| 251 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co
de)} | 245 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co
de)} |
| 252 } | 246 } |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 298 func dupQueue(q wrapper.QueueData) wrapper.QueueData { | 292 func dupQueue(q wrapper.QueueData) wrapper.QueueData { |
| 299 r := make(wrapper.QueueData, len(q)) | 293 r := make(wrapper.QueueData, len(q)) |
| 300 for k, q := range q { | 294 for k, q := range q { |
| 301 r[k] = make(map[string]*taskqueue.Task, len(q)) | 295 r[k] = make(map[string]*taskqueue.Task, len(q)) |
| 302 for tn, t := range q { | 296 for tn, t := range q { |
| 303 r[k][tn] = dupTask(t) | 297 r[k][tn] = dupTask(t) |
| 304 } | 298 } |
| 305 } | 299 } |
| 306 return r | 300 return r |
| 307 } | 301 } |
| OLD | NEW |