Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(326)

Unified Diff: impl/memory/datastore_data.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: stringSet everywhere Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « impl/memory/datastore.go ('k') | impl/memory/datastore_index.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: impl/memory/datastore_data.go
diff --git a/impl/memory/datastore_data.go b/impl/memory/datastore_data.go
index 5ef87530ded64c4c31f367bb859d3f87c43b8d7b..c2e393d8de85edc95ae22f2b6ab3fe84bd266c42 100644
--- a/impl/memory/datastore_data.go
+++ b/impl/memory/datastore_data.go
@@ -21,9 +21,9 @@ import (
type dataStoreData struct {
rwlock sync.RWMutex
- // See README.md for store schema.
- store *memStore
- snap *memStore
+ // See README.md for head schema.
+ head *memStore
+ snap *memStore
}
var (
@@ -32,10 +32,10 @@ var (
)
func newDataStoreData() *dataStoreData {
- store := newMemStore()
+ head := newMemStore()
return &dataStoreData{
- store: store,
- snap: store.Snapshot(), // empty but better than a nil pointer.
+ head: head,
+ snap: head.Snapshot(), // empty but better than a nil pointer.
}
}
@@ -50,7 +50,7 @@ func (d *dataStoreData) Unlock() {
func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
d.rwlock.RLock()
defer d.rwlock.RUnlock()
- head = d.store.Snapshot()
+ head = d.head.Snapshot()
if consistent {
idx = head
} else {
@@ -62,7 +62,7 @@ func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
func (d *dataStoreData) takeSnapshot() *memStore {
d.rwlock.RLock()
defer d.rwlock.RUnlock()
- return d.store.Snapshot()
+ return d.head.Snapshot()
}
func (d *dataStoreData) setSnapshot(snap *memStore) {
@@ -74,38 +74,35 @@ func (d *dataStoreData) setSnapshot(snap *memStore) {
func (d *dataStoreData) catchupIndexes() {
d.rwlock.Lock()
defer d.rwlock.Unlock()
- d.snap = d.store.Snapshot()
+ d.snap = d.head.Snapshot()
}
-/////////////////////////// indicies(dataStoreData) ////////////////////////////
+/////////////////////////// indexes(dataStoreData) ////////////////////////////
func groupMetaKey(key ds.Key) []byte {
- return keyBytes(serialize.WithoutContext,
- dskey.New("", "", "__entity_group__", "", 1, dskey.Root(key)))
+ return keyBytes(dskey.New("", "", "__entity_group__", "", 1, dskey.Root(key)))
}
func groupIDsKey(key ds.Key) []byte {
- return keyBytes(serialize.WithoutContext,
- dskey.New("", "", "__entity_group_ids__", "", 1, dskey.Root(key)))
+ return keyBytes(dskey.New("", "", "__entity_group_ids__", "", 1, dskey.Root(key)))
}
func rootIDsKey(kind string) []byte {
- return keyBytes(serialize.WithoutContext,
- dskey.New("", "", "__entity_root_ids__", kind, 0, nil))
+ return keyBytes(dskey.New("", "", "__entity_root_ids__", kind, 0, nil))
}
func curVersion(ents *memCollection, key []byte) int64 {
if ents != nil {
if v := ents.Get(key); v != nil {
pm, err := rpm(v)
- if err != nil {
- panic(err) // memory corruption
- }
+ memoryCorruption(err)
+
pl, ok := pm["__version__"]
if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt {
return pl[0].Value().(int64)
}
- panic(fmt.Errorf("__version__ property missing or wrong: %v", pm))
+
+ memoryCorruption(fmt.Errorf("__version__ property missing or wrong: %v", pm))
}
}
return 0
@@ -113,18 +110,17 @@ func curVersion(ents *memCollection, key []byte) int64 {
func incrementLocked(ents *memCollection, key []byte) int64 {
ret := curVersion(ents, key) + 1
- buf := &bytes.Buffer{}
- serialize.WritePropertyMap(buf, serialize.WithContext, ds.PropertyMap{
- "__version__": {ds.MkPropertyNI(ret)}})
- ents.Set(key, buf.Bytes())
+ ents.Set(key, serialize.ToBytes(ds.PropertyMap{
+ "__version__": {ds.MkPropertyNI(ret)},
+ }))
return ret
}
func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) {
coll := "ents:" + key.Namespace()
- ents := d.store.GetCollection(coll)
+ ents := d.head.GetCollection(coll)
if ents == nil {
- ents = d.store.SetCollection(coll, nil)
+ ents = d.head.SetCollection(coll, nil)
}
if dskey.Incomplete(key) {
@@ -143,10 +139,8 @@ func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) {
func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) {
for i, k := range keys {
- buf := &bytes.Buffer{}
pmap, _ := vals[i].Save(false)
- serialize.WritePropertyMap(buf, serialize.WithoutContext, pmap)
- dataBytes := buf.Bytes()
+ dataBytes := serialize.ToBytes(pmap)
k, err := func() (ret ds.Key, err error) {
d.rwlock.Lock()
@@ -155,15 +149,15 @@ func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put
ents, ret := d.entsKeyLocked(k)
incrementLocked(ents, groupMetaKey(ret))
- old := ents.Get(keyBytes(serialize.WithoutContext, ret))
+ old := ents.Get(keyBytes(ret))
oldPM := ds.PropertyMap(nil)
if old != nil {
if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil {
return
}
}
- updateIndicies(d.store, ret, oldPM, pmap)
- ents.Set(keyBytes(serialize.WithoutContext, ret), dataBytes)
+ updateIndexes(d.head, ret, oldPM, pmap)
+ ents.Set(keyBytes(ret), dataBytes)
return
}()
if cb != nil {
@@ -185,7 +179,7 @@ func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti
}
for _, k := range keys {
- pdata := ents.Get(keyBytes(serialize.WithoutContext, k))
+ pdata := ents.Get(keyBytes(k))
if pdata == nil {
cb(nil, ds.ErrNoSuchEntity)
continue
@@ -207,14 +201,14 @@ func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error {
func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) {
toDel := make([][]byte, 0, len(keys))
for _, k := range keys {
- toDel = append(toDel, keyBytes(serialize.WithoutContext, k))
+ toDel = append(toDel, keyBytes(k))
}
ns := keys[0].Namespace()
d.rwlock.Lock()
defer d.rwlock.Unlock()
- ents := d.store.GetCollection("ents:" + ns)
+ ents := d.head.GetCollection("ents:" + ns)
for i, k := range keys {
if ents != nil {
@@ -228,7 +222,7 @@ func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) {
}
continue
}
- updateIndicies(d.store, k, oldPM, nil)
+ updateIndexes(d.head, k, oldPM, nil)
ents.Delete(kb)
}
}
@@ -246,14 +240,14 @@ func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
if len(muts) == 0 { // read-only
continue
}
- k, err := serialize.ReadKey(bytes.NewBufferString(rk), serialize.WithContext, "", "")
- if err != nil {
- panic(err)
- }
+ prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), serialize.WithContext, "", "")
+ memoryCorruption(err)
+
+ k := prop.Value().(ds.Key)
entKey := "ents:" + k.Namespace()
mkey := groupMetaKey(k)
- entsHead := d.store.GetCollection(entKey)
+ entsHead := d.head.GetCollection(entKey)
entsSnap := txn.snap.GetCollection(entKey)
vHead := curVersion(entsHead, mkey)
vSnap := curVersion(entsSnap, mkey)
@@ -281,10 +275,7 @@ func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) {
d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.data},
func(_ ds.Key, e error) { err = e })
}
- err = errors.SingleError(err)
- if err != nil {
- panic(err)
- }
+ impossible(err)
}
}
}
@@ -295,7 +286,7 @@ func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj {
// access to break features inside of transactions.
parent: d,
isXG: o != nil && o.XG,
- snap: d.store.Snapshot(),
+ snap: d.head.Snapshot(),
muts: map[string][]txnMutation{},
}
}
@@ -338,10 +329,11 @@ func (td *txnDataStoreData) endTxn() {
atomic.StoreInt32(&td.closed, 1)
}
func (*txnDataStoreData) applyTxn(context.Context, memContextObj) {
- panic("txnDataStoreData cannot apply transactions")
+ impossible(fmt.Errorf("cannot create a recursive transaction"))
}
func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj {
- panic("impossible")
+ impossible(fmt.Errorf("cannot create a recursive transaction"))
+ return nil
}
func (td *txnDataStoreData) run(f func() error) error {
@@ -365,7 +357,7 @@ func (td *txnDataStoreData) run(f func() error) error {
// Returns an error if this key causes the transaction to cross too many entity
// groups.
func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.PropertyMap) error {
- rk := string(keyBytes(serialize.WithContext, dskey.Root(key)))
+ rk := string(keyBytes(dskey.Root(key)))
td.Lock()
defer td.Unlock()
@@ -428,10 +420,8 @@ func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error {
return nil
}
-func keyBytes(ctx serialize.KeyContext, key ds.Key) []byte {
- buf := &bytes.Buffer{}
- serialize.WriteKey(buf, ctx, key)
- return buf.Bytes()
+func keyBytes(key ds.Key) []byte {
+ return serialize.ToBytes(ds.MkProperty(key))
}
func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) {
« no previous file with comments | « impl/memory/datastore.go ('k') | impl/memory/datastore_index.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698