Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(79)

Side by Side Diff: filter/txnBuf/state.go

Issue 1434873003: Fix races in txnBuf (Closed) Base URL: https://github.com/luci/gae.git@race_tests
Patch Set: Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package txnBuf 5 package txnBuf
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "sync" 9 "sync"
10 10
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
87 } 87 }
88 return &sizeTracker{k2s, s.total} 88 return &sizeTracker{k2s, s.total}
89 } 89 }
90 90
91 type txnBufState struct { 91 type txnBufState struct {
92 sync.Mutex 92 sync.Mutex
93 93
94 // encoded key -> size of entity. A size of 0 means that the entity is 94 // encoded key -> size of entity. A size of 0 means that the entity is
95 // deleted. 95 // deleted.
96 entState *sizeTracker 96 entState *sizeTracker
97 » memDS datastore.RawInterface 97 » bufDS datastore.RawInterface
98 98
99 roots stringset.Set 99 roots stringset.Set
100 rootLimit int 100 rootLimit int
101 101
102 » aid string 102 » aid string
103 » ns string 103 » ns string
104 » parentDS datastore.RawInterface 104 » parentDS datastore.RawInterface
105 » parentState *txnBufState
106 105
107 // sizeBudget is the number of bytes that this transaction has to operat e 106 // sizeBudget is the number of bytes that this transaction has to operat e
108 // within. It's only used when attempting to apply() the transaction, an d 107 // within. It's only used when attempting to apply() the transaction, an d
109 // it is the threshold for the delta of applying this transaction to the 108 // it is the threshold for the delta of applying this transaction to the
110 // parent transaction. Note that a buffered transaction could actually h ave 109 // parent transaction. Note that a buffered transaction could actually h ave
111 // a negative delta if the parent transaction had many large entities wh ich 110 // a negative delta if the parent transaction had many large entities wh ich
112 // the inner transaction deleted. 111 // the inner transaction deleted.
113 sizeBudget int64 112 sizeBudget int64
114
115 // siblingLock is to prevent two nested transactions from running at the same
116 // time.
117 //
118 // Example:
119 // RunInTransaction() { // root
120 // RunInTransaction() // A
121 // RunInTransaction() // B
122 // }
123 //
124 // This will prevent A and B from running simulatneously.
125 siblingLock sync.Mutex
126 } 113 }
127 114
128 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas tore.TransactionOptions) error { 115 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas tore.TransactionOptions) error {
129 inf := info.Get(ctx) 116 inf := info.Get(ctx)
130 ns := inf.GetNamespace() 117 ns := inf.GetNamespace()
131 118
132 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) 119 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState)
133 roots := stringset.New(0) 120 roots := stringset.New(0)
134 rootLimit := 1 121 rootLimit := 1
135 if opts != nil && opts.XG { 122 if opts != nil && opts.XG {
136 rootLimit = XGTransactionGroupLimit 123 rootLimit = XGTransactionGroupLimit
137 } 124 }
138 sizeBudget := DefaultSizeBudget 125 sizeBudget := DefaultSizeBudget
139 if parentState != nil { 126 if parentState != nil {
140 parentState.siblingLock.Lock()
141 defer parentState.siblingLock.Unlock()
142
143 // TODO(riannucci): this is a bit wonky since it means that a ch ild 127 // TODO(riannucci): this is a bit wonky since it means that a ch ild
144 // transaction declaring XG=true will only get to modify 25 grou ps IF 128 // transaction declaring XG=true will only get to modify 25 grou ps IF
145 // they're same groups affected by the parent transactions. So i nstead of 129 // they're same groups affected by the parent transactions. So i nstead of
146 // respecting opts.XG for inner transactions, we just dup everyt hing from 130 // respecting opts.XG for inner transactions, we just dup everyt hing from
147 // the parent transaction. 131 // the parent transaction.
148 roots = parentState.roots.Dup() 132 roots = parentState.roots.Dup()
149 rootLimit = parentState.rootLimit 133 rootLimit = parentState.rootLimit
150 134
151 sizeBudget = parentState.sizeBudget - parentState.entState.total 135 sizeBudget = parentState.sizeBudget - parentState.entState.total
152 if sizeBudget < DefaultSizeThreshold { 136 if sizeBudget < DefaultSizeThreshold {
153 return ErrTransactionTooLarge 137 return ErrTransactionTooLarge
154 } 138 }
155 } 139 }
156 140
157 » memDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns) 141 » bufDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns)
158 if err != nil { 142 if err != nil {
159 return err 143 return err
160 } 144 }
161 145
162 state := &txnBufState{ 146 state := &txnBufState{
163 » » entState: &sizeTracker{}, 147 » » entState: &sizeTracker{},
164 » » memDS: memDS.Raw(), 148 » » bufDS: bufDS.Raw(),
165 » » roots: roots, 149 » » roots: roots,
166 » » rootLimit: rootLimit, 150 » » rootLimit: rootLimit,
167 » » ns: ns, 151 » » ns: ns,
168 » » aid: inf.AppID(), 152 » » aid: inf.AppID(),
169 » » parentDS: datastore.Get(ctx).Raw(), 153 » » parentDS: datastore.Get(context.WithValue(ctx, dsTxnBufHaveLoc k, true)).Raw(),
170 » » parentState: parentState, 154 » » sizeBudget: sizeBudget,
171 » » sizeBudget: sizeBudget,
172 } 155 }
173 if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil { 156 if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil {
174 return err 157 return err
175 } 158 }
176 » return state.apply() 159
160 » // no reason to unlock this ever. At this point it's toast.
161 » state.Lock()
162
163 » if parentState == nil {
164 » » return commitToReal(state)
165 » }
166
167 » if err = parentState.canApply(state); err != nil {
168 » » return err
169 » }
170
171 » parentState.commit(state)
172 » return nil
177 } 173 }
178 174
179 // item is a temporary object for representing key/entity pairs and their cache 175 // item is a temporary object for representing key/entity pairs and their cache
180 // state (e.g. if they exist in the in-memory datastore buffer or not). 176 // state (e.g. if they exist in the in-memory datastore buffer or not).
181 // Additionally item memoizes some common comparison strings. item objects 177 // Additionally item memoizes some common comparison strings. item objects
182 // must never be persisted outside of a single function/query context. 178 // must never be persisted outside of a single function/query context.
183 type item struct { 179 type item struct {
184 key *datastore.Key 180 key *datastore.Key
185 data datastore.PropertyMap 181 data datastore.PropertyMap
186 buffered bool 182 buffered bool
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
229 // only need to update the roots if they did something that required upd ating 225 // only need to update the roots if they did something that required upd ating
230 if proposedRoots.Len() > 0 { 226 if proposedRoots.Len() > 0 {
231 proposedRoots.Iter(func(root string) bool { 227 proposedRoots.Iter(func(root string) bool {
232 t.roots.Add(root) 228 t.roots.Add(root)
233 return true 229 return true
234 }) 230 })
235 } 231 }
236 return nil 232 return nil
237 } 233 }
238 234
239 func (t *txnBufState) getMulti(keys []*datastore.Key) ([]item, error) { 235 func (t *txnBufState) getMulti(keys []*datastore.Key, metas datastore.MultiMetaG etter, cb datastore.GetMultiCB, haveLock bool) error {
240 encKeys, roots := toEncoded(keys) 236 encKeys, roots := toEncoded(keys)
241 » ret := make([]item, len(keys)) 237 » data := make([]item, len(keys))
242 238
243 idxMap := []int(nil) 239 idxMap := []int(nil)
244 toGetKeys := []*datastore.Key(nil) 240 toGetKeys := []*datastore.Key(nil)
245 241
246 » t.Lock() 242 » lme := errors.NewLazyMultiError(len(keys))
247 » defer t.Unlock() 243 » err := func() error {
248 244 » » if !haveLock {
249 » if err := t.updateRootsLocked(roots); err != nil { 245 » » » t.Lock()
250 » » return nil, err 246 » » » defer t.Unlock()
251 » } 247 » » }
252 248
249 » » if err := t.updateRootsLocked(roots); err != nil {
250 » » » return err
251 » » }
252
253 » » for i, key := range keys {
254 » » » data[i].key = key
255 » » » data[i].encKey = encKeys[i]
256 » » » if size, ok := t.entState.get(data[i].getEncKey()); ok {
257 » » » » data[i].buffered = true
258 » » » » if size > 0 {
259 » » » » » idxMap = append(idxMap, i)
260 » » » » » toGetKeys = append(toGetKeys, key)
261 » » » » }
262 » » » }
263 » » }
264
265 » » if len(toGetKeys) > 0 {
266 » » » j := 0
267 » » » t.bufDS.GetMulti(toGetKeys, nil, func(pm datastore.Prope rtyMap, err error) {
268 » » » » impossible(err)
269 » » » » data[idxMap[j]].data = pm
270 » » » » j++
271 » » » })
272 » » }
273
274 » » idxMap = nil
275 » » getKeys := []*datastore.Key(nil)
276 » » getMetas := datastore.MultiMetaGetter(nil)
277
278 » » for i, itm := range data {
279 » » » if !itm.buffered {
280 » » » » idxMap = append(idxMap, i)
281 » » » » getKeys = append(getKeys, itm.key)
282 » » » » getMetas = append(getMetas, metas.GetSingle(i))
283 » » » }
284 » » }
285
286 » » if len(idxMap) > 0 {
287 » » » j := 0
288 » » » err := t.parentDS.GetMulti(getKeys, getMetas, func(pm da tastore.PropertyMap, err error) {
289 » » » » if err != datastore.ErrNoSuchEntity {
290 » » » » » i := idxMap[j]
291 » » » » » if !lme.Assign(i, err) {
292 » » » » » » data[i].data = pm
293 » » » » » }
294 » » » » }
295 » » » » j++
296 » » » })
297 » » » if err != nil {
298 » » » » return err
299 » » » }
300 » » }
301 » » return nil
302 » }()
303 » if err != nil {
304 » » return err
305 » }
306
307 » for i, itm := range data {
308 » » err := lme.GetOne(i)
309 » » if err != nil {
310 » » » cb(nil, err)
311 » » } else if itm.data == nil {
312 » » » cb(nil, datastore.ErrNoSuchEntity)
313 » » } else {
314 » » » cb(itm.data, nil)
315 » » }
316 » }
317 » return nil
318 }
319
320 func (t *txnBufState) deleteMulti(keys []*datastore.Key, cb datastore.DeleteMult iCB, haveLock bool) error {
321 » encKeys, roots := toEncoded(keys)
322
323 » err := func() error {
324 » » if !haveLock {
325 » » » t.Lock()
326 » » » defer t.Unlock()
327 » » }
328
329 » » if err := t.updateRootsLocked(roots); err != nil {
330 » » » return err
331 » » }
332
333 » » i := 0
334 » » err := t.bufDS.DeleteMulti(keys, func(err error) {
335 » » » impossible(err)
336 » » » t.entState.set(encKeys[i], 0)
337 » » » i++
338 » » })
339 » » impossible(err)
340 » » return nil
341 » }()
342 » if err != nil {
343 » » return err
344 » }
345
346 » for range keys {
347 » » cb(nil)
348 » }
349
350 » return nil
351 }
352
353 func (t *txnBufState) fixKeys(keys []*datastore.Key) ([]*datastore.Key, error) {
354 » lme := errors.NewLazyMultiError(len(keys))
355 » realKeys := []*datastore.Key(nil)
253 for i, key := range keys { 356 for i, key := range keys {
254 » » ret[i].key = key 357 » » if key.Incomplete() {
255 » » ret[i].encKey = encKeys[i] 358 » » » // intentionally call AllocateIDs without lock.
256 » » if size, ok := t.entState.get(ret[i].getEncKey()); ok { 359 » » » start, err := t.parentDS.AllocateIDs(key, 1)
257 » » » ret[i].buffered = true 360 » » » if !lme.Assign(i, err) {
258 » » » if size > 0 { 361 » » » » if realKeys == nil {
259 » » » » idxMap = append(idxMap, i) 362 » » » » » realKeys = make([]*datastore.Key, len(ke ys))
260 » » » » toGetKeys = append(toGetKeys, key) 363 » » » » » copy(realKeys, keys)
261 » » » } 364 » » » » }
262 » » } 365
263 » } 366 » » » » aid, ns, toks := key.Split()
264 367 » » » » toks[len(toks)-1].IntID = start
265 » if len(toGetKeys) > 0 { 368 » » » » realKeys[i] = datastore.NewKeyToks(aid, ns, toks )
266 » » j := 0 369 » » » }
267 » » t.memDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) { 370 » » }
371 » }
372 » err := lme.Get()
373
374 » if realKeys != nil {
375 » » return realKeys, err
376 » }
377 » return keys, err
378 }
379
380 func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyM ap, cb datastore.PutMultiCB, haveLock bool) error {
381 » keys, err := t.fixKeys(keys)
382 » if err != nil {
383 » » for _, e := range err.(errors.MultiError) {
384 » » » cb(nil, e)
385 » » }
386 » » return nil
387 » }
388
389 » encKeys, roots := toEncoded(keys)
390
391 » err = func() error {
392 » » if !haveLock {
393 » » » t.Lock()
394 » » » defer t.Unlock()
395 » » }
396
397 » » if err := t.updateRootsLocked(roots); err != nil {
398 » » » return err
399 » » }
400
401 » » i := 0
402 » » err := t.bufDS.PutMulti(keys, vals, func(k *datastore.Key, err e rror) {
268 impossible(err) 403 impossible(err)
269 » » » ret[idxMap[j]].data = pm 404 » » » t.entState.set(encKeys[i], vals[i].EstimateSize())
270 » » » j++ 405 » » » i++
271 }) 406 })
272 » } 407 » » impossible(err)
273 408 » » return nil
274 » return ret, nil 409 » }()
275 } 410 » if err != nil {
276
277 func (t *txnBufState) deleteMulti(keys []*datastore.Key) error {
278 » encKeys, roots := toEncoded(keys)
279
280 » t.Lock()
281 » defer t.Unlock()
282
283 » if err := t.updateRootsLocked(roots); err != nil {
284 return err 411 return err
285 } 412 }
286 413
287 » i := 0 414 » for _, k := range keys {
288 » err := t.memDS.DeleteMulti(keys, func(err error) { 415 » » cb(k, nil)
289 » » impossible(err) 416 » }
290 » » t.entState.set(encKeys[i], 0) 417 » return nil
291 » » i++ 418 }
292 » }) 419
293 » impossible(err) 420 func commitToReal(s *txnBufState) error {
294 » return nil 421 » toPut, toPutKeys, toDel := s.effect()
295 }
296
297 func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyM ap) error {
298 » encKeys, roots := toEncoded(keys)
299
300 » t.Lock()
301 » defer t.Unlock()
302
303 » if err := t.updateRootsLocked(roots); err != nil {
304 » » return err
305 » }
306
307 » i := 0
308 » err := t.memDS.PutMulti(keys, vals, func(k *datastore.Key, err error) {
309 » » impossible(err)
310 » » t.entState.set(encKeys[i], vals[i].EstimateSize())
311 » » i++
312 » })
313 » impossible(err)
314 » return nil
315 }
316
317 // apply actually takes the buffered transaction and applies it to the parent
318 // transaction. It will only return an error if the underlying 'real' datastore
319 // returns an error on PutMulti or DeleteMulti.
320 func (t *txnBufState) apply() error {
321 » t.Lock()
322 » defer t.Unlock()
323
324 » // if parentState is nil... just try to commit this anyway. The estimate s
325 » // we're using here are just educated guesses. If it fits for real, then
326 » // hooray. If not, then the underlying datastore will error.
327 » if t.parentState != nil {
328 » » t.parentState.Lock()
329 » » proposedState := t.parentState.entState.dup()
330 » » t.parentState.Unlock()
331 » » for k, v := range t.entState.keyToSize {
332 » » » proposedState.set(k, v)
333 » » }
334 » » if proposedState.total > t.sizeBudget {
335 » » » return ErrTransactionTooLarge
336 » » }
337 » }
338
339 » toPutKeys := []*datastore.Key(nil)
340 » toPut := []datastore.PropertyMap(nil)
341 » toDel := []*datastore.Key(nil)
342
343 » // need to pull all items out of the in-memory datastore. Fortunately we have
344 » // kindless queries, and we disabled all the special entities, so just
345 » // run a kindless query without any filters and it will return all data
346 » // currently in memDS :).
347 » fq, err := datastore.NewQuery("").Finalize()
348 » impossible(err)
349
350 » err = t.memDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa p, _ datastore.CursorCB) bool {
351 » » toPutKeys = append(toPutKeys, key)
352 » » toPut = append(toPut, data)
353 » » return true
354 » })
355 » memoryCorruption(err)
356
357 » for keyStr, size := range t.entState.keyToSize {
358 » » if size == 0 {
359 » » » k, err := serialize.ReadKey(bytes.NewBufferString(keyStr ), serialize.WithoutContext, t.aid, t.ns)
360 » » » memoryCorruption(err)
361 » » » toDel = append(toDel, k)
362 » » }
363 » }
364
365 » ds := t.parentDS
366 422
367 return parallel.FanOutIn(func(ch chan<- func() error) { 423 return parallel.FanOutIn(func(ch chan<- func() error) {
368 if len(toPut) > 0 { 424 if len(toPut) > 0 {
369 ch <- func() error { 425 ch <- func() error {
370 mErr := errors.NewLazyMultiError(len(toPut)) 426 mErr := errors.NewLazyMultiError(len(toPut))
371 i := 0 427 i := 0
372 » » » » err := ds.PutMulti(toPutKeys, toPut, func(_ *dat astore.Key, err error) { 428 » » » » err := s.parentDS.PutMulti(toPutKeys, toPut, fun c(_ *datastore.Key, err error) {
373 mErr.Assign(i, err) 429 mErr.Assign(i, err)
374 i++ 430 i++
375 }) 431 })
376 if err == nil { 432 if err == nil {
377 err = mErr.Get() 433 err = mErr.Get()
378 } 434 }
379 return err 435 return err
380 } 436 }
381 } 437 }
382 if len(toDel) > 0 { 438 if len(toDel) > 0 {
383 ch <- func() error { 439 ch <- func() error {
384 mErr := errors.NewLazyMultiError(len(toDel)) 440 mErr := errors.NewLazyMultiError(len(toDel))
385 i := 0 441 i := 0
386 » » » » err := ds.DeleteMulti(toDel, func(err error) { 442 » » » » err := s.parentDS.DeleteMulti(toDel, func(err er ror) {
387 mErr.Assign(i, err) 443 mErr.Assign(i, err)
388 i++ 444 i++
389 }) 445 })
390 if err == nil { 446 if err == nil {
391 err = mErr.Get() 447 err = mErr.Get()
392 } 448 }
393 return err 449 return err
394 } 450 }
395 } 451 }
396 }) 452 })
397 } 453 }
398 454
455 func (t *txnBufState) effect() (toPut []datastore.PropertyMap, toPutKeys, toDel []*datastore.Key) {
456 // TODO(riannucci): preallocate return slices
457
458 // need to pull all items out of the in-memory datastore. Fortunately we have
459 // kindless queries, and we disabled all the special entities, so just
460 // run a kindless query without any filters and it will return all data
461 // currently in bufDS :).
462 fq, err := datastore.NewQuery("").Finalize()
463 impossible(err)
464
465 err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa p, _ datastore.CursorCB) bool {
466 toPutKeys = append(toPutKeys, key)
467 toPut = append(toPut, data)
468 return true
469 })
470 memoryCorruption(err)
471
472 for keyStr, size := range t.entState.keyToSize {
473 if size == 0 {
474 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr ), serialize.WithoutContext, t.aid, t.ns)
475 memoryCorruption(err)
476 toDel = append(toDel, k)
477 }
478 }
479
480 return
481 }
482
483 func (t *txnBufState) canApply(s *txnBufState) error {
dnj 2015/11/11 16:08:28 canApplyLocked?
iannucci 2015/11/11 18:06:40 done
484 proposedState := t.entState.dup()
485
486 for k, v := range s.entState.keyToSize {
487 proposedState.set(k, v)
488 }
489 if proposedState.total > s.sizeBudget {
490 return ErrTransactionTooLarge
491 }
492 return nil
493 }
494
495 func (t *txnBufState) commit(s *txnBufState) {
dnj 2015/11/11 16:08:28 commitLocked?
iannucci 2015/11/11 18:06:40 done
496 toPut, toPutKeys, toDel := s.effect()
497
498 if len(toPut) > 0 {
499 impossible(t.putMulti(toPutKeys, toPut,
500 func(_ *datastore.Key, err error) { impossible(err) }, t rue))
501 }
502
503 if len(toDel) > 0 {
504 impossible(t.deleteMulti(toDel, impossible, true))
505 }
506 }
507
399 // toEncoded returns a list of all of the serialized versions of these keys, 508 // toEncoded returns a list of all of the serialized versions of these keys,
400 // plus a stringset of all the encoded root keys that `keys` represents. 509 // plus a stringset of all the encoded root keys that `keys` represents.
401 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { 510 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) {
402 roots = stringset.New(len(keys)) 511 roots = stringset.New(len(keys))
403 full = make([]string, len(keys)) 512 full = make([]string, len(keys))
404 for i, k := range keys { 513 for i, k := range keys {
405 roots.Add(string(serialize.ToBytes(k.Root()))) 514 roots.Add(string(serialize.ToBytes(k.Root())))
406 full[i] = string(serialize.ToBytes(k)) 515 full[i] = string(serialize.ToBytes(k))
407 } 516 }
408 return 517 return
409 } 518 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698