Index: impl/memory/datastore_data.go |
diff --git a/impl/memory/datastore_data.go b/impl/memory/datastore_data.go |
index a3bbda7074f61d34b15ddd8a7dbb63d33e36687d..e221c55eee161eb9fda8fece355a4a71da1da546 100644 |
--- a/impl/memory/datastore_data.go |
+++ b/impl/memory/datastore_data.go |
@@ -20,13 +20,17 @@ import ( |
type dataStoreData struct { |
rwlock sync.RWMutex |
+ |
+ // the 'appid' of this datastore |
+ aid string |
+ |
// See README.md for head schema. |
head *memStore |
+ // if snap is nil, that means that this is always-consistent, and |
+ // getQuerySnaps will return (head, head) |
snap *memStore |
// For testing, see SetTransactionRetryCount. |
txnFakeRetry int |
- // true means that head always == snap |
- consistent bool |
// true means that queries with insufficent indexes will pause to add them |
// and then continue instead of failing. |
autoIndex bool |
@@ -41,9 +45,10 @@ var ( |
_ = sync.Locker((*dataStoreData)(nil)) |
) |
-func newDataStoreData() *dataStoreData { |
+func newDataStoreData(aid string) *dataStoreData { |
head := newMemStore() |
return &dataStoreData{ |
+ aid: aid, |
head: head, |
snap: head.Snapshot(), // empty but better than a nil pointer. |
} |
@@ -67,8 +72,9 @@ func (d *dataStoreData) setConsistent(always bool) { |
d.Lock() |
defer d.Unlock() |
- d.consistent = always |
- if d.consistent { |
+ if always { |
+ d.snap = nil |
+ } else { |
d.snap = d.head.Snapshot() |
} |
} |
@@ -76,10 +82,7 @@ func (d *dataStoreData) setConsistent(always bool) { |
func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) { |
d.Lock() |
defer d.Unlock() |
- addIndexes(d.head, ns, idxs) |
- if d.consistent { |
- d.snap = d.head.Snapshot() |
- } |
+ addIndexes(d.head, d.aid, ns, idxs) |
} |
func (d *dataStoreData) setAutoIndex(enable bool) { |
@@ -121,9 +124,9 @@ func (d *dataStoreData) getDisableSpecialEntities() bool { |
func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { |
d.rwlock.RLock() |
defer d.rwlock.RUnlock() |
- if d.consistent { |
- // snap is already a consistent snapshot of head |
- return d.snap, d.snap |
+ if d.snap == nil { |
+ // we're 'always consistent' |
+ return d.head, d.head |
} |
head = d.head.Snapshot() |
@@ -138,16 +141,14 @@ func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { |
func (d *dataStoreData) takeSnapshot() *memStore { |
d.rwlock.RLock() |
defer d.rwlock.RUnlock() |
- if d.consistent { |
- return d.snap |
- } |
return d.head.Snapshot() |
} |
func (d *dataStoreData) setSnapshot(snap *memStore) { |
d.rwlock.Lock() |
defer d.rwlock.Unlock() |
- if d.consistent { |
+ if d.snap == nil { |
+ // we're 'always consistent' |
return |
} |
d.snap = snap |
@@ -156,7 +157,8 @@ func (d *dataStoreData) setSnapshot(snap *memStore) { |
func (d *dataStoreData) catchupIndexes() { |
d.rwlock.Lock() |
defer d.rwlock.Unlock() |
- if d.consistent { |
+ if d.snap == nil { |
+ // we're 'always consistent' |
return |
} |
d.snap = d.head.Snapshot() |
@@ -272,15 +274,12 @@ func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu |
old := ents.Get(keyBytes(ret)) |
oldPM := ds.PropertyMap(nil) |
if old != nil { |
- if oldPM, err = rpmWoCtx(old, ns); err != nil { |
+ if oldPM, err = rpm(old); err != nil { |
return |
} |
} |
updateIndexes(d.head, ret, oldPM, pmap) |
ents.Set(keyBytes(ret), dataBytes) |
- if d.consistent { |
- d.snap = d.head.Snapshot() |
- } |
return |
}() |
if cb != nil { |
@@ -307,7 +306,7 @@ func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect |
cb(nil, ds.ErrNoSuchEntity) |
continue |
} |
- cb(rpmWoCtx(pdata, k.Namespace())) |
+ cb(rpm(pdata)) |
} |
return nil |
} |
@@ -336,15 +335,12 @@ func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { |
incrementLocked(ents, groupMetaKey(k), 1) |
} |
if old := ents.Get(kb); old != nil { |
- oldPM, err := rpmWoCtx(old, ns) |
+ oldPM, err := rpm(old) |
if err != nil { |
return err |
} |
updateIndexes(d.head, k, oldPM, nil) |
ents.Delete(kb) |
- if d.consistent { |
- d.snap = d.head.Snapshot() |
- } |
} |
return nil |
}() |
@@ -556,11 +552,6 @@ func keyBytes(key *ds.Key) []byte { |
return serialize.ToBytes(ds.MkProperty(key)) |
} |
-func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { |
- return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
- serialize.WithoutContext, globalAppID, ns) |
-} |
- |
func rpm(data []byte) (ds.PropertyMap, error) { |
return serialize.ReadPropertyMap(bytes.NewBuffer(data), |
serialize.WithContext, "", "") |