OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package memory |
| 6 |
| 7 import ( |
| 8 "fmt" |
| 9 "infra/gae/libs/wrapper" |
| 10 "math/rand" |
| 11 "net/http" |
| 12 "regexp" |
| 13 "time" |
| 14 |
| 15 "golang.org/x/net/context" |
| 16 |
| 17 "appengine" |
| 18 "appengine/taskqueue" |
| 19 "appengine_internal" |
| 20 dbpb "appengine_internal/datastore" |
| 21 pb "appengine_internal/taskqueue" |
| 22 ) |
| 23 |
| 24 /////////////////////////////// public functions /////////////////////////////// |
| 25 |
| 26 // UseTQ adds a wrapper.TaskQueue implementation to context, accessible |
| 27 // by wrapper.GetTQ(c) |
| 28 func UseTQ(c context.Context) context.Context { |
| 29 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu
e { |
| 30 tqd := cur(ic).Get("TQ") |
| 31 var ret interface { |
| 32 wrapper.TQTestable |
| 33 wrapper.TaskQueue |
| 34 } |
| 35 switch x := tqd.(type) { |
| 36 case *taskQueueData: |
| 37 ret = &taskqueueImpl{ |
| 38 wrapper.DummyTQ(), |
| 39 x, |
| 40 curGID(ic).namespace, |
| 41 func() time.Time { return wrapper.GetTimeNow(ic)
}, |
| 42 wrapper.GetMathRand(ic), |
| 43 } |
| 44 |
| 45 case *txnTaskQueueData: |
| 46 ret = &taskqueueTxnImpl{ |
| 47 wrapper.DummyTQ(), |
| 48 x, |
| 49 curGID(ic).namespace, |
| 50 func() time.Time { return wrapper.GetTimeNow(ic)
}, |
| 51 wrapper.GetMathRand(ic), |
| 52 } |
| 53 |
| 54 default: |
| 55 panic(fmt.Errorf("TQ: bad type: %v", tqd)) |
| 56 } |
| 57 return ret |
| 58 }) |
| 59 } |
| 60 |
| 61 //////////////////////////////// taskqueueImpl ///////////////////////////////// |
| 62 |
| 63 type taskqueueImpl struct { |
| 64 wrapper.TaskQueue |
| 65 *taskQueueData |
| 66 |
| 67 ns string |
| 68 timeNow func() time.Time |
| 69 mathRand *rand.Rand |
| 70 } |
| 71 |
| 72 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task
queue.Task, error) { |
| 73 toSched, queueName, err := t.prepTask(t.ns, task, queueName, t.timeNow()
, t.mathRand) |
| 74 if err != nil { |
| 75 return nil, err |
| 76 } |
| 77 |
| 78 if _, ok := t.archived[queueName][toSched.Name]; ok { |
| 79 // SDK converts TOMBSTONE -> already added too |
| 80 return nil, taskqueue.ErrTaskAlreadyAdded |
| 81 } else if _, ok := t.named[queueName][toSched.Name]; ok { |
| 82 return nil, taskqueue.ErrTaskAlreadyAdded |
| 83 } else { |
| 84 t.named[queueName][toSched.Name] = toSched |
| 85 } |
| 86 |
| 87 return dupTask(toSched), nil |
| 88 } |
| 89 |
| 90 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.
Task, error) { |
| 91 if err := t.IsBroken(); err != nil { |
| 92 return nil, err |
| 93 } |
| 94 |
| 95 t.Lock() |
| 96 defer t.Unlock() |
| 97 |
| 98 return t.addLocked(task, queueName) |
| 99 } |
| 100 |
| 101 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err
or { |
| 102 queueName, err := t.getQueueName(queueName) |
| 103 if err != nil { |
| 104 return err |
| 105 } |
| 106 |
| 107 if _, ok := t.archived[queueName][task.Name]; ok { |
| 108 return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK) |
| 109 } |
| 110 |
| 111 if _, ok := t.named[queueName][task.Name]; !ok { |
| 112 return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK) |
| 113 } |
| 114 |
| 115 t.archived[queueName][task.Name] = t.named[queueName][task.Name] |
| 116 delete(t.named[queueName], task.Name) |
| 117 |
| 118 return nil |
| 119 } |
| 120 |
| 121 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error { |
| 122 if err := t.IsBroken(); err != nil { |
| 123 return err |
| 124 } |
| 125 |
| 126 t.Lock() |
| 127 defer t.Unlock() |
| 128 |
| 129 return t.deleteLocked(task, queueName) |
| 130 } |
| 131 |
| 132 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*
taskqueue.Task, error) { |
| 133 if err := t.IsBroken(); err != nil { |
| 134 return nil, err |
| 135 } |
| 136 |
| 137 t.Lock() |
| 138 defer t.Unlock() |
| 139 |
| 140 return multi(tasks, queueName, t.addLocked) |
| 141 } |
| 142 |
| 143 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e
rror { |
| 144 if err := t.IsBroken(); err != nil { |
| 145 return err |
| 146 } |
| 147 |
| 148 t.Lock() |
| 149 defer t.Unlock() |
| 150 |
| 151 _, err := multi(tasks, queueName, |
| 152 func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) { |
| 153 return nil, t.deleteLocked(tsk, qn) |
| 154 }) |
| 155 return err |
| 156 } |
| 157 |
| 158 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
| 159 |
| 160 type taskqueueTxnImpl struct { |
| 161 wrapper.TaskQueue |
| 162 *txnTaskQueueData |
| 163 |
| 164 ns string |
| 165 timeNow func() time.Time |
| 166 mathRand *rand.Rand |
| 167 } |
| 168 |
| 169 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t
askqueue.Task, error) { |
| 170 toSched, queueName, err := t.parent.prepTask(t.ns, task, queueName, t.ti
meNow(), t.mathRand) |
| 171 if err != nil { |
| 172 return nil, err |
| 173 } |
| 174 |
| 175 numTasks := 0 |
| 176 for _, vs := range t.anony { |
| 177 numTasks += len(vs) |
| 178 } |
| 179 if numTasks+1 > 5 { |
| 180 // transactional tasks are actually implemented 'for real' as Ac
tions which |
| 181 // ride on the datastore. The current datastore implementation o
nly allows |
| 182 // a maximum of 5 Actions per transaction, and more than that re
sult in a |
| 183 // BAD_REQUEST. |
| 184 return nil, newDSError(dbpb.Error_BAD_REQUEST) |
| 185 } |
| 186 |
| 187 t.anony[queueName] = append(t.anony[queueName], toSched) |
| 188 |
| 189 // the fact that we have generated a unique name for this task queue ite
m is |
| 190 // an implementation detail. |
| 191 // TODO(riannucci): now that I think about this... it may not actually b
e true. |
| 192 // We should verify that the .Name for a task added in a tr
ansaction is |
| 193 // meaningless. Maybe names generated in a transaction are
somehow |
| 194 // guaranteed to be meaningful? |
| 195 toRet := dupTask(toSched) |
| 196 toRet.Name = "" |
| 197 |
| 198 return toRet, nil |
| 199 } |
| 200 |
| 201 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque
ue.Task, error) { |
| 202 if err := t.IsBroken(); err != nil { |
| 203 return nil, err |
| 204 } |
| 205 |
| 206 t.Lock() |
| 207 defer t.Unlock() |
| 208 |
| 209 return t.addLocked(task, queueName) |
| 210 } |
| 211 |
| 212 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) (
[]*taskqueue.Task, error) { |
| 213 if err := t.IsBroken(); err != nil { |
| 214 return nil, err |
| 215 } |
| 216 |
| 217 t.Lock() |
| 218 defer t.Unlock() |
| 219 |
| 220 return multi(tasks, queueName, t.addLocked) |
| 221 } |
| 222 |
| 223 ////////////////////////////// private functions /////////////////////////////// |
| 224 |
| 225 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") |
| 226 |
| 227 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" |
| 228 |
| 229 func mkName(rnd *rand.Rand, cur string, queue map[string]*taskqueue.Task) string
{ |
| 230 _, ok := queue[cur] |
| 231 for !ok && cur == "" { |
| 232 name := [500]byte{} |
| 233 for i := 0; i < 500; i++ { |
| 234 name[i] = validTaskChars[rnd.Intn(len(validTaskChars))] |
| 235 } |
| 236 cur = string(name[:]) |
| 237 _, ok = queue[cur] |
| 238 } |
| 239 return cur |
| 240 } |
| 241 |
| 242 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API
Error { |
| 243 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co
de)} |
| 244 } |
| 245 |
| 246 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st
ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) { |
| 247 ret := []*taskqueue.Task(nil) |
| 248 me := appengine.MultiError(nil) |
| 249 foundErr := false |
| 250 for _, task := range tasks { |
| 251 rt, err := f(task, queueName) |
| 252 ret = append(ret, rt) |
| 253 me = append(me, err) |
| 254 if err != nil { |
| 255 foundErr = true |
| 256 } |
| 257 } |
| 258 if !foundErr { |
| 259 me = nil |
| 260 } |
| 261 return ret, me |
| 262 } |
| 263 |
| 264 func dupTask(t *taskqueue.Task) *taskqueue.Task { |
| 265 ret := &taskqueue.Task{} |
| 266 *ret = *t |
| 267 |
| 268 if t.Header != nil { |
| 269 ret.Header = make(http.Header, len(t.Header)) |
| 270 for k, vs := range t.Header { |
| 271 newVs := make([]string, len(vs)) |
| 272 copy(newVs, vs) |
| 273 ret.Header[k] = newVs |
| 274 } |
| 275 } |
| 276 |
| 277 if t.Payload != nil { |
| 278 ret.Payload = make([]byte, len(t.Payload)) |
| 279 copy(ret.Payload, t.Payload) |
| 280 } |
| 281 |
| 282 if t.RetryOptions != nil { |
| 283 ret.RetryOptions = &taskqueue.RetryOptions{} |
| 284 *ret.RetryOptions = *t.RetryOptions |
| 285 } |
| 286 |
| 287 return ret |
| 288 } |
| 289 |
| 290 func dupQueue(q wrapper.QueueData) wrapper.QueueData { |
| 291 r := make(wrapper.QueueData, len(q)) |
| 292 for k, q := range q { |
| 293 r[k] = make(map[string]*taskqueue.Task, len(q)) |
| 294 for tn, t := range q { |
| 295 r[k][tn] = dupTask(t) |
| 296 } |
| 297 } |
| 298 return r |
| 299 } |
OLD | NEW |