 Chromium Code Reviews
 Chromium Code Reviews Issue 1152383003:
  Simple memory testing for gae/wrapper  (Closed) 
  Base URL: https://chromium.googlesource.com/infra/infra.git@better_context_lite
    
  
    Issue 1152383003:
  Simple memory testing for gae/wrapper  (Closed) 
  Base URL: https://chromium.googlesource.com/infra/infra.git@better_context_lite| OLD | NEW | 
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package memory | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "infra/gae/libs/wrapper" | |
| 10 "math/rand" | |
| 11 "net/http" | |
| 12 "regexp" | |
| 13 "time" | |
| 14 | |
| 15 "golang.org/x/net/context" | |
| 16 | |
| 17 "appengine" | |
| 18 "appengine/taskqueue" | |
| 19 "appengine_internal" | |
| 20 dbpb "appengine_internal/datastore" | |
| 21 pb "appengine_internal/taskqueue" | |
| 22 ) | |
| 23 | |
| 24 /////////////////////////////// public functions /////////////////////////////// | |
| 25 | |
| 26 // UseTQ adds a wrapper.TaskQueue implementation to context, accessible | |
| 27 // by wrapper.GetTQ(c) | |
| 28 func UseTQ(c context.Context) context.Context { | |
| 29 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu e { | |
| 30 tqd := cur(ic).Get("TQ") | |
| 31 var ret interface { | |
| 32 wrapper.TQTestable | |
| 33 wrapper.TaskQueue | |
| 34 } | |
| 35 switch x := tqd.(type) { | |
| 36 case *taskQueueData: | |
| 37 ret = &taskqueueImpl{ | |
| 38 wrapper.DummyTQ(), | |
| 
M-A Ruel
2015/05/28 22:42:39
you create a TQ everytime?
 
iannucci
2015/05/28 23:00:35
it returns a constant object (a named struct{}) in
 | |
| 39 x, | |
| 40 curGID(ic).namespace, | |
| 41 func() time.Time { return wrapper.GetTimeNow(ic) }, | |
| 42 wrapper.GetMathRand(ic), | |
| 43 } | |
| 44 | |
| 45 case *txnTaskQueueData: | |
| 46 ret = &taskqueueTxnImpl{ | |
| 47 wrapper.DummyTQ(), | |
| 48 x, | |
| 49 curGID(ic).namespace, | |
| 50 func() time.Time { return wrapper.GetTimeNow(ic) }, | |
| 51 wrapper.GetMathRand(ic), | |
| 52 } | |
| 53 | |
| 54 default: | |
| 55 panic(fmt.Errorf("TQ: bad type: %v", tqd)) | |
| 56 } | |
| 57 return ret | |
| 58 }) | |
| 59 } | |
| 60 | |
| 61 //////////////////////////////// taskqueueImpl ///////////////////////////////// | |
| 62 | |
| 63 type taskqueueImpl struct { | |
| 64 wrapper.TaskQueue | |
| 65 *taskQueueData | |
| 66 | |
| 67 ns string | |
| 68 timeNow func() time.Time | |
| 69 mathRand *rand.Rand | |
| 70 } | |
| 71 | |
| 72 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task queue.Task, error) { | |
| 73 toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow() , t.mathRand) | |
| 74 if err != nil { | |
| 75 return nil, err | |
| 76 } | |
| 77 | |
| 78 if _, ok := t.archived[queueName][toSched.Name]; ok { | |
| 79 // SDK converts TOMBSTONE -> already added too | |
| 80 return nil, taskqueue.ErrTaskAlreadyAdded | |
| 81 } else if _, ok := t.named[queueName][toSched.Name]; ok { | |
| 82 return nil, taskqueue.ErrTaskAlreadyAdded | |
| 83 } else { | |
| 84 t.named[queueName][toSched.Name] = toSched | |
| 85 } | |
| 86 | |
| 87 return dupTask(toSched), nil | |
| 88 } | |
| 89 | |
| 90 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue. Task, error) { | |
| 91 if err := t.IsBroken(); err != nil { | |
| 92 return nil, err | |
| 93 } | |
| 94 | |
| 95 t.Lock() | |
| 96 defer t.Unlock() | |
| 97 | |
| 98 return t.addLocked(task, queueName) | |
| 99 } | |
| 100 | |
| 101 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err or { | |
| 102 queueName, err := t.getQueueName(queueName) | |
| 103 if err != nil { | |
| 104 return err | |
| 105 } | |
| 106 | |
| 107 if _, ok := t.archived[queueName][task.Name]; ok { | |
| 108 return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK) | |
| 109 } | |
| 110 | |
| 111 if _, ok := t.named[queueName][task.Name]; !ok { | |
| 112 return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK) | |
| 113 } | |
| 114 | |
| 115 t.archived[queueName][task.Name] = t.named[queueName][task.Name] | |
| 116 delete(t.named[queueName], task.Name) | |
| 117 | |
| 118 return nil | |
| 119 } | |
| 120 | |
| 121 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error { | |
| 122 if err := t.IsBroken(); err != nil { | |
| 123 return err | |
| 124 } | |
| 125 | |
| 126 t.Lock() | |
| 127 defer t.Unlock() | |
| 128 | |
| 129 return t.deleteLocked(task, queueName) | |
| 130 } | |
| 131 | |
| 132 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]* taskqueue.Task, error) { | |
| 133 if err := t.IsBroken(); err != nil { | |
| 134 return nil, err | |
| 135 } | |
| 136 | |
| 
M-A Ruel
2015/05/28 22:42:39
Isn't a lock missing here?
 
iannucci
2015/05/28 23:00:35
indeed
 | |
| 137 return multi(tasks, queueName, t.addLocked) | |
| 138 } | |
| 139 | |
| 140 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e rror { | |
| 141 if err := t.IsBroken(); err != nil { | |
| 142 return err | |
| 143 } | |
| 144 | |
| 145 t.Lock() | |
| 146 defer t.Unlock() | |
| 147 | |
| 148 _, err := multi(tasks, queueName, | |
| 149 func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) { | |
| 150 return nil, t.deleteLocked(tsk, qn) | |
| 151 }) | |
| 152 return err | |
| 153 } | |
| 154 | |
| 155 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | |
| 156 | |
| 157 type taskqueueTxnImpl struct { | |
| 158 wrapper.TaskQueue | |
| 159 *txnTaskQueueData | |
| 160 | |
| 161 ns string | |
| 162 timeNow func() time.Time | |
| 163 mathRand *rand.Rand | |
| 164 } | |
| 165 | |
| 166 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t askqueue.Task, error) { | |
| 167 toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti meNow(), t.mathRand) | |
| 168 if err != nil { | |
| 169 return nil, err | |
| 170 } | |
| 171 | |
| 172 numTasks := 0 | |
| 173 for _, vs := range t.anony { | |
| 174 numTasks += len(vs) | |
| 175 } | |
| 176 if numTasks+1 > 5 { | |
| 177 // transactional tasks are actually implemented 'for real' as Ac tions which | |
| 178 // ride on the datastore. The current datastore implementation o nly allows | |
| 179 // a maximum of 5 Actions per transaction, and more than that re sult in a | |
| 180 // BAD_REQUEST. | |
| 181 return nil, newDSError(dbpb.Error_BAD_REQUEST) | |
| 182 } | |
| 183 | |
| 184 t.anony[queueName] = append(t.anony[queueName], toSched) | |
| 185 | |
| 186 // the fact that we have generated a unique name for this task queue ite m is | |
| 187 // an implementation detail. | |
| 188 // TODO(riannucci): now that I think about this... it may not actually b e true. | |
| 189 // We should verify that the .Name for a task added in a tr ansaction is | |
| 190 // meaningless. Maybe names generated in a transaction are somehow | |
| 191 // guaranteed to be meaningful? | |
| 192 toRet := dupTask(toSched) | |
| 193 toRet.Name = "" | |
| 194 | |
| 195 return toRet, nil | |
| 196 } | |
| 197 | |
| 198 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque ue.Task, error) { | |
| 199 if err := t.IsBroken(); err != nil { | |
| 200 return nil, err | |
| 201 } | |
| 202 | |
| 203 t.Lock() | |
| 204 defer t.Unlock() | |
| 205 | |
| 206 return t.addLocked(task, queueName) | |
| 207 } | |
| 208 | |
| 209 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ( []*taskqueue.Task, error) { | |
| 210 if err := t.IsBroken(); err != nil { | |
| 211 return nil, err | |
| 212 } | |
| 213 | |
| 214 t.Lock() | |
| 215 defer t.Unlock() | |
| 216 | |
| 217 return multi(tasks, queueName, t.addLocked) | |
| 218 } | |
| 219 | |
| 220 ////////////////////////////// private functions /////////////////////////////// | |
| 221 | |
| 222 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") | |
| 223 | |
| 224 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_" | |
| 225 | |
| 226 func mkName(rnd *rand.Rand, cur string, queue map[string]*taskqueue.Task) string { | |
| 227 _, ok := queue[cur] | |
| 228 for !ok && cur == "" { | |
| 229 name := [500]byte{} | |
| 230 for i := 0; i < 500; i++ { | |
| 231 name[i] = validTaskChars[rnd.Intn(len(validTaskChars))] | |
| 232 } | |
| 233 cur = string(name[:]) | |
| 234 _, ok = queue[cur] | |
| 235 } | |
| 236 return cur | |
| 237 } | |
| 238 | |
| 239 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API Error { | |
| 240 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co de)} | |
| 241 } | |
| 242 | |
| 243 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) { | |
| 244 ret := []*taskqueue.Task(nil) | |
| 245 me := appengine.MultiError(nil) | |
| 246 foundErr := false | |
| 247 for _, task := range tasks { | |
| 248 rt, err := f(task, queueName) | |
| 249 ret = append(ret, rt) | |
| 250 me = append(me, err) | |
| 251 if err != nil { | |
| 252 foundErr = true | |
| 253 } | |
| 254 } | |
| 255 if !foundErr { | |
| 256 me = nil | |
| 257 } | |
| 258 return ret, me | |
| 259 } | |
| 260 | |
| 261 func dupTask(t *taskqueue.Task) *taskqueue.Task { | |
| 262 ret := &taskqueue.Task{} | |
| 263 *ret = *t | |
| 264 | |
| 265 if t.Header != nil { | |
| 266 ret.Header = make(http.Header, len(t.Header)) | |
| 267 for k, vs := range t.Header { | |
| 268 newVs := make([]string, len(vs)) | |
| 269 copy(newVs, vs) | |
| 270 ret.Header[k] = newVs | |
| 271 } | |
| 272 } | |
| 273 | |
| 274 if t.Payload != nil { | |
| 275 ret.Payload = make([]byte, len(t.Payload)) | |
| 276 copy(ret.Payload, t.Payload) | |
| 277 } | |
| 278 | |
| 279 if t.RetryOptions != nil { | |
| 280 ret.RetryOptions = &taskqueue.RetryOptions{} | |
| 281 *ret.RetryOptions = *t.RetryOptions | |
| 282 } | |
| 283 | |
| 284 return ret | |
| 285 } | |
| 286 | |
| 287 func dupQueue(q wrapper.QueueData) wrapper.QueueData { | |
| 288 r := make(wrapper.QueueData, len(q)) | |
| 289 for k, q := range q { | |
| 290 r[k] = make(map[string]*taskqueue.Task, len(q)) | |
| 291 for tn, t := range q { | |
| 292 r[k][tn] = dupTask(t) | |
| 293 } | |
| 294 } | |
| 295 return r | |
| 296 } | |
| OLD | NEW |