Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1978)

Unified Diff: filter/txnBuf/ds_txn.go

Issue 1434873003: Fix races in txnBuf (Closed) Base URL: https://github.com/luci/gae.git@race_tests
Patch Set: fix stuff Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « filter/txnBuf/ds.go ('k') | filter/txnBuf/state.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: filter/txnBuf/ds_txn.go
diff --git a/filter/txnBuf/ds_txn.go b/filter/txnBuf/ds_txn.go
index ef426ef55164d332385d830744a3d6672a7a8df0..b04f354f9068050a76db763e9867a9bdec75acf4 100644
--- a/filter/txnBuf/ds_txn.go
+++ b/filter/txnBuf/ds_txn.go
@@ -21,8 +21,9 @@ var ErrTooManyRoots = errors.New(
"operating on too many entity groups in nested transaction")
type dsTxnBuf struct {
- ic context.Context
- state *txnBufState
+ ic context.Context
+ state *txnBufState
+ haveLock bool
}
var _ ds.RawInterface = (*dsTxnBuf)(nil)
@@ -36,105 +37,15 @@ func (d *dsTxnBuf) AllocateIDs(incomplete *ds.Key, n int) (start int64, err erro
}
func (d *dsTxnBuf) GetMulti(keys []*ds.Key, metas ds.MultiMetaGetter, cb ds.GetMultiCB) error {
- data, err := d.state.getMulti(keys)
- if err != nil {
- return err
- }
-
- idxMap := []int(nil)
- getKeys := []*ds.Key(nil)
- getMetas := ds.MultiMetaGetter(nil)
- lme := errors.NewLazyMultiError(len(keys))
-
- for i, itm := range data {
- if !itm.buffered {
- idxMap = append(idxMap, i)
- getKeys = append(getKeys, itm.key)
- getMetas = append(getMetas, metas.GetSingle(i))
- }
- }
-
- if len(idxMap) > 0 {
- j := 0
- err := d.state.parentDS.GetMulti(getKeys, getMetas, func(pm ds.PropertyMap, err error) {
- if err != ds.ErrNoSuchEntity {
- i := idxMap[j]
- if !lme.Assign(i, err) {
- data[i].data = pm
- }
- }
- j++
- })
- if err != nil {
- return err
- }
- }
-
- for i, itm := range data {
- err := lme.GetOne(i)
- if err != nil {
- cb(nil, err)
- } else if itm.data == nil {
- cb(nil, ds.ErrNoSuchEntity)
- } else {
- cb(itm.data, nil)
- }
- }
- return nil
+ return d.state.getMulti(keys, metas, cb, d.haveLock)
}
func (d *dsTxnBuf) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) error {
- lme := errors.NewLazyMultiError(len(keys))
- realKeys := []*ds.Key(nil)
- for i, key := range keys {
- if key.Incomplete() {
- start, err := d.AllocateIDs(key, 1)
- if !lme.Assign(i, err) {
- if realKeys == nil {
- realKeys = make([]*ds.Key, len(keys))
- copy(realKeys, keys)
- }
-
- aid, ns, toks := key.Split()
- toks[len(toks)-1].IntID = start
- realKeys[i] = ds.NewKeyToks(aid, ns, toks)
- }
- }
- }
- if err := lme.Get(); err != nil {
- for _, e := range err.(errors.MultiError) {
- if e == nil {
- e = errors.New("putMulti failed because some keys were unable to AllocateIDs")
- }
- cb(nil, e)
- }
- return nil
- }
-
- if realKeys == nil {
- realKeys = keys
- }
-
- err := d.state.putMulti(realKeys, vals)
- if err != nil {
- return err
- }
-
- for _, k := range realKeys {
- cb(k, nil)
- }
- return nil
+ return d.state.putMulti(keys, vals, cb, d.haveLock)
}
func (d *dsTxnBuf) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
- if err := d.state.deleteMulti(keys); err != nil {
- return err
- }
-
- for range keys {
- cb(nil)
- }
- return nil
+ return d.state.deleteMulti(keys, cb, d.haveLock)
}
func (d *dsTxnBuf) Count(fq *ds.FinalizedQuery) (count int64, err error) {
@@ -165,13 +76,15 @@ func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error {
project := fq.Project()
- d.state.Lock()
- memDS := d.state.memDS
- parentDS := d.state.parentDS
- sizes := d.state.entState.dup()
- d.state.Unlock()
+ bufDS, parentDS, sizes := func() (ds.RawInterface, ds.RawInterface, *sizeTracker) {
+ if !d.haveLock {
+ d.state.Lock()
+ defer d.state.Unlock()
+ }
+ return d.state.bufDS, d.state.parentDS, d.state.entState.dup()
+ }()
- return runMergedQueries(fq, sizes, memDS, parentDS, func(key *ds.Key, data ds.PropertyMap) bool {
+ return runMergedQueries(fq, sizes, bufDS, parentDS, func(key *ds.Key, data ds.PropertyMap) bool {
if offset > 0 {
offset--
return true
@@ -196,6 +109,10 @@ func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error {
}
func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.TransactionOptions) error {
+ if !d.haveLock {
+ d.state.Lock()
+ defer d.state.Unlock()
+ }
return withTxnBuf(d.ic, cb, opts)
}
« no previous file with comments | « filter/txnBuf/ds.go ('k') | filter/txnBuf/state.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698