| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package memory | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "infra/gae/libs/wrapper" | |
| 10 "net/http" | |
| 11 "regexp" | |
| 12 | |
| 13 "golang.org/x/net/context" | |
| 14 | |
| 15 "appengine" | |
| 16 "appengine/taskqueue" | |
| 17 "appengine_internal" | |
| 18 dbpb "appengine_internal/datastore" | |
| 19 pb "appengine_internal/taskqueue" | |
| 20 ) | |
| 21 | |
| 22 /////////////////////////////// public functions /////////////////////////////// | |
| 23 | |
| 24 func useTQ(c context.Context) context.Context { | |
| 25 return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu
e { | |
| 26 tqd := cur(ic).Get(memContextTQIdx) | |
| 27 var ret interface { | |
| 28 wrapper.TQTestable | |
| 29 wrapper.TaskQueue | |
| 30 } | |
| 31 switch x := tqd.(type) { | |
| 32 case *taskQueueData: | |
| 33 ret = &taskqueueImpl{ | |
| 34 wrapper.DummyTQ(), | |
| 35 x, | |
| 36 ic, | |
| 37 curGID(ic).namespace, | |
| 38 } | |
| 39 | |
| 40 case *txnTaskQueueData: | |
| 41 ret = &taskqueueTxnImpl{ | |
| 42 wrapper.DummyTQ(), | |
| 43 x, | |
| 44 ic, | |
| 45 curGID(ic).namespace, | |
| 46 } | |
| 47 | |
| 48 default: | |
| 49 panic(fmt.Errorf("TQ: bad type: %v", tqd)) | |
| 50 } | |
| 51 return ret | |
| 52 }) | |
| 53 } | |
| 54 | |
| 55 //////////////////////////////// taskqueueImpl ///////////////////////////////// | |
| 56 | |
| 57 type taskqueueImpl struct { | |
| 58 wrapper.TaskQueue | |
| 59 *taskQueueData | |
| 60 | |
| 61 ctx context.Context | |
| 62 ns string | |
| 63 } | |
| 64 | |
| 65 var ( | |
| 66 _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) | |
| 67 _ = wrapper.TQTestable((*taskqueueImpl)(nil)) | |
| 68 ) | |
| 69 | |
| 70 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task
queue.Task, error) { | |
| 71 toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) | |
| 72 if err != nil { | |
| 73 return nil, err | |
| 74 } | |
| 75 | |
| 76 if _, ok := t.archived[queueName][toSched.Name]; ok { | |
| 77 // SDK converts TOMBSTONE -> already added too | |
| 78 return nil, taskqueue.ErrTaskAlreadyAdded | |
| 79 } else if _, ok := t.named[queueName][toSched.Name]; ok { | |
| 80 return nil, taskqueue.ErrTaskAlreadyAdded | |
| 81 } else { | |
| 82 t.named[queueName][toSched.Name] = toSched | |
| 83 } | |
| 84 | |
| 85 return dupTask(toSched), nil | |
| 86 } | |
| 87 | |
| 88 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.
Task, error) { | |
| 89 if err := t.IsBroken(); err != nil { | |
| 90 return nil, err | |
| 91 } | |
| 92 | |
| 93 t.Lock() | |
| 94 defer t.Unlock() | |
| 95 | |
| 96 return t.addLocked(task, queueName) | |
| 97 } | |
| 98 | |
| 99 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err
or { | |
| 100 queueName, err := t.getQueueName(queueName) | |
| 101 if err != nil { | |
| 102 return err | |
| 103 } | |
| 104 | |
| 105 if _, ok := t.archived[queueName][task.Name]; ok { | |
| 106 return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK) | |
| 107 } | |
| 108 | |
| 109 if _, ok := t.named[queueName][task.Name]; !ok { | |
| 110 return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK) | |
| 111 } | |
| 112 | |
| 113 t.archived[queueName][task.Name] = t.named[queueName][task.Name] | |
| 114 delete(t.named[queueName], task.Name) | |
| 115 | |
| 116 return nil | |
| 117 } | |
| 118 | |
| 119 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error { | |
| 120 if err := t.IsBroken(); err != nil { | |
| 121 return err | |
| 122 } | |
| 123 | |
| 124 t.Lock() | |
| 125 defer t.Unlock() | |
| 126 | |
| 127 return t.deleteLocked(task, queueName) | |
| 128 } | |
| 129 | |
| 130 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*
taskqueue.Task, error) { | |
| 131 if err := t.IsBroken(); err != nil { | |
| 132 return nil, err | |
| 133 } | |
| 134 | |
| 135 t.Lock() | |
| 136 defer t.Unlock() | |
| 137 | |
| 138 return multi(tasks, queueName, t.addLocked) | |
| 139 } | |
| 140 | |
| 141 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e
rror { | |
| 142 if err := t.IsBroken(); err != nil { | |
| 143 return err | |
| 144 } | |
| 145 | |
| 146 t.Lock() | |
| 147 defer t.Unlock() | |
| 148 | |
| 149 _, err := multi(tasks, queueName, | |
| 150 func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) { | |
| 151 return nil, t.deleteLocked(tsk, qn) | |
| 152 }) | |
| 153 return err | |
| 154 } | |
| 155 | |
| 156 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | |
| 157 | |
| 158 type taskqueueTxnImpl struct { | |
| 159 wrapper.TaskQueue | |
| 160 *txnTaskQueueData | |
| 161 | |
| 162 ctx context.Context | |
| 163 ns string | |
| 164 } | |
| 165 | |
| 166 var ( | |
| 167 _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) | |
| 168 _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) | |
| 169 ) | |
| 170 | |
| 171 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t
askqueue.Task, error) { | |
| 172 toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam
e) | |
| 173 if err != nil { | |
| 174 return nil, err | |
| 175 } | |
| 176 | |
| 177 numTasks := 0 | |
| 178 for _, vs := range t.anony { | |
| 179 numTasks += len(vs) | |
| 180 } | |
| 181 if numTasks+1 > 5 { | |
| 182 // transactional tasks are actually implemented 'for real' as Ac
tions which | |
| 183 // ride on the datastore. The current datastore implementation o
nly allows | |
| 184 // a maximum of 5 Actions per transaction, and more than that re
sult in a | |
| 185 // BAD_REQUEST. | |
| 186 return nil, newDSError(dbpb.Error_BAD_REQUEST) | |
| 187 } | |
| 188 | |
| 189 t.anony[queueName] = append(t.anony[queueName], toSched) | |
| 190 | |
| 191 // the fact that we have generated a unique name for this task queue ite
m is | |
| 192 // an implementation detail. | |
| 193 // TODO(riannucci): now that I think about this... it may not actually b
e true. | |
| 194 // We should verify that the .Name for a task added in a tr
ansaction is | |
| 195 // meaningless. Maybe names generated in a transaction are
somehow | |
| 196 // guaranteed to be meaningful? | |
| 197 toRet := dupTask(toSched) | |
| 198 toRet.Name = "" | |
| 199 | |
| 200 return toRet, nil | |
| 201 } | |
| 202 | |
| 203 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque
ue.Task, error) { | |
| 204 if err := t.IsBroken(); err != nil { | |
| 205 return nil, err | |
| 206 } | |
| 207 | |
| 208 t.Lock() | |
| 209 defer t.Unlock() | |
| 210 | |
| 211 return t.addLocked(task, queueName) | |
| 212 } | |
| 213 | |
| 214 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) (
[]*taskqueue.Task, error) { | |
| 215 if err := t.IsBroken(); err != nil { | |
| 216 return nil, err | |
| 217 } | |
| 218 | |
| 219 t.Lock() | |
| 220 defer t.Unlock() | |
| 221 | |
| 222 return multi(tasks, queueName, t.addLocked) | |
| 223 } | |
| 224 | |
| 225 ////////////////////////////// private functions /////////////////////////////// | |
| 226 | |
| 227 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") | |
| 228 | |
| 229 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" | |
| 230 | |
| 231 func mkName(c context.Context, cur string, queue map[string]*taskqueue.Task) str
ing { | |
| 232 _, ok := queue[cur] | |
| 233 for !ok && cur == "" { | |
| 234 name := [500]byte{} | |
| 235 for i := 0; i < 500; i++ { | |
| 236 name[i] = validTaskChars[wrapper.GetMathRand(c).Intn(len
(validTaskChars))] | |
| 237 } | |
| 238 cur = string(name[:]) | |
| 239 _, ok = queue[cur] | |
| 240 } | |
| 241 return cur | |
| 242 } | |
| 243 | |
| 244 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API
Error { | |
| 245 return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co
de)} | |
| 246 } | |
| 247 | |
| 248 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st
ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) { | |
| 249 ret := []*taskqueue.Task(nil) | |
| 250 me := appengine.MultiError(nil) | |
| 251 foundErr := false | |
| 252 for _, task := range tasks { | |
| 253 rt, err := f(task, queueName) | |
| 254 ret = append(ret, rt) | |
| 255 me = append(me, err) | |
| 256 if err != nil { | |
| 257 foundErr = true | |
| 258 } | |
| 259 } | |
| 260 if !foundErr { | |
| 261 me = nil | |
| 262 } | |
| 263 return ret, me | |
| 264 } | |
| 265 | |
| 266 func dupTask(t *taskqueue.Task) *taskqueue.Task { | |
| 267 ret := &taskqueue.Task{} | |
| 268 *ret = *t | |
| 269 | |
| 270 if t.Header != nil { | |
| 271 ret.Header = make(http.Header, len(t.Header)) | |
| 272 for k, vs := range t.Header { | |
| 273 newVs := make([]string, len(vs)) | |
| 274 copy(newVs, vs) | |
| 275 ret.Header[k] = newVs | |
| 276 } | |
| 277 } | |
| 278 | |
| 279 if t.Payload != nil { | |
| 280 ret.Payload = make([]byte, len(t.Payload)) | |
| 281 copy(ret.Payload, t.Payload) | |
| 282 } | |
| 283 | |
| 284 if t.RetryOptions != nil { | |
| 285 ret.RetryOptions = &taskqueue.RetryOptions{} | |
| 286 *ret.RetryOptions = *t.RetryOptions | |
| 287 } | |
| 288 | |
| 289 return ret | |
| 290 } | |
| 291 | |
| 292 func dupQueue(q wrapper.QueueData) wrapper.QueueData { | |
| 293 r := make(wrapper.QueueData, len(q)) | |
| 294 for k, q := range q { | |
| 295 r[k] = make(map[string]*taskqueue.Task, len(q)) | |
| 296 for tn, t := range q { | |
| 297 r[k][tn] = dupTask(t) | |
| 298 } | |
| 299 } | |
| 300 return r | |
| 301 } | |
| OLD | NEW |