| Index: impl/memory/datastore_data.go
|
| diff --git a/impl/memory/datastore_data.go b/impl/memory/datastore_data.go
|
| index a3bbda7074f61d34b15ddd8a7dbb63d33e36687d..e221c55eee161eb9fda8fece355a4a71da1da546 100644
|
| --- a/impl/memory/datastore_data.go
|
| +++ b/impl/memory/datastore_data.go
|
| @@ -20,13 +20,17 @@ import (
|
|
|
| type dataStoreData struct {
|
| rwlock sync.RWMutex
|
| +
|
| + // the 'appid' of this datastore
|
| + aid string
|
| +
|
| // See README.md for head schema.
|
| head *memStore
|
| + // if snap is nil, that means that this is always-consistent, and
|
| + // getQuerySnaps will return (head, head)
|
| snap *memStore
|
| // For testing, see SetTransactionRetryCount.
|
| txnFakeRetry int
|
| - // true means that head always == snap
|
| - consistent bool
|
| // true means that queries with insufficent indexes will pause to add them
|
| // and then continue instead of failing.
|
| autoIndex bool
|
| @@ -41,9 +45,10 @@ var (
|
| _ = sync.Locker((*dataStoreData)(nil))
|
| )
|
|
|
| -func newDataStoreData() *dataStoreData {
|
| +func newDataStoreData(aid string) *dataStoreData {
|
| head := newMemStore()
|
| return &dataStoreData{
|
| + aid: aid,
|
| head: head,
|
| snap: head.Snapshot(), // empty but better than a nil pointer.
|
| }
|
| @@ -67,8 +72,9 @@ func (d *dataStoreData) setConsistent(always bool) {
|
| d.Lock()
|
| defer d.Unlock()
|
|
|
| - d.consistent = always
|
| - if d.consistent {
|
| + if always {
|
| + d.snap = nil
|
| + } else {
|
| d.snap = d.head.Snapshot()
|
| }
|
| }
|
| @@ -76,10 +82,7 @@ func (d *dataStoreData) setConsistent(always bool) {
|
| func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) {
|
| d.Lock()
|
| defer d.Unlock()
|
| - addIndexes(d.head, ns, idxs)
|
| - if d.consistent {
|
| - d.snap = d.head.Snapshot()
|
| - }
|
| + addIndexes(d.head, d.aid, ns, idxs)
|
| }
|
|
|
| func (d *dataStoreData) setAutoIndex(enable bool) {
|
| @@ -121,9 +124,9 @@ func (d *dataStoreData) getDisableSpecialEntities() bool {
|
| func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
|
| d.rwlock.RLock()
|
| defer d.rwlock.RUnlock()
|
| - if d.consistent {
|
| - // snap is already a consistent snapshot of head
|
| - return d.snap, d.snap
|
| + if d.snap == nil {
|
| + // we're 'always consistent'
|
| + return d.head, d.head
|
| }
|
|
|
| head = d.head.Snapshot()
|
| @@ -138,16 +141,14 @@ func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
|
| func (d *dataStoreData) takeSnapshot() *memStore {
|
| d.rwlock.RLock()
|
| defer d.rwlock.RUnlock()
|
| - if d.consistent {
|
| - return d.snap
|
| - }
|
| return d.head.Snapshot()
|
| }
|
|
|
| func (d *dataStoreData) setSnapshot(snap *memStore) {
|
| d.rwlock.Lock()
|
| defer d.rwlock.Unlock()
|
| - if d.consistent {
|
| + if d.snap == nil {
|
| + // we're 'always consistent'
|
| return
|
| }
|
| d.snap = snap
|
| @@ -156,7 +157,8 @@ func (d *dataStoreData) setSnapshot(snap *memStore) {
|
| func (d *dataStoreData) catchupIndexes() {
|
| d.rwlock.Lock()
|
| defer d.rwlock.Unlock()
|
| - if d.consistent {
|
| + if d.snap == nil {
|
| + // we're 'always consistent'
|
| return
|
| }
|
| d.snap = d.head.Snapshot()
|
| @@ -272,15 +274,12 @@ func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu
|
| old := ents.Get(keyBytes(ret))
|
| oldPM := ds.PropertyMap(nil)
|
| if old != nil {
|
| - if oldPM, err = rpmWoCtx(old, ns); err != nil {
|
| + if oldPM, err = rpm(old); err != nil {
|
| return
|
| }
|
| }
|
| updateIndexes(d.head, ret, oldPM, pmap)
|
| ents.Set(keyBytes(ret), dataBytes)
|
| - if d.consistent {
|
| - d.snap = d.head.Snapshot()
|
| - }
|
| return
|
| }()
|
| if cb != nil {
|
| @@ -307,7 +306,7 @@ func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect
|
| cb(nil, ds.ErrNoSuchEntity)
|
| continue
|
| }
|
| - cb(rpmWoCtx(pdata, k.Namespace()))
|
| + cb(rpm(pdata))
|
| }
|
| return nil
|
| }
|
| @@ -336,15 +335,12 @@ func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) {
|
| incrementLocked(ents, groupMetaKey(k), 1)
|
| }
|
| if old := ents.Get(kb); old != nil {
|
| - oldPM, err := rpmWoCtx(old, ns)
|
| + oldPM, err := rpm(old)
|
| if err != nil {
|
| return err
|
| }
|
| updateIndexes(d.head, k, oldPM, nil)
|
| ents.Delete(kb)
|
| - if d.consistent {
|
| - d.snap = d.head.Snapshot()
|
| - }
|
| }
|
| return nil
|
| }()
|
| @@ -556,11 +552,6 @@ func keyBytes(key *ds.Key) []byte {
|
| return serialize.ToBytes(ds.MkProperty(key))
|
| }
|
|
|
| -func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) {
|
| - return serialize.ReadPropertyMap(bytes.NewBuffer(data),
|
| - serialize.WithoutContext, globalAppID, ns)
|
| -}
|
| -
|
| func rpm(data []byte) (ds.PropertyMap, error) {
|
| return serialize.ReadPropertyMap(bytes.NewBuffer(data),
|
| serialize.WithContext, "", "")
|
|
|