| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | |
| 9 "fmt" | 8 "fmt" |
| 9 "infra/gae/libs/wrapper" |
| 10 "net/http" | 10 "net/http" |
| 11 "regexp" | 11 "regexp" |
| 12 | 12 |
| 13 "golang.org/x/net/context" | 13 "golang.org/x/net/context" |
| 14 | 14 |
| 15 » "infra/gae/libs/gae" | 15 » "appengine" |
| 16 » "appengine/taskqueue" |
| 17 » "appengine_internal" |
| 18 » dbpb "appengine_internal/datastore" |
| 19 » pb "appengine_internal/taskqueue" |
| 16 ) | 20 ) |
| 17 | 21 |
| 18 /////////////////////////////// public functions /////////////////////////////// | 22 /////////////////////////////// public functions /////////////////////////////// |
| 19 | 23 |
| 20 func useTQ(c context.Context) context.Context { | 24 func useTQ(c context.Context) context.Context { |
| 21 » return gae.SetTQFactory(c, func(ic context.Context) gae.TaskQueue { | 25 » return wrapper.SetTQFactory(c, func(ic context.Context) wrapper.TaskQueu
e { |
| 22 tqd := cur(ic).Get(memContextTQIdx) | 26 tqd := cur(ic).Get(memContextTQIdx) |
| 23 var ret interface { | 27 var ret interface { |
| 24 » » » gae.TQTestable | 28 » » » wrapper.TQTestable |
| 25 » » » gae.TaskQueue | 29 » » » wrapper.TaskQueue |
| 26 } | 30 } |
| 27 switch x := tqd.(type) { | 31 switch x := tqd.(type) { |
| 28 case *taskQueueData: | 32 case *taskQueueData: |
| 29 ret = &taskqueueImpl{ | 33 ret = &taskqueueImpl{ |
| 30 » » » » gae.DummyTQ(), | 34 » » » » wrapper.DummyTQ(), |
| 31 x, | 35 x, |
| 32 ic, | 36 ic, |
| 33 curGID(ic).namespace, | 37 curGID(ic).namespace, |
| 34 } | 38 } |
| 35 | 39 |
| 36 case *txnTaskQueueData: | 40 case *txnTaskQueueData: |
| 37 ret = &taskqueueTxnImpl{ | 41 ret = &taskqueueTxnImpl{ |
| 38 » » » » gae.DummyTQ(), | 42 » » » » wrapper.DummyTQ(), |
| 39 x, | 43 x, |
| 40 ic, | 44 ic, |
| 41 curGID(ic).namespace, | 45 curGID(ic).namespace, |
| 42 } | 46 } |
| 43 | 47 |
| 44 default: | 48 default: |
| 45 panic(fmt.Errorf("TQ: bad type: %v", tqd)) | 49 panic(fmt.Errorf("TQ: bad type: %v", tqd)) |
| 46 } | 50 } |
| 47 return ret | 51 return ret |
| 48 }) | 52 }) |
| 49 } | 53 } |
| 50 | 54 |
| 51 //////////////////////////////// taskqueueImpl ///////////////////////////////// | 55 //////////////////////////////// taskqueueImpl ///////////////////////////////// |
| 52 | 56 |
| 53 type taskqueueImpl struct { | 57 type taskqueueImpl struct { |
| 54 » gae.TaskQueue | 58 » wrapper.TaskQueue |
| 55 *taskQueueData | 59 *taskQueueData |
| 56 | 60 |
| 57 ctx context.Context | 61 ctx context.Context |
| 58 ns string | 62 ns string |
| 59 } | 63 } |
| 60 | 64 |
| 61 var ( | 65 var ( |
| 62 » _ = gae.TaskQueue((*taskqueueImpl)(nil)) | 66 » _ = wrapper.TaskQueue((*taskqueueImpl)(nil)) |
| 63 » _ = gae.TQTestable((*taskqueueImpl)(nil)) | 67 » _ = wrapper.TQTestable((*taskqueueImpl)(nil)) |
| 64 ) | 68 ) |
| 65 | 69 |
| 66 func (t *taskqueueImpl) addLocked(task *gae.TQTask, queueName string) (*gae.TQTa
sk, error) { | 70 func (t *taskqueueImpl) addLocked(task *taskqueue.Task, queueName string) (*task
queue.Task, error) { |
| 67 toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) | 71 toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) |
| 68 if err != nil { | 72 if err != nil { |
| 69 return nil, err | 73 return nil, err |
| 70 } | 74 } |
| 71 | 75 |
| 72 if _, ok := t.archived[queueName][toSched.Name]; ok { | 76 if _, ok := t.archived[queueName][toSched.Name]; ok { |
| 73 // SDK converts TOMBSTONE -> already added too | 77 // SDK converts TOMBSTONE -> already added too |
| 74 » » return nil, gae.ErrTQTaskAlreadyAdded | 78 » » return nil, taskqueue.ErrTaskAlreadyAdded |
| 75 } else if _, ok := t.named[queueName][toSched.Name]; ok { | 79 } else if _, ok := t.named[queueName][toSched.Name]; ok { |
| 76 » » return nil, gae.ErrTQTaskAlreadyAdded | 80 » » return nil, taskqueue.ErrTaskAlreadyAdded |
| 77 } else { | 81 } else { |
| 78 t.named[queueName][toSched.Name] = toSched | 82 t.named[queueName][toSched.Name] = toSched |
| 79 } | 83 } |
| 80 | 84 |
| 81 return dupTask(toSched), nil | 85 return dupTask(toSched), nil |
| 82 } | 86 } |
| 83 | 87 |
| 84 func (t *taskqueueImpl) Add(task *gae.TQTask, queueName string) (retTask *gae.TQ
Task, err error) { | 88 func (t *taskqueueImpl) Add(task *taskqueue.Task, queueName string) (*taskqueue.
Task, error) { |
| 85 » err = t.RunIfNotBroken(func() (err error) { | 89 » if err := t.IsBroken(); err != nil { |
| 86 » » t.Lock() | 90 » » return nil, err |
| 87 » » defer t.Unlock() | 91 » } |
| 88 » » retTask, err = t.addLocked(task, queueName) | 92 |
| 89 » » return | 93 » t.Lock() |
| 90 » }) | 94 » defer t.Unlock() |
| 91 » return | 95 |
| 96 » return t.addLocked(task, queueName) |
| 92 } | 97 } |
| 93 | 98 |
| 94 func (t *taskqueueImpl) deleteLocked(task *gae.TQTask, queueName string) error { | 99 func (t *taskqueueImpl) deleteLocked(task *taskqueue.Task, queueName string) err
or { |
| 95 queueName, err := t.getQueueName(queueName) | 100 queueName, err := t.getQueueName(queueName) |
| 96 if err != nil { | 101 if err != nil { |
| 97 return err | 102 return err |
| 98 } | 103 } |
| 99 | 104 |
| 100 if _, ok := t.archived[queueName][task.Name]; ok { | 105 if _, ok := t.archived[queueName][task.Name]; ok { |
| 101 » » return errors.New("TOMBSTONED_TASK") | 106 » » return newTQError(pb.TaskQueueServiceError_TOMBSTONED_TASK) |
| 102 } | 107 } |
| 103 | 108 |
| 104 if _, ok := t.named[queueName][task.Name]; !ok { | 109 if _, ok := t.named[queueName][task.Name]; !ok { |
| 105 » » return errors.New("UNKNOWN_TASK") | 110 » » return newTQError(pb.TaskQueueServiceError_UNKNOWN_TASK) |
| 106 } | 111 } |
| 107 | 112 |
| 108 t.archived[queueName][task.Name] = t.named[queueName][task.Name] | 113 t.archived[queueName][task.Name] = t.named[queueName][task.Name] |
| 109 delete(t.named[queueName], task.Name) | 114 delete(t.named[queueName], task.Name) |
| 110 | 115 |
| 111 return nil | 116 return nil |
| 112 } | 117 } |
| 113 | 118 |
| 114 func (t *taskqueueImpl) Delete(task *gae.TQTask, queueName string) error { | 119 func (t *taskqueueImpl) Delete(task *taskqueue.Task, queueName string) error { |
| 115 » return t.RunIfNotBroken(func() error { | 120 » if err := t.IsBroken(); err != nil { |
| 116 » » t.Lock() | 121 » » return err |
| 117 » » defer t.Unlock() | 122 » } |
| 118 » » return t.deleteLocked(task, queueName) | 123 |
| 119 » }) | 124 » t.Lock() |
| 125 » defer t.Unlock() |
| 126 |
| 127 » return t.deleteLocked(task, queueName) |
| 120 } | 128 } |
| 121 | 129 |
| 122 func (t *taskqueueImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retTask
s []*gae.TQTask, err error) { | 130 func (t *taskqueueImpl) AddMulti(tasks []*taskqueue.Task, queueName string) ([]*
taskqueue.Task, error) { |
| 123 » err = t.RunIfNotBroken(func() (err error) { | 131 » if err := t.IsBroken(); err != nil { |
| 124 » » t.Lock() | 132 » » return nil, err |
| 125 » » defer t.Unlock() | 133 » } |
| 126 » » retTasks, err = multi(tasks, queueName, t.addLocked) | 134 |
| 127 » » return | 135 » t.Lock() |
| 128 » }) | 136 » defer t.Unlock() |
| 129 » return | 137 |
| 138 » return multi(tasks, queueName, t.addLocked) |
| 130 } | 139 } |
| 131 | 140 |
| 132 func (t *taskqueueImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error
{ | 141 func (t *taskqueueImpl) DeleteMulti(tasks []*taskqueue.Task, queueName string) e
rror { |
| 133 » return t.RunIfNotBroken(func() error { | 142 » if err := t.IsBroken(); err != nil { |
| 134 » » t.Lock() | 143 » » return err |
| 135 » » defer t.Unlock() | 144 » } |
| 136 | 145 |
| 137 » » _, err := multi(tasks, queueName, | 146 » t.Lock() |
| 138 » » » func(tsk *gae.TQTask, qn string) (*gae.TQTask, error) { | 147 » defer t.Unlock() |
| 139 » » » » return nil, t.deleteLocked(tsk, qn) | 148 |
| 140 » » » }) | 149 » _, err := multi(tasks, queueName, |
| 141 » » return err | 150 » » func(tsk *taskqueue.Task, qn string) (*taskqueue.Task, error) { |
| 142 » }) | 151 » » » return nil, t.deleteLocked(tsk, qn) |
| 152 » » }) |
| 153 » return err |
| 143 } | 154 } |
| 144 | 155 |
| 145 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | 156 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
| 146 | 157 |
| 147 type taskqueueTxnImpl struct { | 158 type taskqueueTxnImpl struct { |
| 148 » gae.TaskQueue | 159 » wrapper.TaskQueue |
| 149 *txnTaskQueueData | 160 *txnTaskQueueData |
| 150 | 161 |
| 151 ctx context.Context | 162 ctx context.Context |
| 152 ns string | 163 ns string |
| 153 } | 164 } |
| 154 | 165 |
| 155 var ( | 166 var ( |
| 156 » _ = gae.TaskQueue((*taskqueueTxnImpl)(nil)) | 167 » _ = wrapper.TaskQueue((*taskqueueTxnImpl)(nil)) |
| 157 » _ = gae.TQTestable((*taskqueueTxnImpl)(nil)) | 168 » _ = wrapper.TQTestable((*taskqueueTxnImpl)(nil)) |
| 158 ) | 169 ) |
| 159 | 170 |
| 160 func (t *taskqueueTxnImpl) addLocked(task *gae.TQTask, queueName string) (*gae.T
QTask, error) { | 171 func (t *taskqueueTxnImpl) addLocked(task *taskqueue.Task, queueName string) (*t
askqueue.Task, error) { |
| 161 toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam
e) | 172 toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam
e) |
| 162 if err != nil { | 173 if err != nil { |
| 163 return nil, err | 174 return nil, err |
| 164 } | 175 } |
| 165 | 176 |
| 166 numTasks := 0 | 177 numTasks := 0 |
| 167 for _, vs := range t.anony { | 178 for _, vs := range t.anony { |
| 168 numTasks += len(vs) | 179 numTasks += len(vs) |
| 169 } | 180 } |
| 170 if numTasks+1 > 5 { | 181 if numTasks+1 > 5 { |
| 171 // transactional tasks are actually implemented 'for real' as Ac
tions which | 182 // transactional tasks are actually implemented 'for real' as Ac
tions which |
| 172 // ride on the datastore. The current datastore implementation o
nly allows | 183 // ride on the datastore. The current datastore implementation o
nly allows |
| 173 // a maximum of 5 Actions per transaction, and more than that re
sult in a | 184 // a maximum of 5 Actions per transaction, and more than that re
sult in a |
| 174 // BAD_REQUEST. | 185 // BAD_REQUEST. |
| 175 » » return nil, errors.New("BAD_REQUEST") | 186 » » return nil, newDSError(dbpb.Error_BAD_REQUEST) |
| 176 } | 187 } |
| 177 | 188 |
| 178 t.anony[queueName] = append(t.anony[queueName], toSched) | 189 t.anony[queueName] = append(t.anony[queueName], toSched) |
| 179 | 190 |
| 180 // the fact that we have generated a unique name for this task queue ite
m is | 191 // the fact that we have generated a unique name for this task queue ite
m is |
| 181 // an implementation detail. | 192 // an implementation detail. |
| 182 // TODO(riannucci): now that I think about this... it may not actually b
e true. | 193 // TODO(riannucci): now that I think about this... it may not actually b
e true. |
| 183 // We should verify that the .Name for a task added in a tr
ansaction is | 194 // We should verify that the .Name for a task added in a tr
ansaction is |
| 184 // meaningless. Maybe names generated in a transaction are
somehow | 195 // meaningless. Maybe names generated in a transaction are
somehow |
| 185 // guaranteed to be meaningful? | 196 // guaranteed to be meaningful? |
| 186 toRet := dupTask(toSched) | 197 toRet := dupTask(toSched) |
| 187 toRet.Name = "" | 198 toRet.Name = "" |
| 188 | 199 |
| 189 return toRet, nil | 200 return toRet, nil |
| 190 } | 201 } |
| 191 | 202 |
| 192 func (t *taskqueueTxnImpl) Add(task *gae.TQTask, queueName string) (retTask *gae
.TQTask, err error) { | 203 func (t *taskqueueTxnImpl) Add(task *taskqueue.Task, queueName string) (*taskque
ue.Task, error) { |
| 193 » err = t.RunIfNotBroken(func() (err error) { | 204 » if err := t.IsBroken(); err != nil { |
| 194 » » t.Lock() | 205 » » return nil, err |
| 195 » » defer t.Unlock() | 206 » } |
| 196 » » retTask, err = t.addLocked(task, queueName) | 207 |
| 197 » » return | 208 » t.Lock() |
| 198 » }) | 209 » defer t.Unlock() |
| 199 » return | 210 |
| 211 » return t.addLocked(task, queueName) |
| 200 } | 212 } |
| 201 | 213 |
| 202 func (t *taskqueueTxnImpl) AddMulti(tasks []*gae.TQTask, queueName string) (retT
asks []*gae.TQTask, err error) { | 214 func (t *taskqueueTxnImpl) AddMulti(tasks []*taskqueue.Task, queueName string) (
[]*taskqueue.Task, error) { |
| 203 » err = t.RunIfNotBroken(func() (err error) { | 215 » if err := t.IsBroken(); err != nil { |
| 204 » » t.Lock() | 216 » » return nil, err |
| 205 » » defer t.Unlock() | 217 » } |
| 206 » » retTasks, err = multi(tasks, queueName, t.addLocked) | 218 |
| 207 » » return | 219 » t.Lock() |
| 208 » }) | 220 » defer t.Unlock() |
| 209 » return | 221 |
| 222 » return multi(tasks, queueName, t.addLocked) |
| 210 } | 223 } |
| 211 | 224 |
| 212 ////////////////////////////// private functions /////////////////////////////// | 225 ////////////////////////////// private functions /////////////////////////////// |
| 213 | 226 |
| 214 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") | 227 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") |
| 215 | 228 |
| 216 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" | 229 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" |
| 217 | 230 |
| 218 func mkName(c context.Context, cur string, queue map[string]*gae.TQTask) string
{ | 231 func mkName(c context.Context, cur string, queue map[string]*taskqueue.Task) str
ing { |
| 219 _, ok := queue[cur] | 232 _, ok := queue[cur] |
| 220 for !ok && cur == "" { | 233 for !ok && cur == "" { |
| 221 name := [500]byte{} | 234 name := [500]byte{} |
| 222 for i := 0; i < 500; i++ { | 235 for i := 0; i < 500; i++ { |
| 223 » » » name[i] = validTaskChars[gae.GetMathRand(c).Intn(len(val
idTaskChars))] | 236 » » » name[i] = validTaskChars[wrapper.GetMathRand(c).Intn(len
(validTaskChars))] |
| 224 } | 237 } |
| 225 cur = string(name[:]) | 238 cur = string(name[:]) |
| 226 _, ok = queue[cur] | 239 _, ok = queue[cur] |
| 227 } | 240 } |
| 228 return cur | 241 return cur |
| 229 } | 242 } |
| 230 | 243 |
| 231 func multi(tasks []*gae.TQTask, queueName string, f func(*gae.TQTask, string) (*
gae.TQTask, error)) ([]*gae.TQTask, error) { | 244 func newTQError(code pb.TaskQueueServiceError_ErrorCode) *appengine_internal.API
Error { |
| 232 » ret := []*gae.TQTask(nil) | 245 » return &appengine_internal.APIError{Service: "taskqueue", Code: int32(co
de)} |
| 233 » me := gae.MultiError(nil) | 246 } |
| 247 |
| 248 func multi(tasks []*taskqueue.Task, queueName string, f func(*taskqueue.Task, st
ring) (*taskqueue.Task, error)) ([]*taskqueue.Task, error) { |
| 249 » ret := []*taskqueue.Task(nil) |
| 250 » me := appengine.MultiError(nil) |
| 234 foundErr := false | 251 foundErr := false |
| 235 for _, task := range tasks { | 252 for _, task := range tasks { |
| 236 rt, err := f(task, queueName) | 253 rt, err := f(task, queueName) |
| 237 ret = append(ret, rt) | 254 ret = append(ret, rt) |
| 238 me = append(me, err) | 255 me = append(me, err) |
| 239 if err != nil { | 256 if err != nil { |
| 240 foundErr = true | 257 foundErr = true |
| 241 } | 258 } |
| 242 } | 259 } |
| 243 if !foundErr { | 260 if !foundErr { |
| 244 me = nil | 261 me = nil |
| 245 } | 262 } |
| 246 return ret, me | 263 return ret, me |
| 247 } | 264 } |
| 248 | 265 |
| 249 func dupTask(t *gae.TQTask) *gae.TQTask { | 266 func dupTask(t *taskqueue.Task) *taskqueue.Task { |
| 250 » ret := &gae.TQTask{} | 267 » ret := &taskqueue.Task{} |
| 251 *ret = *t | 268 *ret = *t |
| 252 | 269 |
| 253 if t.Header != nil { | 270 if t.Header != nil { |
| 254 ret.Header = make(http.Header, len(t.Header)) | 271 ret.Header = make(http.Header, len(t.Header)) |
| 255 for k, vs := range t.Header { | 272 for k, vs := range t.Header { |
| 256 newVs := make([]string, len(vs)) | 273 newVs := make([]string, len(vs)) |
| 257 copy(newVs, vs) | 274 copy(newVs, vs) |
| 258 ret.Header[k] = newVs | 275 ret.Header[k] = newVs |
| 259 } | 276 } |
| 260 } | 277 } |
| 261 | 278 |
| 262 if t.Payload != nil { | 279 if t.Payload != nil { |
| 263 ret.Payload = make([]byte, len(t.Payload)) | 280 ret.Payload = make([]byte, len(t.Payload)) |
| 264 copy(ret.Payload, t.Payload) | 281 copy(ret.Payload, t.Payload) |
| 265 } | 282 } |
| 266 | 283 |
| 267 if t.RetryOptions != nil { | 284 if t.RetryOptions != nil { |
| 268 » » ret.RetryOptions = &gae.TQRetryOptions{} | 285 » » ret.RetryOptions = &taskqueue.RetryOptions{} |
| 269 *ret.RetryOptions = *t.RetryOptions | 286 *ret.RetryOptions = *t.RetryOptions |
| 270 } | 287 } |
| 271 | 288 |
| 272 return ret | 289 return ret |
| 273 } | 290 } |
| 274 | 291 |
| 275 func dupQueue(q gae.QueueData) gae.QueueData { | 292 func dupQueue(q wrapper.QueueData) wrapper.QueueData { |
| 276 » r := make(gae.QueueData, len(q)) | 293 » r := make(wrapper.QueueData, len(q)) |
| 277 for k, q := range q { | 294 for k, q := range q { |
| 278 » » r[k] = make(map[string]*gae.TQTask, len(q)) | 295 » » r[k] = make(map[string]*taskqueue.Task, len(q)) |
| 279 for tn, t := range q { | 296 for tn, t := range q { |
| 280 r[k][tn] = dupTask(t) | 297 r[k][tn] = dupTask(t) |
| 281 } | 298 } |
| 282 } | 299 } |
| 283 return r | 300 return r |
| 284 } | 301 } |
| OLD | NEW |