Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(573)

Unified Diff: filter/txnBuf/state.go

Issue 1434873003: Fix races in txnBuf (Closed) Base URL: https://github.com/luci/gae.git@race_tests
Patch Set: fix stuff Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « filter/txnBuf/ds_txn.go ('k') | filter/txnBuf/txnbuf_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: filter/txnBuf/state.go
diff --git a/filter/txnBuf/state.go b/filter/txnBuf/state.go
index 080a92adc65a740b167f92a2f2a6f2785ecef871..46762945225d61ec1a2bcc2534f25cd75cf23147 100644
--- a/filter/txnBuf/state.go
+++ b/filter/txnBuf/state.go
@@ -94,15 +94,14 @@ type txnBufState struct {
// encoded key -> size of entity. A size of 0 means that the entity is
// deleted.
entState *sizeTracker
- memDS datastore.RawInterface
+ bufDS datastore.RawInterface
roots stringset.Set
rootLimit int
- aid string
- ns string
- parentDS datastore.RawInterface
- parentState *txnBufState
+ aid string
+ ns string
+ parentDS datastore.RawInterface
// sizeBudget is the number of bytes that this transaction has to operate
// within. It's only used when attempting to apply() the transaction, and
@@ -111,18 +110,6 @@ type txnBufState struct {
// a negative delta if the parent transaction had many large entities which
// the inner transaction deleted.
sizeBudget int64
-
- // siblingLock is to prevent two nested transactions from running at the same
- // time.
- //
- // Example:
- // RunInTransaction() { // root
- // RunInTransaction() // A
- // RunInTransaction() // B
- // }
- //
- // This will prevent A and B from running simulatneously.
- siblingLock sync.Mutex
}
func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datastore.TransactionOptions) error {
@@ -137,9 +124,6 @@ func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
}
sizeBudget := DefaultSizeBudget
if parentState != nil {
- parentState.siblingLock.Lock()
- defer parentState.siblingLock.Unlock()
-
// TODO(riannucci): this is a bit wonky since it means that a child
// transaction declaring XG=true will only get to modify 25 groups IF
// they're same groups affected by the parent transactions. So instead of
@@ -154,26 +138,38 @@ func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
}
}
- memDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns)
+ bufDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns)
if err != nil {
return err
}
state := &txnBufState{
- entState: &sizeTracker{},
- memDS: memDS.Raw(),
- roots: roots,
- rootLimit: rootLimit,
- ns: ns,
- aid: inf.AppID(),
- parentDS: datastore.Get(ctx).Raw(),
- parentState: parentState,
- sizeBudget: sizeBudget,
+ entState: &sizeTracker{},
+ bufDS: bufDS.Raw(),
+ roots: roots,
+ rootLimit: rootLimit,
+ ns: ns,
+ aid: inf.AppID(),
+ parentDS: datastore.Get(context.WithValue(ctx, dsTxnBufHaveLock, true)).Raw(),
+ sizeBudget: sizeBudget,
}
if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil {
return err
}
- return state.apply()
+
+ // no reason to unlock this ever. At this point it's toast.
+ state.Lock()
+
+ if parentState == nil {
+ return commitToReal(state)
+ }
+
+ if err = parentState.canApplyLocked(state); err != nil {
+ return err
+ }
+
+ parentState.commitLocked(state)
+ return nil
}
// item is a temporary object for representing key/entity pairs and their cache
@@ -236,140 +232,200 @@ func (t *txnBufState) updateRootsLocked(roots stringset.Set) error {
return nil
}
-func (t *txnBufState) getMulti(keys []*datastore.Key) ([]item, error) {
+func (t *txnBufState) getMulti(keys []*datastore.Key, metas datastore.MultiMetaGetter, cb datastore.GetMultiCB, haveLock bool) error {
encKeys, roots := toEncoded(keys)
- ret := make([]item, len(keys))
+ data := make([]item, len(keys))
idxMap := []int(nil)
toGetKeys := []*datastore.Key(nil)
- t.Lock()
- defer t.Unlock()
+ lme := errors.NewLazyMultiError(len(keys))
+ err := func() error {
+ if !haveLock {
+ t.Lock()
+ defer t.Unlock()
+ }
- if err := t.updateRootsLocked(roots); err != nil {
- return nil, err
- }
+ if err := t.updateRootsLocked(roots); err != nil {
+ return err
+ }
- for i, key := range keys {
- ret[i].key = key
- ret[i].encKey = encKeys[i]
- if size, ok := t.entState.get(ret[i].getEncKey()); ok {
- ret[i].buffered = true
- if size > 0 {
- idxMap = append(idxMap, i)
- toGetKeys = append(toGetKeys, key)
+ for i, key := range keys {
+ data[i].key = key
+ data[i].encKey = encKeys[i]
+ if size, ok := t.entState.get(data[i].getEncKey()); ok {
+ data[i].buffered = true
+ if size > 0 {
+ idxMap = append(idxMap, i)
+ toGetKeys = append(toGetKeys, key)
+ }
}
}
- }
-
- if len(toGetKeys) > 0 {
- j := 0
- t.memDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) {
- impossible(err)
- ret[idxMap[j]].data = pm
- j++
- })
- }
- return ret, nil
-}
+ if len(toGetKeys) > 0 {
+ j := 0
+ t.bufDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) {
+ impossible(err)
+ data[idxMap[j]].data = pm
+ j++
+ })
+ }
-func (t *txnBufState) deleteMulti(keys []*datastore.Key) error {
- encKeys, roots := toEncoded(keys)
+ idxMap = nil
+ getKeys := []*datastore.Key(nil)
+ getMetas := datastore.MultiMetaGetter(nil)
- t.Lock()
- defer t.Unlock()
+ for i, itm := range data {
+ if !itm.buffered {
+ idxMap = append(idxMap, i)
+ getKeys = append(getKeys, itm.key)
+ getMetas = append(getMetas, metas.GetSingle(i))
+ }
+ }
- if err := t.updateRootsLocked(roots); err != nil {
+ if len(idxMap) > 0 {
+ j := 0
+ err := t.parentDS.GetMulti(getKeys, getMetas, func(pm datastore.PropertyMap, err error) {
+ if err != datastore.ErrNoSuchEntity {
+ i := idxMap[j]
+ if !lme.Assign(i, err) {
+ data[i].data = pm
+ }
+ }
+ j++
+ })
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ }()
+ if err != nil {
return err
}
- i := 0
- err := t.memDS.DeleteMulti(keys, func(err error) {
- impossible(err)
- t.entState.set(encKeys[i], 0)
- i++
- })
- impossible(err)
+ for i, itm := range data {
+ err := lme.GetOne(i)
+ if err != nil {
+ cb(nil, err)
+ } else if itm.data == nil {
+ cb(nil, datastore.ErrNoSuchEntity)
+ } else {
+ cb(itm.data, nil)
+ }
+ }
return nil
}
-func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyMap) error {
+func (t *txnBufState) deleteMulti(keys []*datastore.Key, cb datastore.DeleteMultiCB, haveLock bool) error {
encKeys, roots := toEncoded(keys)
- t.Lock()
- defer t.Unlock()
+ err := func() error {
+ if !haveLock {
+ t.Lock()
+ defer t.Unlock()
+ }
+
+ if err := t.updateRootsLocked(roots); err != nil {
+ return err
+ }
- if err := t.updateRootsLocked(roots); err != nil {
+ i := 0
+ err := t.bufDS.DeleteMulti(keys, func(err error) {
+ impossible(err)
+ t.entState.set(encKeys[i], 0)
+ i++
+ })
+ impossible(err)
+ return nil
+ }()
+ if err != nil {
return err
}
- i := 0
- err := t.memDS.PutMulti(keys, vals, func(k *datastore.Key, err error) {
- impossible(err)
- t.entState.set(encKeys[i], vals[i].EstimateSize())
- i++
- })
- impossible(err)
+ for range keys {
+ cb(nil)
+ }
+
return nil
}
-// apply actually takes the buffered transaction and applies it to the parent
-// transaction. It will only return an error if the underlying 'real' datastore
-// returns an error on PutMulti or DeleteMulti.
-func (t *txnBufState) apply() error {
- t.Lock()
- defer t.Unlock()
-
- // if parentState is nil... just try to commit this anyway. The estimates
- // we're using here are just educated guesses. If it fits for real, then
- // hooray. If not, then the underlying datastore will error.
- if t.parentState != nil {
- t.parentState.Lock()
- proposedState := t.parentState.entState.dup()
- t.parentState.Unlock()
- for k, v := range t.entState.keyToSize {
- proposedState.set(k, v)
- }
- if proposedState.total > t.sizeBudget {
- return ErrTransactionTooLarge
+func (t *txnBufState) fixKeys(keys []*datastore.Key) ([]*datastore.Key, error) {
+ lme := errors.NewLazyMultiError(len(keys))
+ realKeys := []*datastore.Key(nil)
+ for i, key := range keys {
+ if key.Incomplete() {
+ // intentionally call AllocateIDs without lock.
+ start, err := t.parentDS.AllocateIDs(key, 1)
+ if !lme.Assign(i, err) {
+ if realKeys == nil {
+ realKeys = make([]*datastore.Key, len(keys))
+ copy(realKeys, keys)
+ }
+
+ aid, ns, toks := key.Split()
+ toks[len(toks)-1].IntID = start
+ realKeys[i] = datastore.NewKeyToks(aid, ns, toks)
+ }
}
}
+ err := lme.Get()
- toPutKeys := []*datastore.Key(nil)
- toPut := []datastore.PropertyMap(nil)
- toDel := []*datastore.Key(nil)
+ if realKeys != nil {
+ return realKeys, err
+ }
+ return keys, err
+}
- // need to pull all items out of the in-memory datastore. Fortunately we have
- // kindless queries, and we disabled all the special entities, so just
- // run a kindless query without any filters and it will return all data
- // currently in memDS :).
- fq, err := datastore.NewQuery("").Finalize()
- impossible(err)
+func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyMap, cb datastore.PutMultiCB, haveLock bool) error {
+ keys, err := t.fixKeys(keys)
+ if err != nil {
+ for _, e := range err.(errors.MultiError) {
+ cb(nil, e)
+ }
+ return nil
+ }
- err = t.memDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMap, _ datastore.CursorCB) bool {
- toPutKeys = append(toPutKeys, key)
- toPut = append(toPut, data)
- return true
- })
- memoryCorruption(err)
+ encKeys, roots := toEncoded(keys)
- for keyStr, size := range t.entState.keyToSize {
- if size == 0 {
- k, err := serialize.ReadKey(bytes.NewBufferString(keyStr), serialize.WithoutContext, t.aid, t.ns)
- memoryCorruption(err)
- toDel = append(toDel, k)
+ err = func() error {
+ if !haveLock {
+ t.Lock()
+ defer t.Unlock()
+ }
+
+ if err := t.updateRootsLocked(roots); err != nil {
+ return err
}
+
+ i := 0
+ err := t.bufDS.PutMulti(keys, vals, func(k *datastore.Key, err error) {
+ impossible(err)
+ t.entState.set(encKeys[i], vals[i].EstimateSize())
+ i++
+ })
+ impossible(err)
+ return nil
+ }()
+ if err != nil {
+ return err
}
- ds := t.parentDS
+ for _, k := range keys {
+ cb(k, nil)
+ }
+ return nil
+}
+
+func commitToReal(s *txnBufState) error {
+ toPut, toPutKeys, toDel := s.effect()
return parallel.FanOutIn(func(ch chan<- func() error) {
if len(toPut) > 0 {
ch <- func() error {
mErr := errors.NewLazyMultiError(len(toPut))
i := 0
- err := ds.PutMulti(toPutKeys, toPut, func(_ *datastore.Key, err error) {
+ err := s.parentDS.PutMulti(toPutKeys, toPut, func(_ *datastore.Key, err error) {
mErr.Assign(i, err)
i++
})
@@ -383,7 +439,7 @@ func (t *txnBufState) apply() error {
ch <- func() error {
mErr := errors.NewLazyMultiError(len(toDel))
i := 0
- err := ds.DeleteMulti(toDel, func(err error) {
+ err := s.parentDS.DeleteMulti(toDel, func(err error) {
mErr.Assign(i, err)
i++
})
@@ -396,6 +452,59 @@ func (t *txnBufState) apply() error {
})
}
+func (t *txnBufState) effect() (toPut []datastore.PropertyMap, toPutKeys, toDel []*datastore.Key) {
+ // TODO(riannucci): preallocate return slices
+
+ // need to pull all items out of the in-memory datastore. Fortunately we have
+ // kindless queries, and we disabled all the special entities, so just
+ // run a kindless query without any filters and it will return all data
+ // currently in bufDS :).
+ fq, err := datastore.NewQuery("").Finalize()
+ impossible(err)
+
+ err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMap, _ datastore.CursorCB) bool {
+ toPutKeys = append(toPutKeys, key)
+ toPut = append(toPut, data)
+ return true
+ })
+ memoryCorruption(err)
+
+ for keyStr, size := range t.entState.keyToSize {
+ if size == 0 {
+ k, err := serialize.ReadKey(bytes.NewBufferString(keyStr), serialize.WithoutContext, t.aid, t.ns)
+ memoryCorruption(err)
+ toDel = append(toDel, k)
+ }
+ }
+
+ return
+}
+
+func (t *txnBufState) canApplyLocked(s *txnBufState) error {
+ proposedState := t.entState.dup()
+
+ for k, v := range s.entState.keyToSize {
+ proposedState.set(k, v)
+ }
+ if proposedState.total > s.sizeBudget {
+ return ErrTransactionTooLarge
+ }
+ return nil
+}
+
+func (t *txnBufState) commitLocked(s *txnBufState) {
+ toPut, toPutKeys, toDel := s.effect()
+
+ if len(toPut) > 0 {
+ impossible(t.putMulti(toPutKeys, toPut,
+ func(_ *datastore.Key, err error) { impossible(err) }, true))
+ }
+
+ if len(toDel) > 0 {
+ impossible(t.deleteMulti(toDel, impossible, true))
+ }
+}
+
// toEncoded returns a list of all of the serialized versions of these keys,
// plus a stringset of all the encoded root keys that `keys` represents.
func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) {
« no previous file with comments | « filter/txnBuf/ds_txn.go ('k') | filter/txnBuf/txnbuf_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698