OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package memory |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "infra/gae/libs/wrapper" |
| 10 "math/rand" |
| 11 "net/http" |
| 12 "regexp" |
| 13 "time" |
| 14 |
| 15 "golang.org/x/net/context" |
| 16 |
| 17 "appengine" |
| 18 "appengine/taskqueue" |
| 19 "appengine_internal" |
| 20 dbpb "appengine_internal/datastore" |
| 21 pb "appengine_internal/taskqueue" |
| 22 ) |
| 23 |
| 24 /////////////////////////////// public functions /////////////////////////////// |
| 25 |
| 26 // UseTQ adds a wrapper.TaskQueue implementation to context, accessible |
| 27 // by wrapper.GetTQ(c) |
| 28 func UseTQ(c context.Context) context.Context { |
| 29 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu
e { |
| 30 tqd := cur(ic).Get(memContextTQIdx) |
| 31 var ret interface { |
| 32 wrapper.TQTestable |
| 33 wrapper.TaskQueue |
| 34 } |
| 35 switch x := tqd.(type) { |
| 36 case *taskQueueData: |
| 37 ret = &taskqueueImpl{ |
| 38 wrapper.DummyTQ(), |
| 39 x, |
| 40 curGID(ic).namespace, |
| 41 func() time.Time { return wrapper.GetTimeNow(ic)
}, |
| 42 wrapper.GetMathRand(ic), |
| 43 } |
| 44 |
| 45 case *txnTaskQueueData: |
| 46 ret = &taskqueueTxnImpl{ |
| 47 wrapper.DummyTQ(), |
| 48 x, |
| 49 curGID(ic).namespace, |
| 50 func() time.Time { return wrapper.GetTimeNow(ic)
}, |
| 51 wrapper.GetMathRand(ic), |
| 52 } |
| 53 |
| 54 default: |
| 55 panic(fmt.Errorf("TQ: bad type: %v", tqd)) |
| 56 } |
| 57 return ret |
| 58 }) |
| 59 } |
| 60 |
| 61 //////////////////////////////// taskqueueImpl ///////////////////////////////// |
| 62 |
| 63 type taskqueueImpl struct { |
| 64 wrapper.TaskQueue |
| 65 *taskQueueData |
| 66 |
| 67 ns string |
| 68 timeNow func() time.Time |
| 69 mathRand *rand.Rand |
| 70 } |
| 71 |
| 72 var ( |
| 73 _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) |
| 74 _ = wrapper.TQTestable((*taskqueueImpl)(nil)) |
| 75 ) |
| 76 |
| 77 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task
queue.Task, error) { |
| 78 toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow()
, t.mathRand) |
| 79 if err != nil { |
| 80 return nil, err |
| 81 } |
| 82 |
| 83 if _, ok := t.archived[queueName][toSched.Name]; ok { |
| 84 // SDK converts TOMBSTONE -> already added too |
| 85 return nil, taskqueue.ErrTaskAlreadyAdded |
| 86 } else if _, ok := t.named[queueName][toSched.Name]; ok { |
| 87 return nil, taskqueue.ErrTaskAlreadyAdded |
| 88 } else { |
| 89 t.named[queueName][toSched.Name] = toSched |
| 90 } |
| 91 |
| 92 return dupTask(toSched), nil |
| 93 } |
| 94 |
| 95 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.
Task, error) { |
| 96 if err := t.IsBroken(); err != nil { |
| 97 return nil, err |
| 98 } |
| 99 |
| 100 t.Lock() |
| 101 defer t.Unlock() |
| 102 |
| 103 return t.addLocked(task, queueName) |
| 104 } |
| 105 |
| 106 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err
or { |
| 107 queueName, err := t.getQueueName(queueName) |
| 108 if err != nil { |
| 109 return err |
| 110 } |
| 111 |
| 112 if _, ok := t.archived[queueName][task.Name]; ok { |
| 113 return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK) |
| 114 } |
| 115 |
| 116 if _, ok := t.named[queueName][task.Name]; !ok { |
| 117 return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK) |
| 118 } |
| 119 |
| 120 t.archived[queueName][task.Name] = t.named[queueName][task.Name] |
| 121 delete(t.named[queueName], task.Name) |
| 122 |
| 123 return nil |
| 124 } |
| 125 |
| 126 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error { |
| 127 if err := t.IsBroken(); err != nil { |
| 128 return err |
| 129 } |
| 130 |
| 131 t.Lock() |
| 132 defer t.Unlock() |
| 133 |
| 134 return t.deleteLocked(task, queueName) |
| 135 } |
| 136 |
| 137 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*
taskqueue.Task, error) { |
| 138 if err := t.IsBroken(); err != nil { |
| 139 return nil, err |
| 140 } |
| 141 |
| 142 t.Lock() |
| 143 defer t.Unlock() |
| 144 |
| 145 return multi(tasks, queueName, t.addLocked) |
| 146 } |
| 147 |
| 148 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e
rror { |
| 149 if err := t.IsBroken(); err != nil { |
| 150 return err |
| 151 } |
| 152 |
| 153 t.Lock() |
| 154 defer t.Unlock() |
| 155 |
| 156 _, err := multi(tasks, queueName, |
| 157 func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) { |
| 158 return nil, t.deleteLocked(tsk, qn) |
| 159 }) |
| 160 return err |
| 161 } |
| 162 |
| 163 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
| 164 |
| 165 type taskqueueTxnImpl struct { |
| 166 wrapper.TaskQueue |
| 167 *txnTaskQueueData |
| 168 |
| 169 ns string |
| 170 timeNow func() time.Time |
| 171 mathRand *rand.Rand |
| 172 } |
| 173 |
| 174 var ( |
| 175 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) |
| 176 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) |
| 177 ) |
| 178 |
| 179 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t
askqueue.Task, error) { |
| 180 toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti
meNow(), t.mathRand) |
| 181 if err != nil { |
| 182 return nil, err |
| 183 } |
| 184 |
| 185 numTasks := 0 |
| 186 for _, vs := range t.anony { |
| 187 numTasks += len(vs) |
| 188 } |
| 189 if numTasks+1 > 5 { |
| 190 // transactional tasks are actually implemented 'for real' as Ac
tions which |
| 191 // ride on the datastore. The current datastore implementation o
nly allows |
| 192 // a maximum of 5 Actions per transaction, and more than that re
sult in a |
| 193 // BAD_REQUEST. |
| 194 return nil, newDSError(dbpb.Error_BAD_REQUEST) |
| 195 } |
| 196 |
| 197 t.anony[queueName] = append(t.anony[queueName], toSched) |
| 198 |
| 199 // the fact that we have generated a unique name for this task queue ite
m is |
| 200 // an implementation detail. |
| 201 // TODO(riannucci): now that I think about this... it may not actually b
e true. |
| 202 // We should verify that the .Name for a task added in a tr
ansaction is |
| 203 // meaningless. Maybe names generated in a transaction are
somehow |
| 204 // guaranteed to be meaningful? |
| 205 toRet := dupTask(toSched) |
| 206 toRet.Name = "" |
| 207 |
| 208 return toRet, nil |
| 209 } |
| 210 |
| 211 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque
ue.Task, error) { |
| 212 if err := t.IsBroken(); err != nil { |
| 213 return nil, err |
| 214 } |
| 215 |
| 216 t.Lock() |
| 217 defer t.Unlock() |
| 218 |
| 219 return t.addLocked(task, queueName) |
| 220 } |
| 221 |
| 222 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) (
[]*taskqueue.Task, error) { |
| 223 if err := t.IsBroken(); err != nil { |
| 224 return nil, err |
| 225 } |
| 226 |
| 227 t.Lock() |
| 228 defer t.Unlock() |
| 229 |
| 230 return multi(tasks, queueName, t.addLocked) |
| 231 } |
| 232 |
| 233 ////////////////////////////// private functions /////////////////////////////// |
| 234 |
| 235 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") |
| 236 |
| 237 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" |
| 238 |
| 239 func mkName(rnd *rand.Rand, cur string, queue map[string]*taskqueue.Task) string
{ |
| 240 _, ok := queue[cur] |
| 241 for !ok && cur == "" { |
| 242 name := [500]byte{} |
| 243 for i := 0; i < 500; i++ { |
| 244 name[i] = validTaskChars[rnd.Intn(len(validTaskChars))] |
| 245 } |
| 246 cur = string(name[:]) |
| 247 _, ok = queue[cur] |
| 248 } |
| 249 return cur |
| 250 } |
| 251 |
| 252 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API
Error { |
| 253 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co
de)} |
| 254 } |
| 255 |
| 256 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st
ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) { |
| 257 ret := []*taskqueue.Task(nil) |
| 258 me := appengine.MultiError(nil) |
| 259 foundErr := false |
| 260 for _, task := range tasks { |
| 261 rt, err := f(task, queueName) |
| 262 ret = append(ret, rt) |
| 263 me = append(me, err) |
| 264 if err != nil { |
| 265 foundErr = true |
| 266 } |
| 267 } |
| 268 if !foundErr { |
| 269 me = nil |
| 270 } |
| 271 return ret, me |
| 272 } |
| 273 |
| 274 func dupTask(t *taskqueue.Task) *taskqueue.Task { |
| 275 ret := &taskqueue.Task{} |
| 276 *ret = *t |
| 277 |
| 278 if t.Header != nil { |
| 279 ret.Header = make(http.Header, len(t.Header)) |
| 280 for k, vs := range t.Header { |
| 281 newVs := make([]string, len(vs)) |
| 282 copy(newVs, vs) |
| 283 ret.Header[k] = newVs |
| 284 } |
| 285 } |
| 286 |
| 287 if t.Payload != nil { |
| 288 ret.Payload = make([]byte, len(t.Payload)) |
| 289 copy(ret.Payload, t.Payload) |
| 290 } |
| 291 |
| 292 if t.RetryOptions != nil { |
| 293 ret.RetryOptions = &taskqueue.RetryOptions{} |
| 294 *ret.RetryOptions = *t.RetryOptions |
| 295 } |
| 296 |
| 297 return ret |
| 298 } |
| 299 |
| 300 func dupQueue(q wrapper.QueueData) wrapper.QueueData { |
| 301 r := make(wrapper.QueueData, len(q)) |
| 302 for k, q := range q { |
| 303 r[k] = make(map[string]*taskqueue.Task, len(q)) |
| 304 for tn, t := range q { |
| 305 r[k][tn] = dupTask(t) |
| 306 } |
| 307 } |
| 308 return r |
| 309 } |
OLD | NEW |