| Index: impl/memory/datastore_data.go
|
| diff --git a/impl/memory/datastore_data.go b/impl/memory/datastore_data.go
|
| index f563b4ba1e230c5b789facb5c1f2b45ea1485470..8a01e89e8000613881bb98a0ae5438bbb049bb70 100644
|
| --- a/impl/memory/datastore_data.go
|
| +++ b/impl/memory/datastore_data.go
|
| @@ -25,6 +25,8 @@ type dataStoreData struct {
|
| snap *memStore
|
| // For testing, see SetTransactionRetryCount.
|
| txnFakeRetry int
|
| + // true means that head always == snap
|
| + consistent bool
|
| }
|
|
|
| var (
|
| @@ -48,9 +50,30 @@ func (d *dataStoreData) Unlock() {
|
| d.rwlock.Unlock()
|
| }
|
|
|
| +func (d *dataStoreData) setTxnRetry(count int) {
|
| + d.Lock()
|
| + defer d.Unlock()
|
| + d.txnFakeRetry = count
|
| +}
|
| +
|
| +func (d *dataStoreData) setConsistent(always bool) {
|
| + d.Lock()
|
| + defer d.Unlock()
|
| +
|
| + d.consistent = always
|
| + if d.consistent {
|
| + d.snap = d.head.Snapshot()
|
| + }
|
| +}
|
| +
|
| func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
|
| d.rwlock.RLock()
|
| defer d.rwlock.RUnlock()
|
| + if d.consistent {
|
| + // snap is already a consistent snapshot of head
|
| + return d.snap, d.snap
|
| + }
|
| +
|
| head = d.head.Snapshot()
|
| if consistent {
|
| idx = head
|
| @@ -63,18 +86,27 @@ func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
|
| func (d *dataStoreData) takeSnapshot() *memStore {
|
| d.rwlock.RLock()
|
| defer d.rwlock.RUnlock()
|
| + if d.consistent {
|
| + return d.snap
|
| + }
|
| return d.head.Snapshot()
|
| }
|
|
|
| func (d *dataStoreData) setSnapshot(snap *memStore) {
|
| d.rwlock.Lock()
|
| defer d.rwlock.Unlock()
|
| + if d.consistent {
|
| + return
|
| + }
|
| d.snap = snap
|
| }
|
|
|
| func (d *dataStoreData) catchupIndexes() {
|
| d.rwlock.Lock()
|
| defer d.rwlock.Unlock()
|
| + if d.consistent {
|
| + return
|
| + }
|
| d.snap = d.head.Snapshot()
|
| }
|
|
|
| @@ -159,7 +191,8 @@ func (d *dataStoreData) fixKeyLocked(ents *memCollection, key *ds.Key) *ds.Key {
|
| }
|
|
|
| func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) {
|
| - ents := d.mutableEnts(keys[0].Namespace())
|
| + ns := keys[0].Namespace()
|
| + ents := d.mutableEnts(ns)
|
|
|
| for i, k := range keys {
|
| pmap, _ := vals[i].Save(false)
|
| @@ -175,12 +208,15 @@ func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu
|
| old := ents.Get(keyBytes(ret))
|
| oldPM := ds.PropertyMap(nil)
|
| if old != nil {
|
| - if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil {
|
| + if oldPM, err = rpmWoCtx(old, ns); err != nil {
|
| return
|
| }
|
| }
|
| updateIndexes(d.head, ret, oldPM, pmap)
|
| ents.Set(keyBytes(ret), dataBytes)
|
| + if d.consistent {
|
| + d.snap = d.head.Snapshot()
|
| + }
|
| return
|
| }()
|
| if cb != nil {
|
| @@ -221,34 +257,37 @@ func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
|
| }
|
|
|
| func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) {
|
| - toDel := make([][]byte, 0, len(keys))
|
| - for _, k := range keys {
|
| - toDel = append(toDel, keyBytes(k))
|
| - }
|
| ns := keys[0].Namespace()
|
| + ents := d.mutableEnts(ns)
|
|
|
| - d.rwlock.Lock()
|
| - defer d.rwlock.Unlock()
|
| -
|
| - ents := d.head.GetCollection("ents:" + ns)
|
| -
|
| - for i, k := range keys {
|
| - if ents != nil {
|
| - incrementLocked(ents, groupMetaKey(k), 1)
|
| - kb := toDel[i]
|
| - if old := ents.Get(kb); old != nil {
|
| - oldPM, err := rpmWoCtx(old, ns)
|
| - if err != nil {
|
| - if cb != nil {
|
| - cb(err)
|
| + if ents != nil {
|
| + for _, k := range keys {
|
| + err := func() error {
|
| + kb := keyBytes(k)
|
| +
|
| + d.Lock()
|
| + defer d.Unlock()
|
| +
|
| + incrementLocked(ents, groupMetaKey(k), 1)
|
| + if old := ents.Get(kb); old != nil {
|
| + oldPM, err := rpmWoCtx(old, ns)
|
| + if err != nil {
|
| + return err
|
| + }
|
| + updateIndexes(d.head, k, oldPM, nil)
|
| + ents.Delete(kb)
|
| + if d.consistent {
|
| + d.snap = d.head.Snapshot()
|
| }
|
| - continue
|
| }
|
| - updateIndexes(d.head, k, oldPM, nil)
|
| - ents.Delete(kb)
|
| + return nil
|
| + }()
|
| + if cb != nil {
|
| + cb(err)
|
| }
|
| }
|
| - if cb != nil {
|
| + } else if cb != nil {
|
| + for range keys {
|
| cb(nil)
|
| }
|
| }
|
|
|