| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "regexp" | 8 "regexp" |
| 9 "sync/atomic" | |
| 10 | 9 |
| 11 "golang.org/x/net/context" | 10 "golang.org/x/net/context" |
| 12 | 11 |
| 13 tq "github.com/luci/gae/service/taskqueue" | 12 tq "github.com/luci/gae/service/taskqueue" |
| 13 |
| 14 "github.com/luci/luci-go/common/data/rand/mathrand" | 14 "github.com/luci/luci-go/common/data/rand/mathrand" |
| 15 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
| 16 ) | 16 ) |
| 17 | 17 |
| 18 /////////////////////////////// public functions /////////////////////////////// | 18 /////////////////////////////// public functions /////////////////////////////// |
| 19 | 19 |
| 20 func useTQ(c context.Context) context.Context { | 20 func useTQ(c context.Context) context.Context { |
| 21 » return tq.SetRawFactory(c, func(ic context.Context, wantTxn bool) tq.Raw
Interface { | 21 » return tq.SetRawFactory(c, func(ic context.Context) tq.RawInterface { |
| 22 » » ns, _ := curGID(ic).getNamespace() | 22 » » memCtx, isTxn := cur(ic) |
| 23 » » var tqd memContextObj | 23 » » tqd := memCtx.Get(memContextTQIdx) |
| 24 | 24 |
| 25 » » if !wantTxn { | 25 » » if isTxn { |
| 26 » » » tqd = curNoTxn(ic).Get(memContextTQIdx) | 26 » » » return &taskqueueTxnImpl{tqd.(*txnTaskQueueData), ic} |
| 27 » » } else { | |
| 28 » » » tqd = cur(ic).Get(memContextTQIdx) | |
| 29 } | 27 } |
| 30 | 28 » » return &taskqueueImpl{tqd.(*taskQueueData), ic} |
| 31 » » if x, ok := tqd.(*taskQueueData); ok { | |
| 32 » » » return &taskqueueImpl{x, ic, ns} | |
| 33 » » } | |
| 34 » » return &taskqueueTxnImpl{tqd.(*txnTaskQueueData), ic, ns} | |
| 35 }) | 29 }) |
| 36 } | 30 } |
| 37 | 31 |
| 38 //////////////////////////////// taskqueueImpl ///////////////////////////////// | 32 //////////////////////////////// taskqueueImpl ///////////////////////////////// |
| 39 | 33 |
| 40 type taskqueueImpl struct { | 34 type taskqueueImpl struct { |
| 41 *taskQueueData | 35 *taskQueueData |
| 42 | 36 |
| 43 ctx context.Context | 37 ctx context.Context |
| 44 ns string | |
| 45 } | 38 } |
| 46 | 39 |
| 47 var ( | 40 var ( |
| 48 _ = tq.RawInterface((*taskqueueImpl)(nil)) | 41 _ = tq.RawInterface((*taskqueueImpl)(nil)) |
| 49 _ = tq.Testable((*taskqueueImpl)(nil)) | 42 _ = tq.Testable((*taskqueueImpl)(nil)) |
| 50 ) | 43 ) |
| 51 | 44 |
| 52 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er
ror) { | 45 func (t *taskqueueImpl) addLocked(task *tq.Task, queueName string) (*tq.Task, er
ror) { |
| 53 » toSched, err := t.prepTask(t.ctx, t.ns, task, queueName) | 46 » toSched, err := t.prepTask(t.ctx, task, queueName) |
| 54 if err != nil { | 47 if err != nil { |
| 55 return nil, err | 48 return nil, err |
| 56 } | 49 } |
| 57 | 50 |
| 58 if _, ok := t.archived[queueName][toSched.Name]; ok { | 51 if _, ok := t.archived[queueName][toSched.Name]; ok { |
| 59 // SDK converts TOMBSTONE -> already added too | 52 // SDK converts TOMBSTONE -> already added too |
| 60 return nil, tq.ErrTaskAlreadyAdded | 53 return nil, tq.ErrTaskAlreadyAdded |
| 61 } else if _, ok := t.named[queueName][toSched.Name]; ok { | 54 } else if _, ok := t.named[queueName][toSched.Name]; ok { |
| 62 return nil, tq.ErrTaskAlreadyAdded | 55 return nil, tq.ErrTaskAlreadyAdded |
| 63 } else { | 56 } else { |
| (...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 138 s.OldestETA = t.ETA | 131 s.OldestETA = t.ETA |
| 139 } | 132 } |
| 140 } | 133 } |
| 141 cb(&s, nil) | 134 cb(&s, nil) |
| 142 } | 135 } |
| 143 } | 136 } |
| 144 | 137 |
| 145 return nil | 138 return nil |
| 146 } | 139 } |
| 147 | 140 |
| 148 func (t *taskqueueImpl) Testable() tq.Testable { | 141 func (t *taskqueueImpl) GetTestable() tq.Testable { return t } |
| 149 » return t | |
| 150 } | |
| 151 | 142 |
| 152 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// | 143 /////////////////////////////// taskqueueTxnImpl /////////////////////////////// |
| 153 | 144 |
| 154 type taskqueueTxnImpl struct { | 145 type taskqueueTxnImpl struct { |
| 155 *txnTaskQueueData | 146 *txnTaskQueueData |
| 156 | 147 |
| 157 ctx context.Context | 148 ctx context.Context |
| 158 ns string | |
| 159 } | 149 } |
| 160 | 150 |
| 161 var _ interface { | 151 var _ interface { |
| 162 tq.RawInterface | 152 tq.RawInterface |
| 163 tq.Testable | 153 tq.Testable |
| 164 } = (*taskqueueTxnImpl)(nil) | 154 } = (*taskqueueTxnImpl)(nil) |
| 165 | 155 |
| 166 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task,
error) { | 156 func (t *taskqueueTxnImpl) addLocked(task *tq.Task, queueName string) (*tq.Task,
error) { |
| 167 » toSched, err := t.parent.prepTask(t.ctx, t.ns, task, queueName) | 157 » toSched, err := t.parent.prepTask(t.ctx, task, queueName) |
| 168 if err != nil { | 158 if err != nil { |
| 169 return nil, err | 159 return nil, err |
| 170 } | 160 } |
| 171 | 161 |
| 172 numTasks := 0 | 162 numTasks := 0 |
| 173 for _, vs := range t.anony { | 163 for _, vs := range t.anony { |
| 174 numTasks += len(vs) | 164 numTasks += len(vs) |
| 175 } | 165 } |
| 176 if numTasks+1 > 5 { | 166 if numTasks+1 > 5 { |
| 177 // transactional tasks are actually implemented 'for real' as Ac
tions which | 167 // transactional tasks are actually implemented 'for real' as Ac
tions which |
| (...skipping 11 matching lines...) Expand all Loading... |
| 189 // We should verify that the .Name for a task added in a tr
ansaction is | 179 // We should verify that the .Name for a task added in a tr
ansaction is |
| 190 // meaningless. Maybe names generated in a transaction are
somehow | 180 // meaningless. Maybe names generated in a transaction are
somehow |
| 191 // guaranteed to be meaningful? | 181 // guaranteed to be meaningful? |
| 192 toRet := toSched.Duplicate() | 182 toRet := toSched.Duplicate() |
| 193 toRet.Name = "" | 183 toRet.Name = "" |
| 194 | 184 |
| 195 return toRet, nil | 185 return toRet, nil |
| 196 } | 186 } |
| 197 | 187 |
| 198 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.Ra
wTaskCB) error { | 188 func (t *taskqueueTxnImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.Ra
wTaskCB) error { |
| 199 » if atomic.LoadInt32(&t.closed) == 1 { | 189 » if err := assertTxnValid(t.ctx); err != nil { |
| 200 » » return errors.New("taskqueue: transaction context has expired") | 190 » » return err |
| 201 } | 191 } |
| 202 | 192 |
| 203 t.Lock() | 193 t.Lock() |
| 204 defer t.Unlock() | 194 defer t.Unlock() |
| 205 | 195 |
| 206 queueName, err := t.parent.getQueueNameLocked(queueName) | 196 queueName, err := t.parent.getQueueNameLocked(queueName) |
| 207 if err != nil { | 197 if err != nil { |
| 208 return err | 198 return err |
| 209 } | 199 } |
| 210 | 200 |
| 211 for _, task := range tasks { | 201 for _, task := range tasks { |
| 212 cb(t.addLocked(task, queueName)) | 202 cb(t.addLocked(task, queueName)) |
| 213 } | 203 } |
| 214 return nil | 204 return nil |
| 215 } | 205 } |
| 216 | 206 |
| 217 func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error { | 207 func (t *taskqueueTxnImpl) DeleteMulti([]*tq.Task, string, tq.RawCB) error { |
| 218 return errors.New("taskqueue: cannot DeleteMulti from a transaction") | 208 return errors.New("taskqueue: cannot DeleteMulti from a transaction") |
| 219 } | 209 } |
| 220 | 210 |
| 221 func (t *taskqueueTxnImpl) Purge(string) error { | 211 func (t *taskqueueTxnImpl) Purge(string) error { |
| 222 return errors.New("taskqueue: cannot Purge from a transaction") | 212 return errors.New("taskqueue: cannot Purge from a transaction") |
| 223 } | 213 } |
| 224 | 214 |
| 225 func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error { | 215 func (t *taskqueueTxnImpl) Stats([]string, tq.RawStatsCB) error { |
| 226 return errors.New("taskqueue: cannot Stats from a transaction") | 216 return errors.New("taskqueue: cannot Stats from a transaction") |
| 227 } | 217 } |
| 228 | 218 |
| 229 func (t *taskqueueTxnImpl) Testable() tq.Testable { | 219 func (t *taskqueueTxnImpl) GetTestable() tq.Testable { return t } |
| 230 » return t | |
| 231 } | |
| 232 | 220 |
| 233 ////////////////////////////// private functions /////////////////////////////// | 221 ////////////////////////////// private functions /////////////////////////////// |
| 234 | 222 |
| 235 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") | 223 var validTaskName = regexp.MustCompile("^[0-9a-zA-Z\\-\\_]{0,500}$") |
| 236 | 224 |
| 237 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" | 225 const validTaskChars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
UVWXYZ-_" |
| 238 | 226 |
| 239 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { | 227 func mkName(c context.Context, cur string, queue map[string]*tq.Task) string { |
| 240 _, ok := queue[cur] | 228 _, ok := queue[cur] |
| 241 for !ok && cur == "" { | 229 for !ok && cur == "" { |
| (...skipping 10 matching lines...) Expand all Loading... |
| 252 func dupQueue(q tq.QueueData) tq.QueueData { | 240 func dupQueue(q tq.QueueData) tq.QueueData { |
| 253 r := make(tq.QueueData, len(q)) | 241 r := make(tq.QueueData, len(q)) |
| 254 for k, q := range q { | 242 for k, q := range q { |
| 255 r[k] = make(map[string]*tq.Task, len(q)) | 243 r[k] = make(map[string]*tq.Task, len(q)) |
| 256 for tn, t := range q { | 244 for tn, t := range q { |
| 257 r[k][tn] = t.Duplicate() | 245 r[k][tn] = t.Duplicate() |
| 258 } | 246 } |
| 259 } | 247 } |
| 260 return r | 248 return r |
| 261 } | 249 } |
| OLD | NEW |