| Index: impl/memory/datastore_data.go
|
| diff --git a/impl/memory/datastore_data.go b/impl/memory/datastore_data.go
|
| index 5ef87530ded64c4c31f367bb859d3f87c43b8d7b..c2e393d8de85edc95ae22f2b6ab3fe84bd266c42 100644
|
| --- a/impl/memory/datastore_data.go
|
| +++ b/impl/memory/datastore_data.go
|
| @@ -21,9 +21,9 @@ import (
|
|
|
| type dataStoreData struct {
|
| rwlock sync.RWMutex
|
| - // See README.md for store schema.
|
| - store *memStore
|
| - snap *memStore
|
| + // See README.md for head schema.
|
| + head *memStore
|
| + snap *memStore
|
| }
|
|
|
| var (
|
| @@ -32,10 +32,10 @@ var (
|
| )
|
|
|
| func newDataStoreData() *dataStoreData {
|
| - store := newMemStore()
|
| + head := newMemStore()
|
| return &dataStoreData{
|
| - store: store,
|
| - snap: store.Snapshot(), // empty but better than a nil pointer.
|
| + head: head,
|
| + snap: head.Snapshot(), // empty but better than a nil pointer.
|
| }
|
| }
|
|
|
| @@ -50,7 +50,7 @@ func (d *dataStoreData) Unlock() {
|
| func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
|
| d.rwlock.RLock()
|
| defer d.rwlock.RUnlock()
|
| - head = d.store.Snapshot()
|
| + head = d.head.Snapshot()
|
| if consistent {
|
| idx = head
|
| } else {
|
| @@ -62,7 +62,7 @@ func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
|
| func (d *dataStoreData) takeSnapshot() *memStore {
|
| d.rwlock.RLock()
|
| defer d.rwlock.RUnlock()
|
| - return d.store.Snapshot()
|
| + return d.head.Snapshot()
|
| }
|
|
|
| func (d *dataStoreData) setSnapshot(snap *memStore) {
|
| @@ -74,38 +74,35 @@ func (d *dataStoreData) setSnapshot(snap *memStore) {
|
| func (d *dataStoreData) catchupIndexes() {
|
| d.rwlock.Lock()
|
| defer d.rwlock.Unlock()
|
| - d.snap = d.store.Snapshot()
|
| + d.snap = d.head.Snapshot()
|
| }
|
|
|
| -/////////////////////////// indicies(dataStoreData) ////////////////////////////
|
| +/////////////////////////// indexes(dataStoreData) ////////////////////////////
|
|
|
| func groupMetaKey(key ds.Key) []byte {
|
| - return keyBytes(serialize.WithoutContext,
|
| - dskey.New("", "", "__entity_group__", "", 1, dskey.Root(key)))
|
| + return keyBytes(dskey.New("", "", "__entity_group__", "", 1, dskey.Root(key)))
|
| }
|
|
|
| func groupIDsKey(key ds.Key) []byte {
|
| - return keyBytes(serialize.WithoutContext,
|
| - dskey.New("", "", "__entity_group_ids__", "", 1, dskey.Root(key)))
|
| + return keyBytes(dskey.New("", "", "__entity_group_ids__", "", 1, dskey.Root(key)))
|
| }
|
|
|
| func rootIDsKey(kind string) []byte {
|
| - return keyBytes(serialize.WithoutContext,
|
| - dskey.New("", "", "__entity_root_ids__", kind, 0, nil))
|
| + return keyBytes(dskey.New("", "", "__entity_root_ids__", kind, 0, nil))
|
| }
|
|
|
| func curVersion(ents *memCollection, key []byte) int64 {
|
| if ents != nil {
|
| if v := ents.Get(key); v != nil {
|
| pm, err := rpm(v)
|
| - if err != nil {
|
| - panic(err) // memory corruption
|
| - }
|
| + memoryCorruption(err)
|
| +
|
| pl, ok := pm["__version__"]
|
| if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt {
|
| return pl[0].Value().(int64)
|
| }
|
| - panic(fmt.Errorf("__version__ property missing or wrong: %v", pm))
|
| +
|
| + memoryCorruption(fmt.Errorf("__version__ property missing or wrong: %v", pm))
|
| }
|
| }
|
| return 0
|
| @@ -113,18 +110,17 @@ func curVersion(ents *memCollection, key []byte) int64 {
|
|
|
| func incrementLocked(ents *memCollection, key []byte) int64 {
|
| ret := curVersion(ents, key) + 1
|
| - buf := &bytes.Buffer{}
|
| - serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{
|
| - "__version__": {ds.MkPropertyNI(ret)}})
|
| - ents.Set(key, buf.Bytes())
|
| + ents.Set(key, serialize.ToBytes(ds.PropertyMap{
|
| + "__version__": {ds.MkPropertyNI(ret)},
|
| + }))
|
| return ret
|
| }
|
|
|
| func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) {
|
| coll := "ents:" + key.Namespace()
|
| - ents := d.store.GetCollection(coll)
|
| + ents := d.head.GetCollection(coll)
|
| if ents == nil {
|
| - ents = d.store.SetCollection(coll, nil)
|
| + ents = d.head.SetCollection(coll, nil)
|
| }
|
|
|
| if dskey.Incomplete(key) {
|
| @@ -143,10 +139,8 @@ func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) {
|
|
|
| func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) {
|
| for i, k := range keys {
|
| - buf := &bytes.Buffer{}
|
| pmap, _ := vals[i].Save(false)
|
| - serialize.WritePropertyMap(buf, serialize.WithoutContext, pmap)
|
| - dataBytes := buf.Bytes()
|
| + dataBytes := serialize.ToBytes(pmap)
|
|
|
| k, err := func() (ret ds.Key, err error) {
|
| d.rwlock.Lock()
|
| @@ -155,15 +149,15 @@ func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put
|
| ents, ret := d.entsKeyLocked(k)
|
| incrementLocked(ents, groupMetaKey(ret))
|
|
|
| - old := ents.Get(keyBytes(serialize.WithoutContext, ret))
|
| + old := ents.Get(keyBytes(ret))
|
| oldPM := ds.PropertyMap(nil)
|
| if old != nil {
|
| if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil {
|
| return
|
| }
|
| }
|
| - updateIndicies(d.store, ret, oldPM, pmap)
|
| - ents.Set(keyBytes(serialize.WithoutContext, ret), dataBytes)
|
| + updateIndexes(d.head, ret, oldPM, pmap)
|
| + ents.Set(keyBytes(ret), dataBytes)
|
| return
|
| }()
|
| if cb != nil {
|
| @@ -185,7 +179,7 @@ func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti
|
| }
|
|
|
| for _, k := range keys {
|
| - pdata := ents.Get(keyBytes(serialize.WithoutContext, k))
|
| + pdata := ents.Get(keyBytes(k))
|
| if pdata == nil {
|
| cb(nil, ds.ErrNoSuchEntity)
|
| continue
|
| @@ -207,14 +201,14 @@ func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error {
|
| func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) {
|
| toDel := make([][]byte, 0, len(keys))
|
| for _, k := range keys {
|
| - toDel = append(toDel, keyBytes(serialize.WithoutContext, k))
|
| + toDel = append(toDel, keyBytes(k))
|
| }
|
| ns := keys[0].Namespace()
|
|
|
| d.rwlock.Lock()
|
| defer d.rwlock.Unlock()
|
|
|
| - ents := d.store.GetCollection("ents:" + ns)
|
| + ents := d.head.GetCollection("ents:" + ns)
|
|
|
| for i, k := range keys {
|
| if ents != nil {
|
| @@ -228,7 +222,7 @@ func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) {
|
| }
|
| continue
|
| }
|
| - updateIndicies(d.store, k, oldPM, nil)
|
| + updateIndexes(d.head, k, oldPM, nil)
|
| ents.Delete(kb)
|
| }
|
| }
|
| @@ -246,14 +240,14 @@ func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
|
| if len(muts) == 0 { // read-only
|
| continue
|
| }
|
| - k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize.WithContext, "", "")
|
| - if err != nil {
|
| - panic(err)
|
| - }
|
| + prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), serialize.WithContext, "", "")
|
| + memoryCorruption(err)
|
| +
|
| + k := prop.Value().(ds.Key)
|
|
|
| entKey := "ents:" + k.Namespace()
|
| mkey := groupMetaKey(k)
|
| - entsHead := d.store.GetCollection(entKey)
|
| + entsHead := d.head.GetCollection(entKey)
|
| entsSnap := txn.snap.GetCollection(entKey)
|
| vHead := curVersion(entsHead, mkey)
|
| vSnap := curVersion(entsSnap, mkey)
|
| @@ -281,10 +275,7 @@ func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) {
|
| d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.data},
|
| func(_ ds.Key, e error) { err = e })
|
| }
|
| - err = errors.SingleError(err)
|
| - if err != nil {
|
| - panic(err)
|
| - }
|
| + impossible(err)
|
| }
|
| }
|
| }
|
| @@ -295,7 +286,7 @@ func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj {
|
| // access to break features inside of transactions.
|
| parent: d,
|
| isXG: o != nil && o.XG,
|
| - snap: d.store.Snapshot(),
|
| + snap: d.head.Snapshot(),
|
| muts: map[string][]txnMutation{},
|
| }
|
| }
|
| @@ -338,10 +329,11 @@ func (td *txnDataStoreData) endTxn() {
|
| atomic.StoreInt32(&td.closed, 1)
|
| }
|
| func (*txnDataStoreData) applyTxn(context.Context, memContextObj) {
|
| - panic("txnDataStoreData cannot apply transactions")
|
| + impossible(fmt.Errorf("cannot create a recursive transaction"))
|
| }
|
| func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj {
|
| - panic("impossible")
|
| + impossible(fmt.Errorf("cannot create a recursive transaction"))
|
| + return nil
|
| }
|
|
|
| func (td *txnDataStoreData) run(f func() error) error {
|
| @@ -365,7 +357,7 @@ func (td *txnDataStoreData) run(f func() error) error {
|
| // Returns an error if this key causes the transaction to cross too many entity
|
| // groups.
|
| func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.PropertyMap) error {
|
| - rk := string(keyBytes(serialize.WithContext, dskey.Root(key)))
|
| + rk := string(keyBytes(dskey.Root(key)))
|
|
|
| td.Lock()
|
| defer td.Unlock()
|
| @@ -428,10 +420,8 @@ func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error {
|
| return nil
|
| }
|
|
|
| -func keyBytes(ctx serialize.KeyContext, key ds.Key) []byte {
|
| - buf := &bytes.Buffer{}
|
| - serialize.WriteKey(buf, ctx, key)
|
| - return buf.Bytes()
|
| +func keyBytes(key ds.Key) []byte {
|
| + return serialize.ToBytes(ds.MkProperty(key))
|
| }
|
|
|
| func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) {
|
|
|