Chromium Code Reviews| Index: go/src/infra/gae/libs/gae/memory/raw_datstore_data.go |
| diff --git a/go/src/infra/gae/libs/gae/memory/raw_datstore_data.go b/go/src/infra/gae/libs/gae/memory/raw_datstore_data.go |
| index 70533f693668bb993c045f5495bd708b4e76cc5a..e8d2744611dc0ae5f04dfb9c92e5ef20192af250 100644 |
| --- a/go/src/infra/gae/libs/gae/memory/raw_datstore_data.go |
| +++ b/go/src/infra/gae/libs/gae/memory/raw_datstore_data.go |
| @@ -63,38 +63,31 @@ func rootIDsKey(kind string) []byte { |
| helper.NewDSKey("", "", "__entity_root_ids__", kind, 0, nil)) |
| } |
| -func curVersion(ents *memCollection, key []byte) (int64, error) { |
| +func curVersion(ents *memCollection, key []byte) int64 { |
| if v := ents.Get(key); v != nil { |
| pm, err := rpm(v) |
| if err != nil { |
| - return 0, err |
| + panic(err) // memory corruption |
| } |
| pl, ok := pm["__version__"] |
| if ok && len(pl) > 0 && pl[0].Type() == gae.DSPTInt { |
| - return pl[0].Value().(int64), nil |
| + return pl[0].Value().(int64) |
| } |
| - return 0, fmt.Errorf("__version__ property missing or wrong: %v", pm) |
| + panic(fmt.Errorf("__version__ property missing or wrong: %v", pm)) |
| } |
| - return 0, nil |
| + return 0 |
| } |
| -func incrementLocked(ents *memCollection, key []byte) (ret int64, err error) { |
| - if ret, err = curVersion(ents, key); err != nil { |
| - ret = 0 |
| - } |
| - ret++ |
| - p := gae.DSProperty{} |
| - if err = p.SetValue(ret, true); err != nil { |
| - return |
| - } |
| +func incrementLocked(ents *memCollection, key []byte) int64 { |
| + ret := curVersion(ents, key) + 1 |
| buf := &bytes.Buffer{} |
| helper.WriteDSPropertyMap( |
| - buf, gae.DSPropertyMap{"__version__": {p}}, helper.WithContext) |
| + buf, gae.DSPropertyMap{"__version__": {gae.MkDSPropertyNI(ret)}}, helper.WithContext) |
| ents.Set(key, buf.Bytes()) |
| - return |
| + return ret |
| } |
| -func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey, error) { |
| +func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey) { |
|
Vadim Sh.
2015/07/17 01:48:29
do you have tests that spawn multiple goroutines a
|
| coll := "ents:" + key.Namespace() |
| ents := d.store.GetCollection(coll) |
| if ents == nil { |
| @@ -108,91 +101,118 @@ func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey, |
| } else { |
| idKey = groupIDsKey(key) |
| } |
| - id, err := incrementLocked(ents, idKey) |
| - if err != nil { |
| - return nil, nil, err |
| - } |
| + id := incrementLocked(ents, idKey) |
| key = helper.NewDSKey(key.AppID(), key.Namespace(), key.Kind(), "", id, key.Parent()) |
| } |
| - return ents, key, nil |
| + return ents, key |
| } |
| -func putPrelim(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver) (gae.DSPropertyMap, error) { |
| - if !keyCouldBeValid(key, ns, false) { |
| - // TODO(riannucci): different error for Put-ing to reserved Keys? |
| - return nil, gae.ErrDSInvalidKey |
| +func (d *dataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver) (gae.DSKey, error) { |
| + keys, errs := d.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSaver{pls}) |
| + if errs == nil { |
| + return keys[0], nil |
| } |
| - return pls.Save(false) |
| + return nil, gae.SingleError(errs) |
| } |
| -func (d *dataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver) (gae.DSKey, error) { |
| - pm, err := putPrelim(ns, key, pls) |
| +func (d *dataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver) ([]gae.DSKey, error) { |
| + pmaps, err := putMultiPrelim(ns, keys, plss) |
| if err != nil { |
| return nil, err |
| } |
| - return d.putInner(key, pm) |
| + return d.putMultiInner(keys, pmaps) |
| } |
| -func (d *dataStoreData) putInner(key gae.DSKey, data gae.DSPropertyMap) (gae.DSKey, error) { |
| - buf := &bytes.Buffer{} |
| - helper.WriteDSPropertyMap(buf, data, helper.WithoutContext) |
| - dataBytes := buf.Bytes() |
| - |
| - d.rwlock.Lock() |
| - defer d.rwlock.Unlock() |
| - |
| - ents, key, err := d.entsKeyLocked(key) |
| +func putMultiPrelim(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver) ([]gae.DSPropertyMap, error) { |
| + err := multiValid(keys, plss, ns, true, false) |
| if err != nil { |
| return nil, err |
| } |
| - if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { |
| - return nil, err |
| - } |
| - |
| - old := ents.Get(keyBytes(helper.WithoutContext, key)) |
| - oldPM := gae.DSPropertyMap(nil) |
| - if old != nil { |
| - if oldPM, err = rpmWoCtx(old, key.Namespace()); err != nil { |
| - return nil, err |
| - } |
| - } |
| - if err = updateIndicies(d.store, key, oldPM, data); err != nil { |
| - return nil, err |
| + pmaps := make([]gae.DSPropertyMap, len(keys)) |
| + lme := gae.LazyMultiError{Size: len(keys)} |
| + for i, pls := range plss { |
| + pm, err := pls.Save(false) |
| + lme.Assign(i, err) |
| + pmaps[i] = pm |
| + } |
| + return pmaps, lme.Get() |
| +} |
| + |
| +func (d *dataStoreData) putMultiInner(keys []gae.DSKey, data []gae.DSPropertyMap) ([]gae.DSKey, error) { |
| + retKeys := make([]gae.DSKey, len(keys)) |
| + lme := gae.LazyMultiError{Size: len(keys)} |
| + for i, k := range keys { |
| + buf := &bytes.Buffer{} |
| + helper.WriteDSPropertyMap(buf, data[i], helper.WithoutContext) |
| + dataBytes := buf.Bytes() |
| + |
| + rKey, err := func() (ret gae.DSKey, err error) { |
| + d.rwlock.Lock() |
| + defer d.rwlock.Unlock() |
| + |
| + ents, ret := d.entsKeyLocked(k) |
| + incrementLocked(ents, groupMetaKey(ret)) |
| + |
| + old := ents.Get(keyBytes(helper.WithoutContext, ret)) |
| + oldPM := gae.DSPropertyMap(nil) |
| + if old != nil { |
| + if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { |
| + return |
| + } |
| + } |
| + updateIndicies(d.store, ret, oldPM, data[i]) |
| + ents.Set(keyBytes(helper.WithoutContext, ret), dataBytes) |
| + return |
| + }() |
| + lme.Assign(i, err) |
| + retKeys[i] = rKey |
| } |
| - |
| - ents.Set(keyBytes(helper.WithoutContext, key), dataBytes) |
| - |
| - return key, nil |
| + return retKeys, lme.Get() |
| } |
| -func getInner(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver, getColl func() (*memCollection, error)) error { |
| - if helper.DSKeyIncomplete(key) || !helper.DSKeyValid(key, ns, true) { |
| - return gae.ErrDSInvalidKey |
| +func getMultiInner(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver, getColl func() (*memCollection, error)) error { |
| + if err := multiValid(keys, plss, ns, false, true); err != nil { |
| + return err |
| } |
| + lme := gae.LazyMultiError{Size: len(keys)} |
| + |
| ents, err := getColl() |
| if err != nil { |
| return err |
| } |
| if ents == nil { |
| - return gae.ErrDSNoSuchEntity |
| - } |
| - pdata := ents.Get(keyBytes(helper.WithoutContext, key)) |
| - if pdata == nil { |
| - return gae.ErrDSNoSuchEntity |
| + for i := range keys { |
| + lme.Assign(i, gae.ErrDSNoSuchEntity) |
| + } |
| + return lme.Get() |
| } |
| - got, err := rpmWoCtx(pdata, ns) |
| - if err != nil { |
| - return err |
| - } |
| + for i, k := range keys { |
| + pdata := ents.Get(keyBytes(helper.WithoutContext, k)) |
| + if pdata == nil { |
| + lme.Assign(i, gae.ErrDSNoSuchEntity) |
| + continue |
| + } |
| - return pls.Load(got) |
| + got, err := rpmWoCtx(pdata, ns) |
| + if err != nil { |
| + lme.Assign(i, err) |
| + continue |
| + } |
| + |
| + lme.Assign(i, plss[i].Load(got)) |
| + } |
| + return lme.Get() |
| } |
| func (d *dataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver) error { |
| - return getInner(ns, key, pls, func() (*memCollection, error) { |
| + return gae.SingleError(d.getMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSaver{pls})) |
| +} |
| + |
| +func (d *dataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver) error { |
| + return getMultiInner(ns, keys, plss, func() (*memCollection, error) { |
| d.rwlock.RLock() |
| s := d.store.Snapshot() |
| d.rwlock.RUnlock() |
| @@ -202,11 +222,23 @@ func (d *dataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoadSave |
| } |
| func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) { |
| - if !helper.DSKeyValid(key, ns, false) { |
| - return gae.ErrDSInvalidKey |
| - } |
| + return gae.SingleError(d.delMulti(ns, []gae.DSKey{key})) |
| +} |
| - keyBuf := keyBytes(helper.WithoutContext, key) |
| +func (d *dataStoreData) delMulti(ns string, keys []gae.DSKey) error { |
| + lme := gae.LazyMultiError{Size: len(keys)} |
| + toDel := make([][]byte, 0, len(keys)) |
| + for i, k := range keys { |
| + if !helper.DSKeyValid(k, ns, false) { |
| + lme.Assign(i, gae.ErrDSInvalidKey) |
| + continue |
| + } |
| + toDel = append(toDel, keyBytes(helper.WithoutContext, k)) |
| + } |
| + err := lme.Get() |
| + if err != nil { |
|
Vadim Sh.
2015/07/17 01:48:29
nit: if err := ...; err != nil {
|
| + return err |
| + } |
| d.rwlock.Lock() |
| defer d.rwlock.Unlock() |
| @@ -215,23 +247,22 @@ func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) { |
| if ents == nil { |
| return nil |
| } |
| - if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { |
| - return |
| - } |
| - old := ents.Get(keyBuf) |
| - oldPM := gae.DSPropertyMap(nil) |
| - if old != nil { |
| - if oldPM, err = rpmWoCtx(old, ns); err != nil { |
| - return |
| + for i, k := range keys { |
| + incrementLocked(ents, groupMetaKey(k)) |
| + kb := toDel[i] |
| + old := ents.Get(kb) |
| + oldPM := gae.DSPropertyMap(nil) |
| + if old != nil { |
| + if oldPM, err = rpmWoCtx(old, ns); err != nil { |
| + lme.Assign(i, err) |
| + continue |
| + } |
| } |
| + updateIndicies(d.store, k, oldPM, nil) |
| + ents.Delete(kb) |
| } |
| - if err := updateIndicies(d.store, key, oldPM, nil); err != nil { |
| - return err |
| - } |
| - |
| - ents.Delete(keyBuf) |
| - return nil |
| + return lme.Get() |
| } |
| func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
| @@ -251,14 +282,8 @@ func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
| mkey := groupMetaKey(k) |
| entsHead := d.store.GetCollection(entKey) |
| entsSnap := txn.snap.GetCollection(entKey) |
| - vHead, err := curVersion(entsHead, mkey) |
| - if err != nil { |
| - panic(err) |
| - } |
| - vSnap, err := curVersion(entsSnap, mkey) |
| - if err != nil { |
| - panic(err) |
| - } |
| + vHead := curVersion(entsHead, mkey) |
| + vSnap := curVersion(entsSnap, mkey) |
| if vHead != vSnap { |
| return false |
| } |
| @@ -277,7 +302,7 @@ func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { |
| if m.data == nil { |
| err = d.del(m.key.Namespace(), m.key) |
| } else { |
| - _, err = d.putInner(m.key, m.data) |
| + _, err = d.put(m.key.Namespace(), m.key, m.data) |
| } |
| if err != nil { |
| panic(err) |
| @@ -286,7 +311,7 @@ func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { |
| } |
| } |
| -func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) (memContextObj, error) { |
| +func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) memContextObj { |
| return &txnDataStoreData{ |
| // alias to the main datastore's so that testing code can have primitive |
| // access to break features inside of transactions. |
| @@ -294,7 +319,7 @@ func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) (memContextObj, error |
| isXG: o != nil && o.XG, |
| snap: d.store.Snapshot(), |
| muts: map[string][]txnMutation{}, |
| - }, nil |
| + } |
| } |
| func (d *dataStoreData) endTxn() {} |
| @@ -337,17 +362,17 @@ func (td *txnDataStoreData) endTxn() { |
| func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
| panic("txnDataStoreData cannot apply transactions") |
| } |
| -func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error) { |
| - return nil, errors.New("datastore: nested transactions are not supported") |
| +func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) memContextObj { |
| + panic("impossible") |
| } |
| -func (td *txnDataStoreData) isBroken() error { |
| +func (td *txnDataStoreData) run(f func() error) error { |
| // Slightly different from the SDK... datastore and taskqueue each implement |
| // this here, where in the SDK only datastore.transaction.Call does. |
| if atomic.LoadInt32(&td.closed) == 1 { |
| return errors.New("datastore: transaction context has expired") |
| } |
| - return nil |
| + return f() |
| } |
| // writeMutation ensures that this transaction can support the given key/value |
| @@ -389,49 +414,62 @@ func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae. |
| } |
| func (td *txnDataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver) (gae.DSKey, error) { |
| - pm, err := putPrelim(ns, key, pls) |
| - if err != nil { |
| - return nil, err |
| + keys, errs := td.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSaver{pls}) |
| + if errs == nil { |
| + return keys[0], nil |
| } |
| + return nil, gae.SingleError(errs) |
| +} |
| - func() { |
| - td.parent.Lock() |
| - defer td.parent.Unlock() |
| - _, key, err = td.parent.entsKeyLocked(key) |
| - }() |
| +func (td *txnDataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver) ([]gae.DSKey, error) { |
| + pmaps, err := putMultiPrelim(ns, keys, plss) |
| if err != nil { |
| return nil, err |
| } |
| - if err = td.writeMutation(false, key, pm); err != nil { |
| - return nil, err |
| + retKeys := make([]gae.DSKey, len(keys)) |
| + lme := gae.LazyMultiError{Size: len(keys)} |
| + for i, k := range keys { |
| + func() { |
| + td.parent.Lock() |
| + defer td.parent.Unlock() |
| + _, k = td.parent.entsKeyLocked(k) |
|
Vadim Sh.
2015/07/17 01:48:29
why not:
td.parent.Lock()
_, k = td.parent.entsKey
|
| + }() |
| + lme.Assign(i, td.writeMutation(false, k, pmaps[i])) |
| + retKeys[i] = k |
| } |
| - return key, nil |
| + return retKeys, lme.Get() |
| } |
| func (td *txnDataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoadSaver) error { |
| - return getInner(ns, key, pls, func() (*memCollection, error) { |
| - if err := td.writeMutation(true, key, nil); err != nil { |
| - return nil, err |
| + return gae.SingleError(td.getMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSaver{pls})) |
| +} |
| + |
| +func (td *txnDataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver) error { |
| + return getMultiInner(ns, keys, plss, func() (*memCollection, error) { |
| + lme := gae.LazyMultiError{Size: len(keys)} |
| + for i, k := range keys { |
| + lme.Assign(i, td.writeMutation(true, k, nil)) |
| } |
| - return td.snap.GetCollection("ents:" + ns), nil |
| + return td.snap.GetCollection("ents:" + ns), lme.Get() |
| }) |
| } |
| func (td *txnDataStoreData) del(ns string, key gae.DSKey) error { |
| - if !helper.DSKeyValid(key, ns, false) { |
| - return gae.ErrDSInvalidKey |
| - } |
| - return td.writeMutation(false, key, nil) |
| + return gae.SingleError(td.delMulti(ns, []gae.DSKey{key})) |
| } |
| -func keyCouldBeValid(k gae.DSKey, ns string, allowSpecial bool) bool { |
| - // adds an id to k if it's incomplete. |
| - if helper.DSKeyIncomplete(k) { |
| - k = helper.NewDSKey(k.AppID(), k.Namespace(), k.Kind(), "", 1, k.Parent()) |
| +func (td *txnDataStoreData) delMulti(ns string, keys []gae.DSKey) error { |
| + lme := gae.LazyMultiError{Size: len(keys)} |
| + for i, k := range keys { |
| + if !helper.DSKeyValid(k, ns, false) { |
| + lme.Assign(i, gae.ErrDSInvalidKey) |
| + } else { |
| + lme.Assign(i, td.writeMutation(false, k, nil)) |
| + } |
| } |
| - return helper.DSKeyValid(k, ns, allowSpecial) |
| + return lme.Get() |
| } |
| func keyBytes(ctx helper.DSKeyContext, key gae.DSKey) []byte { |
| @@ -447,3 +485,32 @@ func rpmWoCtx(data []byte, ns string) (gae.DSPropertyMap, error) { |
| func rpm(data []byte) (gae.DSPropertyMap, error) { |
| return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithContext, "", "") |
| } |
| + |
| +func multiValid(keys []gae.DSKey, plss []gae.DSPropertyLoadSaver, ns string, potentialKey, allowSpecial bool) error { |
| + vfn := func(k gae.DSKey) bool { |
| + return !helper.DSKeyIncomplete(k) && helper.DSKeyValid(k, ns, allowSpecial) |
| + } |
| + if potentialKey { |
| + vfn = func(k gae.DSKey) bool { |
| + // adds an id to k if it's incomplete. |
| + if helper.DSKeyIncomplete(k) { |
| + k = helper.NewDSKey(k.AppID(), k.Namespace(), k.Kind(), "", 1, k.Parent()) |
| + } |
| + return helper.DSKeyValid(k, ns, allowSpecial) |
| + } |
| + } |
| + |
| + if keys == nil || plss == nil { |
| + return errors.New("gae: key or plss slices were nil") |
| + } |
| + if len(keys) != len(plss) { |
| + return errors.New("gae: key and dst slices have different length") |
| + } |
| + lme := gae.LazyMultiError{Size: len(keys)} |
| + for i, k := range keys { |
| + if !vfn(k) { |
| + lme.Assign(i, gae.ErrDSInvalidKey) |
| + } |
| + } |
| + return lme.Get() |
| +} |