Index: impl/memory/datastore_data.go |
diff --git a/impl/memory/datastore_data.go b/impl/memory/datastore_data.go |
index 5ef87530ded64c4c31f367bb859d3f87c43b8d7b..c2e393d8de85edc95ae22f2b6ab3fe84bd266c42 100644 |
--- a/impl/memory/datastore_data.go |
+++ b/impl/memory/datastore_data.go |
@@ -21,9 +21,9 @@ import ( |
type dataStoreData struct { |
rwlock sync.RWMutex |
- // See README.md for store schema. |
- store *memStore |
- snap *memStore |
+ // See README.md for head schema. |
+ head *memStore |
+ snap *memStore |
} |
var ( |
@@ -32,10 +32,10 @@ var ( |
) |
func newDataStoreData() *dataStoreData { |
- store := newMemStore() |
+ head := newMemStore() |
return &dataStoreData{ |
- store: store, |
- snap: store.Snapshot(), // empty but better than a nil pointer. |
+ head: head, |
+ snap: head.Snapshot(), // empty but better than a nil pointer. |
} |
} |
@@ -50,7 +50,7 @@ func (d *dataStoreData) Unlock() { |
func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { |
d.rwlock.RLock() |
defer d.rwlock.RUnlock() |
- head = d.store.Snapshot() |
+ head = d.head.Snapshot() |
if consistent { |
idx = head |
} else { |
@@ -62,7 +62,7 @@ func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { |
func (d *dataStoreData) takeSnapshot() *memStore { |
d.rwlock.RLock() |
defer d.rwlock.RUnlock() |
- return d.store.Snapshot() |
+ return d.head.Snapshot() |
} |
func (d *dataStoreData) setSnapshot(snap *memStore) { |
@@ -74,38 +74,35 @@ func (d *dataStoreData) setSnapshot(snap *memStore) { |
func (d *dataStoreData) catchupIndexes() { |
d.rwlock.Lock() |
defer d.rwlock.Unlock() |
- d.snap = d.store.Snapshot() |
+ d.snap = d.head.Snapshot() |
} |
-/////////////////////////// indicies(dataStoreData) //////////////////////////// |
+/////////////////////////// indexes(dataStoreData) //////////////////////////// |
func groupMetaKey(key ds.Key) []byte { |
- return keyBytes(serialize.WithoutContext, |
- dskey.New("", "", "__entity_group__", "", 1, dskey.Root(key))) |
+ return keyBytes(dskey.New("", "", "__entity_group__", "", 1, dskey.Root(key))) |
} |
func groupIDsKey(key ds.Key) []byte { |
- return keyBytes(serialize.WithoutContext, |
- dskey.New("", "", "__entity_group_ids__", "", 1, dskey.Root(key))) |
+ return keyBytes(dskey.New("", "", "__entity_group_ids__", "", 1, dskey.Root(key))) |
} |
func rootIDsKey(kind string) []byte { |
- return keyBytes(serialize.WithoutContext, |
- dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) |
+ return keyBytes(dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) |
} |
func curVersion(ents *memCollection, key []byte) int64 { |
if ents != nil { |
if v := ents.Get(key); v != nil { |
pm, err := rpm(v) |
- if err != nil { |
- panic(err) // memory corruption |
- } |
+ memoryCorruption(err) |
+ |
pl, ok := pm["__version__"] |
if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { |
return pl[0].Value().(int64) |
} |
- panic(fmt.Errorf("__version__ property missing or wrong: %v", pm)) |
+ |
+ memoryCorruption(fmt.Errorf("__version__ property missing or wrong: %v", pm)) |
} |
} |
return 0 |
@@ -113,18 +110,17 @@ func curVersion(ents *memCollection, key []byte) int64 { |
func incrementLocked(ents *memCollection, key []byte) int64 { |
ret := curVersion(ents, key) + 1 |
- buf := &bytes.Buffer{} |
- serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{ |
- "__version__": {ds.MkPropertyNI(ret)}}) |
- ents.Set(key, buf.Bytes()) |
+ ents.Set(key, serialize.ToBytes(ds.PropertyMap{ |
+ "__version__": {ds.MkPropertyNI(ret)}, |
+ })) |
return ret |
} |
func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { |
coll := "ents:" + key.Namespace() |
- ents := d.store.GetCollection(coll) |
+ ents := d.head.GetCollection(coll) |
if ents == nil { |
- ents = d.store.SetCollection(coll, nil) |
+ ents = d.head.SetCollection(coll, nil) |
} |
if dskey.Incomplete(key) { |
@@ -143,10 +139,8 @@ func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { |
func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) { |
for i, k := range keys { |
- buf := &bytes.Buffer{} |
pmap, _ := vals[i].Save(false) |
- serialize.WritePropertyMap(buf, serialize.WithoutContext, pmap) |
- dataBytes := buf.Bytes() |
+ dataBytes := serialize.ToBytes(pmap) |
k, err := func() (ret ds.Key, err error) { |
d.rwlock.Lock() |
@@ -155,15 +149,15 @@ func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put |
ents, ret := d.entsKeyLocked(k) |
incrementLocked(ents, groupMetaKey(ret)) |
- old := ents.Get(keyBytes(serialize.WithoutContext, ret)) |
+ old := ents.Get(keyBytes(ret)) |
oldPM := ds.PropertyMap(nil) |
if old != nil { |
if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { |
return |
} |
} |
- updateIndicies(d.store, ret, oldPM, pmap) |
- ents.Set(keyBytes(serialize.WithoutContext, ret), dataBytes) |
+ updateIndexes(d.head, ret, oldPM, pmap) |
+ ents.Set(keyBytes(ret), dataBytes) |
return |
}() |
if cb != nil { |
@@ -185,7 +179,7 @@ func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti |
} |
for _, k := range keys { |
- pdata := ents.Get(keyBytes(serialize.WithoutContext, k)) |
+ pdata := ents.Get(keyBytes(k)) |
if pdata == nil { |
cb(nil, ds.ErrNoSuchEntity) |
continue |
@@ -207,14 +201,14 @@ func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { |
func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { |
toDel := make([][]byte, 0, len(keys)) |
for _, k := range keys { |
- toDel = append(toDel, keyBytes(serialize.WithoutContext, k)) |
+ toDel = append(toDel, keyBytes(k)) |
} |
ns := keys[0].Namespace() |
d.rwlock.Lock() |
defer d.rwlock.Unlock() |
- ents := d.store.GetCollection("ents:" + ns) |
+ ents := d.head.GetCollection("ents:" + ns) |
for i, k := range keys { |
if ents != nil { |
@@ -228,7 +222,7 @@ func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { |
} |
continue |
} |
- updateIndicies(d.store, k, oldPM, nil) |
+ updateIndexes(d.head, k, oldPM, nil) |
ents.Delete(kb) |
} |
} |
@@ -246,14 +240,14 @@ func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
if len(muts) == 0 { // read-only |
continue |
} |
- k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize.WithContext, "", "") |
- if err != nil { |
- panic(err) |
- } |
+ prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), serialize.WithContext, "", "") |
+ memoryCorruption(err) |
+ |
+ k := prop.Value().(ds.Key) |
entKey := "ents:" + k.Namespace() |
mkey := groupMetaKey(k) |
- entsHead := d.store.GetCollection(entKey) |
+ entsHead := d.head.GetCollection(entKey) |
entsSnap := txn.snap.GetCollection(entKey) |
vHead := curVersion(entsHead, mkey) |
vSnap := curVersion(entsSnap, mkey) |
@@ -281,10 +275,7 @@ func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { |
d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.data}, |
func(_ ds.Key, e error) { err = e }) |
} |
- err = errors.SingleError(err) |
- if err != nil { |
- panic(err) |
- } |
+ impossible(err) |
} |
} |
} |
@@ -295,7 +286,7 @@ func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { |
// access to break features inside of transactions. |
parent: d, |
isXG: o != nil && o.XG, |
- snap: d.store.Snapshot(), |
+ snap: d.head.Snapshot(), |
muts: map[string][]txnMutation{}, |
} |
} |
@@ -338,10 +329,11 @@ func (td *txnDataStoreData) endTxn() { |
atomic.StoreInt32(&td.closed, 1) |
} |
func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
- panic("txnDataStoreData cannot apply transactions") |
+ impossible(fmt.Errorf("cannot create a recursive transaction")) |
} |
func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj { |
- panic("impossible") |
+ impossible(fmt.Errorf("cannot create a recursive transaction")) |
+ return nil |
} |
func (td *txnDataStoreData) run(f func() error) error { |
@@ -365,7 +357,7 @@ func (td *txnDataStoreData) run(f func() error) error { |
// Returns an error if this key causes the transaction to cross too many entity |
// groups. |
func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.PropertyMap) error { |
- rk := string(keyBytes(serialize.WithContext, dskey.Root(key))) |
+ rk := string(keyBytes(dskey.Root(key))) |
td.Lock() |
defer td.Unlock() |
@@ -428,10 +420,8 @@ func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { |
return nil |
} |
-func keyBytes(ctx serialize.KeyContext, key ds.Key) []byte { |
- buf := &bytes.Buffer{} |
- serialize.WriteKey(buf, ctx, key) |
- return buf.Bytes() |
+func keyBytes(key ds.Key) []byte { |
+ return serialize.ToBytes(ds.MkProperty(key)) |
} |
func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { |