Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/http" | |
| 9 "regexp" | 8 "regexp" |
| 9 "sync/atomic" | |
| 10 | 10 |
| 11 "golang.org/x/net/context" | 11 "golang.org/x/net/context" |
| 12 | 12 |
| 13 "github.com/luci/gae/impl/dummy" | |
| 14 tq "github.com/luci/gae/service/taskqueue" | 13 tq "github.com/luci/gae/service/taskqueue" |
| 15 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 16 "github.com/luci/luci-go/common/mathrand" | 15 "github.com/luci/luci-go/common/mathrand" |
| 17 ) | 16 ) |
| 18 | 17 |
| 19 /////////////////////////////// public functions /////////////////////////////// | 18 /////////////////////////////// public functions /////////////////////////////// |
| 20 | 19 |
| 21 func useTQ(c context.Context) context.Context { | 20 func useTQ(c context.Context) context.Context { |
| 22 » return tq.SetFactory(c, func(ic context.Context) tq.Interface { | 21 » return tq.SetRawFactory(c, func(ic context.Context) tq.RawInterface { |
| 23 tqd := cur(ic).Get(memContextTQIdx) | 22 tqd := cur(ic).Get(memContextTQIdx) |
| 24 if x, ok := tqd.(*taskQueueData); ok { | 23 if x, ok := tqd.(*taskQueueData); ok { |
| 25 return &taskqueueImpl{ | 24 return &taskqueueImpl{ |
| 26 dummy.TaskQueue(), | |
| 27 x, | 25 x, |
| 28 ic, | 26 ic, |
| 29 curGID(ic).namespace, | 27 curGID(ic).namespace, |
| 30 } | 28 } |
| 31 } | 29 } |
| 32 return &taskqueueTxnImpl{ | 30 return &taskqueueTxnImpl{ |
| 33 dummy.TaskQueue(), | |
| 34 tqd.(*txnTaskQueueData), | 31 tqd.(*txnTaskQueueData), |
| 35 ic, | 32 ic, |
| 36 curGID(ic).namespace, | 33 curGID(ic).namespace, |
| 37 } | 34 } |
| 38 }) | 35 }) |
| 39 } | 36 } |
| 40 | 37 |
| 41 //////////////////////////////// taskqueueImpl ///////////////////////////////// | 38 //////////////////////////////// taskqueueImpl ///////////////////////////////// |
| 42 | 39 |
| 43 type taskqueueImpl struct { | 40 type taskqueueImpl struct { |
| 44 tq.Interface | |
| 45 *taskQueueData | 41 *taskQueueData |
| 46 | 42 |
| 47 ctx context.Context | 43 ctx context.Context |
| 48 ns string | 44 ns string |
| 49 } | 45 } |
| 50 | 46 |
| 51 var ( | 47 var ( |
| 52 » _ = tq.Interface((*taskqueueImpl)(nil)) | 48 » _ = tq.RawInterface((*taskqueueImpl)(nil)) |
| 53 _ = tq.Testable((*taskqueueImpl)(nil)) | 49 _ = tq.Testable((*taskqueueImpl)(nil)) |
| 54 ) | 50 ) |
| 55 | 51 |
| 56 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er ror) { | 52 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er ror) { |
| 57 » toSched, queueName, err := t.prepTask(t.ctx, t.ns, task, queueName) | 53 » toSched, err := t.prepTask(t.ctx, t.ns, task, queueName) |
| 58 if err != nil { | 54 if err != nil { |
| 59 return nil, err | 55 return nil, err |
| 60 } | 56 } |
| 61 | 57 |
| 62 if _, ok := t.archived[queueName][toSched.Name]; ok { | 58 if _, ok := t.archived[queueName][toSched.Name]; ok { |
| 63 // SDK converts TOMBSTONE -> already added too | 59 // SDK converts TOMBSTONE -> already added too |
| 64 return nil, tq.ErrTaskAlreadyAdded | 60 return nil, tq.ErrTaskAlreadyAdded |
| 65 } else if _, ok := t.named[queueName][toSched.Name]; ok { | 61 } else if _, ok := t.named[queueName][toSched.Name]; ok { |
| 66 return nil, tq.ErrTaskAlreadyAdded | 62 return nil, tq.ErrTaskAlreadyAdded |
| 67 } else { | 63 } else { |
| 68 t.named[queueName][toSched.Name] = toSched | 64 t.named[queueName][toSched.Name] = toSched |
| 69 } | 65 } |
| 70 | 66 |
| 71 » return dupTask(toSched), nil | 67 » return toSched.Duplicate(), nil |
| 72 } | |
| 73 | |
| 74 func (t *taskqueueImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) { | |
| 75 » t.Lock() | |
| 76 » defer t.Unlock() | |
| 77 » return t.addLocked(task, queueName) | |
| 78 } | 68 } |
| 79 | 69 |
| 80 func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error { | 70 func (t *taskqueueImpl) deleteLocked(task *tq.Task, queueName string) error { |
| 81 queueName, err := t.getQueueName(queueName) | |
| 82 if err != nil { | |
| 83 return err | |
| 84 } | |
| 85 | |
| 86 if _, ok := t.archived[queueName][task.Name]; ok { | 71 if _, ok := t.archived[queueName][task.Name]; ok { |
| 87 return errors.New("TOMBSTONED_TASK") | 72 return errors.New("TOMBSTONED_TASK") |
| 88 } | 73 } |
| 89 | 74 |
| 90 if _, ok := t.named[queueName][task.Name]; !ok { | 75 if _, ok := t.named[queueName][task.Name]; !ok { |
| 91 return errors.New("UNKNOWN_TASK") | 76 return errors.New("UNKNOWN_TASK") |
| 92 } | 77 } |
| 93 | 78 |
| 94 t.archived[queueName][task.Name] = t.named[queueName][task.Name] | 79 t.archived[queueName][task.Name] = t.named[queueName][task.Name] |
| 95 delete(t.named[queueName], task.Name) | 80 delete(t.named[queueName], task.Name) |
| 96 | 81 |
| 97 return nil | 82 return nil |
| 98 } | 83 } |
| 99 | 84 |
| 100 func (t *taskqueueImpl) Delete(task *tq.Task, queueName string) error { | 85 func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTa skCB) error { |
| 101 » t.Lock() | |
| 102 » defer t.Unlock() | |
| 103 » return t.deleteLocked(task, queueName) | |
| 104 } | |
| 105 | |
| 106 func (t *taskqueueImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task , error) { | |
| 107 » t.Lock() | |
| 108 » defer t.Unlock() | |
| 109 » return multi(tasks, queueName, t.addLocked) | |
| 110 } | |
| 111 | |
| 112 func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string) error { | |
| 113 t.Lock() | 86 t.Lock() |
| 114 defer t.Unlock() | 87 defer t.Unlock() |
| 115 | 88 |
| 116 » _, err := multi(tasks, queueName, | 89 » queueName, err := t.getQueueName(queueName) |
| 117 » » func(tsk *tq.Task, qn string) (*tq.Task, error) { | 90 » if err != nil { |
| 118 » » » return nil, t.deleteLocked(tsk, qn) | 91 » » return err |
| 119 » » }) | 92 » } |
| 120 » return err | 93 |
| 94 » for _, task := range tasks { | |
| 95 » » cb(t.addLocked(task, queueName)) | |
| 96 » } | |
| 97 » return nil | |
| 98 } | |
| 99 | |
| 100 func (t *taskqueueImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.Ra wCB) error { | |
| 101 » t.Lock() | |
| 102 » defer t.Unlock() | |
| 103 | |
| 104 » queueName, err := t.getQueueName(queueName) | |
| 105 » if err != nil { | |
| 106 » » return err | |
| 107 » } | |
| 108 | |
| 109 » for _, task := range tasks { | |
| 110 » » cb(t.deleteLocked(task, queueName)) | |
|
dnj
2015/08/03 22:37:25
Consider calling "deleteLocked" in the loop, aggre
iannucci
2015/08/04 01:21:21
yeah mumble. I ended up doing a thing for all of t
| |
| 111 » } | |
| 112 » return nil | |
| 113 } | |
| 114 | |
| 115 func (t *taskqueueImpl) Purge(queueName string) error { | |
| 116 » t.Lock() | |
| 117 » defer t.Unlock() | |
| 118 | |
| 119 » queueName, err := t.getQueueName(queueName) | |
| 120 » if err != nil { | |
| 121 » » return err | |
| 122 » } | |
| 123 | |
| 124 » t.named[queueName] = map[string]*tq.Task{} | |
|
dnj
2015/08/03 22:37:25
Consider making this a "taskQueueData" method: res
iannucci
2015/08/04 01:21:21
Done.
| |
| 125 » t.archived[queueName] = map[string]*tq.Task{} | |
| 126 » return nil | |
| 127 } | |
| 128 | |
| 129 func (t *taskqueueImpl) Stats(queueNames []string, cb tq.RawStatsCB) error { | |
| 130 » t.Lock() | |
| 131 » defer t.Unlock() | |
| 132 | |
| 133 » for _, qn := range queueNames { | |
| 134 » » qn, err := t.getQueueName(qn) | |
| 135 » » if err != nil { | |
| 136 » » » cb(nil, err) | |
| 137 » » } else { | |
| 138 » » » s := tq.Statistics{ | |
| 139 » » » » Tasks: len(t.named[qn]), | |
| 140 » » » } | |
| 141 » » » for _, t := range t.named[qn] { | |
| 142 » » » » if s.OldestETA.IsZero() { | |
| 143 » » » » » s.OldestETA = t.ETA | |
| 144 » » » » } else if t.ETA.Before(s.OldestETA) { | |
| 145 » » » » » s.OldestETA = t.ETA | |
| 146 » » » » } | |
| 147 » » » } | |
| 148 » » » cb(&s, nil) | |
| 149 » » } | |
| 150 » } | |
| 151 | |
| 152 » return nil | |
| 121 } | 153 } |
| 122 | 154 |
| 123 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | 155 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
| 124 | 156 |
| 125 type taskqueueTxnImpl struct { | 157 type taskqueueTxnImpl struct { |
| 126 tq.Interface | |
| 127 *txnTaskQueueData | 158 *txnTaskQueueData |
| 128 | 159 |
| 129 ctx context.Context | 160 ctx context.Context |
| 130 ns string | 161 ns string |
| 131 } | 162 } |
| 132 | 163 |
| 133 var _ interface { | 164 var _ interface { |
| 134 » tq.Interface | 165 » tq.RawInterface |
| 135 tq.Testable | 166 tq.Testable |
| 136 } = (*taskqueueTxnImpl)(nil) | 167 } = (*taskqueueTxnImpl)(nil) |
| 137 | 168 |
| 138 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) { | 169 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, error) { |
| 139 » toSched, queueName, err := t.parent.prepTask(t.ctx, t.ns, task, queueNam e) | 170 » toSched, err := t.parent.prepTask(t.ctx, t.ns, task, queueName) |
| 140 if err != nil { | 171 if err != nil { |
| 141 return nil, err | 172 return nil, err |
| 142 } | 173 } |
| 143 | 174 |
| 144 numTasks := 0 | 175 numTasks := 0 |
| 145 for _, vs := range t.anony { | 176 for _, vs := range t.anony { |
| 146 numTasks += len(vs) | 177 numTasks += len(vs) |
| 147 } | 178 } |
| 148 if numTasks+1 > 5 { | 179 if numTasks+1 > 5 { |
| 149 // transactional tasks are actually implemented 'for real' as Ac tions which | 180 // transactional tasks are actually implemented 'for real' as Ac tions which |
| 150 // ride on the datastore. The current datastore implementation o nly allows | 181 // ride on the datastore. The current datastore implementation o nly allows |
| 151 // a maximum of 5 Actions per transaction, and more than that re sult in a | 182 // a maximum of 5 Actions per transaction, and more than that re sult in a |
| 152 // BAD_REQUEST. | 183 // BAD_REQUEST. |
| 153 return nil, errors.New("BAD_REQUEST") | 184 return nil, errors.New("BAD_REQUEST") |
| 154 } | 185 } |
| 155 | 186 |
| 156 t.anony[queueName] = append(t.anony[queueName], toSched) | 187 t.anony[queueName] = append(t.anony[queueName], toSched) |
| 157 | 188 |
| 158 // the fact that we have generated a unique name for this task queue ite m is | 189 // the fact that we have generated a unique name for this task queue ite m is |
| 159 // an implementation detail. | 190 // an implementation detail. |
| 160 // TODO(riannucci): now that I think about this... it may not actually b e true. | 191 // TODO(riannucci): now that I think about this... it may not actually b e true. |
| 161 // We should verify that the .Name for a task added in a tr ansaction is | 192 // We should verify that the .Name for a task added in a tr ansaction is |
| 162 // meaningless. Maybe names generated in a transaction are somehow | 193 // meaningless. Maybe names generated in a transaction are somehow |
| 163 // guaranteed to be meaningful? | 194 // guaranteed to be meaningful? |
| 164 » toRet := dupTask(toSched) | 195 » toRet := toSched.Duplicate() |
| 165 toRet.Name = "" | 196 toRet.Name = "" |
| 166 | 197 |
| 167 return toRet, nil | 198 return toRet, nil |
| 168 } | 199 } |
| 169 | 200 |
| 170 func (t *taskqueueTxnImpl) Add(task *tq.Task, queueName string) (retTask *tq.Tas k, err error) { | 201 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.Ra wTaskCB) error { |
| 171 » err = t.run(func() (err error) { | 202 » if atomic.LoadInt32(&t.closed) == 1 { |
| 172 » » t.Lock() | 203 » » return errors.New("taskqueue: transaction context has expired") |
| 173 » » defer t.Unlock() | 204 » } |
| 174 » » retTask, err = t.addLocked(task, queueName) | 205 |
| 175 » » return | 206 » t.Lock() |
| 176 » }) | 207 » defer t.Unlock() |
| 177 » return | 208 |
| 209 » queueName, err := t.parent.getQueueName(queueName) | |
| 210 » if err != nil { | |
| 211 » » return err | |
| 212 » } | |
| 213 | |
| 214 » for _, task := range tasks { | |
| 215 » » cb(t.addLocked(task, queueName)) | |
| 216 » } | |
| 217 » return nil | |
| 178 } | 218 } |
| 179 | 219 |
| 180 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string) (retTask s []*tq.Task, err error) { | 220 func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error { |
| 181 » err = t.run(func() (err error) { | 221 » return errors.New("taskqueue: cannot DeleteMulti from a transaction") |
| 182 » » t.Lock() | 222 } |
| 183 » » defer t.Unlock() | 223 |
| 184 » » retTasks, err = multi(tasks, queueName, t.addLocked) | 224 func (t *taskqueueTxnImpl) Purge(string) error { |
| 185 » » return | 225 » return errors.New("taskqueue: cannot Purge from a transaction") |
| 186 » }) | 226 } |
| 187 » return | 227 |
| 228 func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error { | |
| 229 » return errors.New("taskqueue: cannot Stats from a transaction") | |
| 188 } | 230 } |
| 189 | 231 |
| 190 ////////////////////////////// private functions /////////////////////////////// | 232 ////////////////////////////// private functions /////////////////////////////// |
| 191 | 233 |
| 192 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") | 234 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") |
| 193 | 235 |
| 194 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_" | 236 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST UVWXYZ-_" |
| 195 | 237 |
| 196 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { | 238 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { |
| 197 _, ok := queue[cur] | 239 _, ok := queue[cur] |
| 198 for !ok && cur == "" { | 240 for !ok && cur == "" { |
| 199 name := [500]byte{} | 241 name := [500]byte{} |
| 200 for i := 0; i < 500; i++ { | 242 for i := 0; i < 500; i++ { |
| 201 name[i] = validTaskChars[mathrand.Get(c).Intn(len(validT askChars))] | 243 name[i] = validTaskChars[mathrand.Get(c).Intn(len(validT askChars))] |
| 202 } | 244 } |
| 203 cur = string(name[:]) | 245 cur = string(name[:]) |
| 204 _, ok = queue[cur] | 246 _, ok = queue[cur] |
| 205 } | 247 } |
| 206 return cur | 248 return cur |
| 207 } | 249 } |
| 208 | 250 |
| 209 func multi(tasks []*tq.Task, queueName string, f func(*tq.Task, string) (*tq.Tas k, error)) ([]*tq.Task, error) { | |
| 210 ret := []*tq.Task(nil) | |
| 211 lme := errors.LazyMultiError{Size: len(tasks)} | |
| 212 for i, task := range tasks { | |
| 213 rt, err := f(task, queueName) | |
| 214 ret = append(ret, rt) | |
| 215 lme.Assign(i, err) | |
| 216 } | |
| 217 return ret, lme.Get() | |
| 218 } | |
| 219 | |
| 220 func dupTask(t *tq.Task) *tq.Task { | |
| 221 ret := &tq.Task{} | |
| 222 *ret = *t | |
| 223 | |
| 224 if t.Header != nil { | |
| 225 ret.Header = make(http.Header, len(t.Header)) | |
| 226 for k, vs := range t.Header { | |
| 227 newVs := make([]string, len(vs)) | |
| 228 copy(newVs, vs) | |
| 229 ret.Header[k] = newVs | |
| 230 } | |
| 231 } | |
| 232 | |
| 233 if t.Payload != nil { | |
| 234 ret.Payload = make([]byte, len(t.Payload)) | |
| 235 copy(ret.Payload, t.Payload) | |
| 236 } | |
| 237 | |
| 238 if t.RetryOptions != nil { | |
| 239 ret.RetryOptions = &tq.RetryOptions{} | |
| 240 *ret.RetryOptions = *t.RetryOptions | |
| 241 } | |
| 242 | |
| 243 return ret | |
| 244 } | |
| 245 | |
| 246 func dupQueue(q tq.QueueData) tq.QueueData { | 251 func dupQueue(q tq.QueueData) tq.QueueData { |
| 247 r := make(tq.QueueData, len(q)) | 252 r := make(tq.QueueData, len(q)) |
| 248 for k, q := range q { | 253 for k, q := range q { |
| 249 r[k] = make(map[string]*tq.Task, len(q)) | 254 r[k] = make(map[string]*tq.Task, len(q)) |
| 250 for tn, t := range q { | 255 for tn, t := range q { |
| 251 » » » r[k][tn] = dupTask(t) | 256 » » » r[k][tn] = t.Duplicate() |
| 252 } | 257 } |
| 253 } | 258 } |
| 254 return r | 259 return r |
| 255 } | 260 } |
| OLD | NEW |