Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(110)

Unified Diff: go/src/infra/gae/libs/gae/memory/raw_datstore_data.go

Issue 1242043005: Improve memory implementation of gae to provide the full RawDatastore interface. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@port_broken_features
Patch Set: further simplification of CL Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: go/src/infra/gae/libs/gae/memory/raw_datstore_data.go
diff --git a/go/src/infra/gae/libs/gae/memory/raw_datstore_data.go b/go/src/infra/gae/libs/gae/memory/raw_datstore_data.go
index 70533f693668bb993c045f5495bd708b4e76cc5a..e8d2744611dc0ae5f04dfb9c92e5ef20192af250 100644
--- a/go/src/infra/gae/libs/gae/memory/raw_datstore_data.go
+++ b/go/src/infra/gae/libs/gae/memory/raw_datstore_data.go
@@ -63,38 +63,31 @@ func rootIDsKey(kind string) []byte {
helper.NewDSKey("", "", "__entity_root_ids__", kind, 0, nil))
}
-func curVersion(ents *memCollection, key []byte) (int64, error) {
+func curVersion(ents *memCollection, key []byte) int64 {
if v := ents.Get(key); v != nil {
pm, err := rpm(v)
if err != nil {
- return 0, err
+ panic(err) // memory corruption
}
pl, ok := pm["__version__"]
if ok && len(pl) > 0 && pl[0].Type() == gae.DSPTInt {
- return pl[0].Value().(int64), nil
+ return pl[0].Value().(int64)
}
- return 0, fmt.Errorf("__version__ property missing or wrong: %v", pm)
+ panic(fmt.Errorf("__version__ property missing or wrong: %v", pm))
}
- return 0, nil
+ return 0
}
-func incrementLocked(ents *memCollection, key []byte) (ret int64, err error) {
- if ret, err = curVersion(ents, key); err != nil {
- ret = 0
- }
- ret++
- p := gae.DSProperty{}
- if err = p.SetValue(ret, true); err != nil {
- return
- }
+func incrementLocked(ents *memCollection, key []byte) int64 {
+ ret := curVersion(ents, key) + 1
buf := &bytes.Buffer{}
helper.WriteDSPropertyMap(
- buf, gae.DSPropertyMap{"__version__": {p}}, helper.WithContext)
+ buf, gae.DSPropertyMap{"__version__": {gae.MkDSPropertyNI(ret)}}, helper.WithContext)
ents.Set(key, buf.Bytes())
- return
+ return ret
}
-func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey, error) {
+func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey) {
Vadim Sh. 2015/07/17 01:48:29 do you have tests that spawn multiple goroutines a
coll := "ents:" + key.Namespace()
ents := d.store.GetCollection(coll)
if ents == nil {
@@ -108,91 +101,118 @@ func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey,
} else {
idKey = groupIDsKey(key)
}
- id, err := incrementLocked(ents, idKey)
- if err != nil {
- return nil, nil, err
- }
+ id := incrementLocked(ents, idKey)
key = helper.NewDSKey(key.AppID(), key.Namespace(), key.Kind(), "", id, key.Parent())
}
- return ents, key, nil
+ return ents, key
}
-func putPrelim(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver) (gae.DSPropertyMap, error) {
- if !keyCouldBeValid(key, ns, false) {
- // TODO(riannucci): different error for Put-ing to reserved Keys?
- return nil, gae.ErrDSInvalidKey
+func (d *dataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver) (gae.DSKey, error) {
+ keys, errs := d.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSaver{pls})
+ if errs == nil {
+ return keys[0], nil
}
- return pls.Save(false)
+ return nil, gae.SingleError(errs)
}
-func (d *dataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver) (gae.DSKey, error) {
- pm, err := putPrelim(ns, key, pls)
+func (d *dataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver) ([]gae.DSKey, error) {
+ pmaps, err := putMultiPrelim(ns, keys, plss)
if err != nil {
return nil, err
}
- return d.putInner(key, pm)
+ return d.putMultiInner(keys, pmaps)
}
-func (d *dataStoreData) putInner(key gae.DSKey, data gae.DSPropertyMap) (gae.DSKey, error) {
- buf := &bytes.Buffer{}
- helper.WriteDSPropertyMap(buf, data, helper.WithoutContext)
- dataBytes := buf.Bytes()
-
- d.rwlock.Lock()
- defer d.rwlock.Unlock()
-
- ents, key, err := d.entsKeyLocked(key)
+func putMultiPrelim(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver) ([]gae.DSPropertyMap, error) {
+ err := multiValid(keys, plss, ns, true, false)
if err != nil {
return nil, err
}
- if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil {
- return nil, err
- }
-
- old := ents.Get(keyBytes(helper.WithoutContext, key))
- oldPM := gae.DSPropertyMap(nil)
- if old != nil {
- if oldPM, err = rpmWoCtx(old, key.Namespace()); err != nil {
- return nil, err
- }
- }
- if err = updateIndicies(d.store, key, oldPM, data); err != nil {
- return nil, err
+ pmaps := make([]gae.DSPropertyMap, len(keys))
+ lme := gae.LazyMultiError{Size: len(keys)}
+ for i, pls := range plss {
+ pm, err := pls.Save(false)
+ lme.Assign(i, err)
+ pmaps[i] = pm
+ }
+ return pmaps, lme.Get()
+}
+
+func (d *dataStoreData) putMultiInner(keys []gae.DSKey, data []gae.DSPropertyMap) ([]gae.DSKey, error) {
+ retKeys := make([]gae.DSKey, len(keys))
+ lme := gae.LazyMultiError{Size: len(keys)}
+ for i, k := range keys {
+ buf := &bytes.Buffer{}
+ helper.WriteDSPropertyMap(buf, data[i], helper.WithoutContext)
+ dataBytes := buf.Bytes()
+
+ rKey, err := func() (ret gae.DSKey, err error) {
+ d.rwlock.Lock()
+ defer d.rwlock.Unlock()
+
+ ents, ret := d.entsKeyLocked(k)
+ incrementLocked(ents, groupMetaKey(ret))
+
+ old := ents.Get(keyBytes(helper.WithoutContext, ret))
+ oldPM := gae.DSPropertyMap(nil)
+ if old != nil {
+ if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil {
+ return
+ }
+ }
+ updateIndicies(d.store, ret, oldPM, data[i])
+ ents.Set(keyBytes(helper.WithoutContext, ret), dataBytes)
+ return
+ }()
+ lme.Assign(i, err)
+ retKeys[i] = rKey
}
-
- ents.Set(keyBytes(helper.WithoutContext, key), dataBytes)
-
- return key, nil
+ return retKeys, lme.Get()
}
-func getInner(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver, getColl func() (*memCollection, error)) error {
- if helper.DSKeyIncomplete(key) || !helper.DSKeyValid(key, ns, true) {
- return gae.ErrDSInvalidKey
+func getMultiInner(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver, getColl func() (*memCollection, error)) error {
+ if err := multiValid(keys, plss, ns, false, true); err != nil {
+ return err
}
+ lme := gae.LazyMultiError{Size: len(keys)}
+
ents, err := getColl()
if err != nil {
return err
}
if ents == nil {
- return gae.ErrDSNoSuchEntity
- }
- pdata := ents.Get(keyBytes(helper.WithoutContext, key))
- if pdata == nil {
- return gae.ErrDSNoSuchEntity
+ for i := range keys {
+ lme.Assign(i, gae.ErrDSNoSuchEntity)
+ }
+ return lme.Get()
}
- got, err := rpmWoCtx(pdata, ns)
- if err != nil {
- return err
- }
+ for i, k := range keys {
+ pdata := ents.Get(keyBytes(helper.WithoutContext, k))
+ if pdata == nil {
+ lme.Assign(i, gae.ErrDSNoSuchEntity)
+ continue
+ }
- return pls.Load(got)
+ got, err := rpmWoCtx(pdata, ns)
+ if err != nil {
+ lme.Assign(i, err)
+ continue
+ }
+
+ lme.Assign(i, plss[i].Load(got))
+ }
+ return lme.Get()
}
func (d *dataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver) error {
- return getInner(ns, key, pls, func() (*memCollection, error) {
+ return gae.SingleError(d.getMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSaver{pls}))
+}
+
+func (d *dataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver) error {
+ return getMultiInner(ns, keys, plss, func() (*memCollection, error) {
d.rwlock.RLock()
s := d.store.Snapshot()
d.rwlock.RUnlock()
@@ -202,11 +222,23 @@ func (d *dataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoadSave
}
func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) {
- if !helper.DSKeyValid(key, ns, false) {
- return gae.ErrDSInvalidKey
- }
+ return gae.SingleError(d.delMulti(ns, []gae.DSKey{key}))
+}
- keyBuf := keyBytes(helper.WithoutContext, key)
+func (d *dataStoreData) delMulti(ns string, keys []gae.DSKey) error {
+ lme := gae.LazyMultiError{Size: len(keys)}
+ toDel := make([][]byte, 0, len(keys))
+ for i, k := range keys {
+ if !helper.DSKeyValid(k, ns, false) {
+ lme.Assign(i, gae.ErrDSInvalidKey)
+ continue
+ }
+ toDel = append(toDel, keyBytes(helper.WithoutContext, k))
+ }
+ err := lme.Get()
+ if err != nil {
Vadim Sh. 2015/07/17 01:48:29 nit: if err := ...; err != nil {
+ return err
+ }
d.rwlock.Lock()
defer d.rwlock.Unlock()
@@ -215,23 +247,22 @@ func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) {
if ents == nil {
return nil
}
- if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil {
- return
- }
- old := ents.Get(keyBuf)
- oldPM := gae.DSPropertyMap(nil)
- if old != nil {
- if oldPM, err = rpmWoCtx(old, ns); err != nil {
- return
+ for i, k := range keys {
+ incrementLocked(ents, groupMetaKey(k))
+ kb := toDel[i]
+ old := ents.Get(kb)
+ oldPM := gae.DSPropertyMap(nil)
+ if old != nil {
+ if oldPM, err = rpmWoCtx(old, ns); err != nil {
+ lme.Assign(i, err)
+ continue
+ }
}
+ updateIndicies(d.store, k, oldPM, nil)
+ ents.Delete(kb)
}
- if err := updateIndicies(d.store, key, oldPM, nil); err != nil {
- return err
- }
-
- ents.Delete(keyBuf)
- return nil
+ return lme.Get()
}
func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
@@ -251,14 +282,8 @@ func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
mkey := groupMetaKey(k)
entsHead := d.store.GetCollection(entKey)
entsSnap := txn.snap.GetCollection(entKey)
- vHead, err := curVersion(entsHead, mkey)
- if err != nil {
- panic(err)
- }
- vSnap, err := curVersion(entsSnap, mkey)
- if err != nil {
- panic(err)
- }
+ vHead := curVersion(entsHead, mkey)
+ vSnap := curVersion(entsSnap, mkey)
if vHead != vSnap {
return false
}
@@ -277,7 +302,7 @@ func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) {
if m.data == nil {
err = d.del(m.key.Namespace(), m.key)
} else {
- _, err = d.putInner(m.key, m.data)
+ _, err = d.put(m.key.Namespace(), m.key, m.data)
}
if err != nil {
panic(err)
@@ -286,7 +311,7 @@ func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) {
}
}
-func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) (memContextObj, error) {
+func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) memContextObj {
return &txnDataStoreData{
// alias to the main datastore's so that testing code can have primitive
// access to break features inside of transactions.
@@ -294,7 +319,7 @@ func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) (memContextObj, error
isXG: o != nil && o.XG,
snap: d.store.Snapshot(),
muts: map[string][]txnMutation{},
- }, nil
+ }
}
func (d *dataStoreData) endTxn() {}
@@ -337,17 +362,17 @@ func (td *txnDataStoreData) endTxn() {
func (*txnDataStoreData) applyTxn(context.Context, memContextObj) {
panic("txnDataStoreData cannot apply transactions")
}
-func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error) {
- return nil, errors.New("datastore: nested transactions are not supported")
+func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) memContextObj {
+ panic("impossible")
}
-func (td *txnDataStoreData) isBroken() error {
+func (td *txnDataStoreData) run(f func() error) error {
// Slightly different from the SDK... datastore and taskqueue each implement
// this here, where in the SDK only datastore.transaction.Call does.
if atomic.LoadInt32(&td.closed) == 1 {
return errors.New("datastore: transaction context has expired")
}
- return nil
+ return f()
}
// writeMutation ensures that this transaction can support the given key/value
@@ -389,49 +414,62 @@ func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae.
}
func (td *txnDataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver) (gae.DSKey, error) {
- pm, err := putPrelim(ns, key, pls)
- if err != nil {
- return nil, err
+ keys, errs := td.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSaver{pls})
+ if errs == nil {
+ return keys[0], nil
}
+ return nil, gae.SingleError(errs)
+}
- func() {
- td.parent.Lock()
- defer td.parent.Unlock()
- _, key, err = td.parent.entsKeyLocked(key)
- }()
+func (td *txnDataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver) ([]gae.DSKey, error) {
+ pmaps, err := putMultiPrelim(ns, keys, plss)
if err != nil {
return nil, err
}
- if err = td.writeMutation(false, key, pm); err != nil {
- return nil, err
+ retKeys := make([]gae.DSKey, len(keys))
+ lme := gae.LazyMultiError{Size: len(keys)}
+ for i, k := range keys {
+ func() {
+ td.parent.Lock()
+ defer td.parent.Unlock()
+ _, k = td.parent.entsKeyLocked(k)
Vadim Sh. 2015/07/17 01:48:29 why not: td.parent.Lock() _, k = td.parent.entsKey
+ }()
+ lme.Assign(i, td.writeMutation(false, k, pmaps[i]))
+ retKeys[i] = k
}
- return key, nil
+ return retKeys, lme.Get()
}
func (td *txnDataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver) error {
- return getInner(ns, key, pls, func() (*memCollection, error) {
- if err := td.writeMutation(true, key, nil); err != nil {
- return nil, err
+ return gae.SingleError(td.getMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSaver{pls}))
+}
+
+func (td *txnDataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver) error {
+ return getMultiInner(ns, keys, plss, func() (*memCollection, error) {
+ lme := gae.LazyMultiError{Size: len(keys)}
+ for i, k := range keys {
+ lme.Assign(i, td.writeMutation(true, k, nil))
}
- return td.snap.GetCollection("ents:" + ns), nil
+ return td.snap.GetCollection("ents:" + ns), lme.Get()
})
}
func (td *txnDataStoreData) del(ns string, key gae.DSKey) error {
- if !helper.DSKeyValid(key, ns, false) {
- return gae.ErrDSInvalidKey
- }
- return td.writeMutation(false, key, nil)
+ return gae.SingleError(td.delMulti(ns, []gae.DSKey{key}))
}
-func keyCouldBeValid(k gae.DSKey, ns string, allowSpecial bool) bool {
- // adds an id to k if it's incomplete.
- if helper.DSKeyIncomplete(k) {
- k = helper.NewDSKey(k.AppID(), k.Namespace(), k.Kind(), "", 1, k.Parent())
+func (td *txnDataStoreData) delMulti(ns string, keys []gae.DSKey) error {
+ lme := gae.LazyMultiError{Size: len(keys)}
+ for i, k := range keys {
+ if !helper.DSKeyValid(k, ns, false) {
+ lme.Assign(i, gae.ErrDSInvalidKey)
+ } else {
+ lme.Assign(i, td.writeMutation(false, k, nil))
+ }
}
- return helper.DSKeyValid(k, ns, allowSpecial)
+ return lme.Get()
}
func keyBytes(ctx helper.DSKeyContext, key gae.DSKey) []byte {
@@ -447,3 +485,32 @@ func rpmWoCtx(data []byte, ns string) (gae.DSPropertyMap, error) {
func rpm(data []byte) (gae.DSPropertyMap, error) {
return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithContext, "", "")
}
+
+func multiValid(keys []gae.DSKey, plss []gae.DSPropertyLoadSaver, ns string, potentialKey, allowSpecial bool) error {
+ vfn := func(k gae.DSKey) bool {
+ return !helper.DSKeyIncomplete(k) && helper.DSKeyValid(k, ns, allowSpecial)
+ }
+ if potentialKey {
+ vfn = func(k gae.DSKey) bool {
+ // adds an id to k if it's incomplete.
+ if helper.DSKeyIncomplete(k) {
+ k = helper.NewDSKey(k.AppID(), k.Namespace(), k.Kind(), "", 1, k.Parent())
+ }
+ return helper.DSKeyValid(k, ns, allowSpecial)
+ }
+ }
+
+ if keys == nil || plss == nil {
+ return errors.New("gae: key or plss slices were nil")
+ }
+ if len(keys) != len(plss) {
+ return errors.New("gae: key and dst slices have different length")
+ }
+ lme := gae.LazyMultiError{Size: len(keys)}
+ for i, k := range keys {
+ if !vfn(k) {
+ lme.Assign(i, gae.ErrDSInvalidKey)
+ }
+ }
+ return lme.Get()
+}
« no previous file with comments | « go/src/infra/gae/libs/gae/memory/raw_datstore.go ('k') | go/src/infra/gae/libs/gae/memory/raw_datstore_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698