Index: go/src/infra/gae/libs/wrapper/memory/datastore_data.go |
diff --git a/go/src/infra/gae/libs/gae/memory/raw_datstore_data.go b/go/src/infra/gae/libs/wrapper/memory/datastore_data.go |
similarity index 55% |
rename from go/src/infra/gae/libs/gae/memory/raw_datstore_data.go |
rename to go/src/infra/gae/libs/wrapper/memory/datastore_data.go |
index c786835c195b207c06348874f43932d342651c4b..d36d52ab622c09dc1e2e64da884739f1e3252460 100644 |
--- a/go/src/infra/gae/libs/gae/memory/raw_datstore_data.go |
+++ b/go/src/infra/gae/libs/wrapper/memory/datastore_data.go |
@@ -5,21 +5,51 @@ |
package memory |
import ( |
- "bytes" |
"errors" |
- "fmt" |
- "golang.org/x/net/context" |
+ "infra/gae/libs/wrapper" |
+ goon_internal "infra/gae/libs/wrapper/memory/internal/goon" |
"sync" |
"sync/atomic" |
- "infra/gae/libs/gae" |
- "infra/gae/libs/gae/helper" |
+ "github.com/mjibson/goon" |
+ |
+ "appengine/datastore" |
+ pb "appengine_internal/datastore" |
+ "golang.org/x/net/context" |
) |
+////////////////////////////////// knrKeeper /////////////////////////////////// |
+ |
+type knrKeeper struct { |
+ knrLock sync.Mutex |
+ knrFunc goon.KindNameResolver |
+} |
+ |
+var _ = wrapper.DSKindSetter((*knrKeeper)(nil)) |
+ |
+func (k *knrKeeper) KindNameResolver() goon.KindNameResolver { |
+ k.knrLock.Lock() |
+ defer k.knrLock.Unlock() |
+ if k.knrFunc == nil { |
+ k.knrFunc = goon.DefaultKindName |
+ } |
+ return k.knrFunc |
+} |
+ |
+func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) { |
+ k.knrLock.Lock() |
+ defer k.knrLock.Unlock() |
+ if knr == nil { |
+ knr = goon.DefaultKindName |
+ } |
+ k.knrFunc = knr |
+} |
+ |
//////////////////////////////// dataStoreData ///////////////////////////////// |
type dataStoreData struct { |
- gae.BrokenFeatures |
+ wrapper.BrokenFeatures |
+ knrKeeper |
rwlock sync.RWMutex |
// See README.md for store schema. |
@@ -30,13 +60,14 @@ type dataStoreData struct { |
var ( |
_ = memContextObj((*dataStoreData)(nil)) |
_ = sync.Locker((*dataStoreData)(nil)) |
- _ = gae.Testable((*dataStoreData)(nil)) |
+ _ = wrapper.Testable((*dataStoreData)(nil)) |
+ _ = wrapper.DSKindSetter((*dataStoreData)(nil)) |
) |
func newDataStoreData() *dataStoreData { |
store := newMemStore() |
return &dataStoreData{ |
- BrokenFeatures: gae.BrokenFeatures{DefaultError: errors.New("INTERNAL_ERROR")}, |
+ BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError(pb.Error_INTERNAL_ERROR)}, |
store: store, |
snap: store.Snapshot(), // empty but better than a nil pointer. |
} |
@@ -52,60 +83,59 @@ func (d *dataStoreData) Unlock() { |
/////////////////////////// indicies(dataStoreData) //////////////////////////// |
-func groupMetaKey(key gae.DSKey) []byte { |
- return keyBytes(helper.WithoutContext, |
- helper.NewDSKey("", "", "__entity_group__", "", 1, helper.DSKeyRoot(key))) |
+func groupMetaKey(key *datastore.Key) []byte { |
+ return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key))) |
} |
-func groupIDsKey(key gae.DSKey) []byte { |
- return keyBytes(helper.WithoutContext, |
- helper.NewDSKey("", "", "__entity_group_ids__", "", 1, helper.DSKeyRoot(key))) |
+func groupIDsKey(key *datastore.Key) []byte { |
+ return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey(key))) |
} |
func rootIDsKey(kind string) []byte { |
- return keyBytes(helper.WithoutContext, |
- helper.NewDSKey("", "", "__entity_root_ids__", kind, 0, nil)) |
+ return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil)) |
} |
func curVersion(ents *memCollection, key []byte) (int64, error) { |
if v := ents.Get(key); v != nil { |
- pm, err := rpm(v) |
- if err != nil { |
+ numData := &propertyList{} |
+ if err := numData.UnmarshalBinary(v); err != nil { |
return 0, err |
} |
- pl, ok := pm["__version__"] |
- if ok && len(pl) > 0 && pl[0].Type() == gae.DSPTInt { |
- return pl[0].Value().(int64), nil |
- } |
- return 0, fmt.Errorf("__version__ property missing or wrong: %v", pm) |
+ return (*numData)[0].Value.(int64), nil |
} |
return 0, nil |
} |
-func incrementLocked(ents *memCollection, key []byte) (ret int64, err error) { |
- if ret, err = curVersion(ents, key); err != nil { |
- ret = 0 |
+func incrementLocked(ents *memCollection, key []byte) (int64, error) { |
+ num := int64(0) |
+ numData := &propertyList{} |
+ if v := ents.Get(key); v != nil { |
+ if err := numData.UnmarshalBinary(v); err != nil { |
+ return 0, err |
+ } |
+ num = (*numData)[0].Value.(int64) |
+ } else { |
+ *numData = append(*numData, datastore.Property{Name: "__version__"}) |
} |
- ret++ |
- p := gae.DSProperty{} |
- if err = p.SetValue(ret, true); err != nil { |
- return |
+ num++ |
+ (*numData)[0].Value = num |
+ incData, err := numData.MarshalBinary() |
+ if err != nil { |
+ return 0, err |
} |
- buf := &bytes.Buffer{} |
- helper.WriteDSPropertyMap( |
- buf, gae.DSPropertyMap{"__version__": {p}}, helper.WithContext) |
- ents.Set(key, buf.Bytes()) |
- return |
+ ents.Set(key, incData) |
+ |
+ return num, nil |
} |
-func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey, error) { |
+func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *datastore.Key, error) { |
coll := "ents:" + key.Namespace() |
ents := d.store.GetCollection(coll) |
if ents == nil { |
ents = d.store.SetCollection(coll, nil) |
} |
- if helper.DSKeyIncomplete(key) { |
+ if key.Incomplete() { |
idKey := []byte(nil) |
if key.Parent() == nil { |
idKey = rootIDsKey(key.Kind()) |
@@ -116,40 +146,39 @@ func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey, |
if err != nil { |
return nil, nil, err |
} |
- key = helper.NewDSKey(key.AppID(), key.Namespace(), key.Kind(), "", id, key.Parent()) |
+ key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent()) |
} |
return ents, key, nil |
} |
-func putPrelim(ns string, key gae.DSKey, src interface{}) (gae.DSPropertyMap, error) { |
- if !keyCouldBeValid(key, ns, false) { |
+func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastore.Key, *propertyList, error) { |
+ key := newKeyObj(ns, knr, src) |
+ if !keyCouldBeValid(ns, key, userKeyOnly) { |
// TODO(riannucci): different error for Put-ing to reserved Keys? |
- return nil, gae.ErrDSInvalidKey |
+ return nil, nil, datastore.ErrInvalidKey |
} |
- pls, err := helper.GetPLS(src) |
- if err != nil { |
- return nil, err |
- } |
- return pls.Save() |
+ data, err := toPL(src) |
+ return key, data, err |
} |
-func (d *dataStoreData) put(ns string, key gae.DSKey, src interface{}) (gae.DSKey, error) { |
- pmData, err := putPrelim(ns, key, src) |
+func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error) { |
+ key, plData, err := putPrelim(ns, d.KindNameResolver(), src) |
if err != nil { |
return nil, err |
} |
- if key, err = d.putInner(key, pmData); err != nil { |
+ if key, err = d.putInner(key, plData); err != nil { |
return nil, err |
} |
- return key, nil |
+ return key, goon_internal.SetStructKey(src, key, d.KindNameResolver()) |
} |
-func (d *dataStoreData) putInner(key gae.DSKey, data gae.DSPropertyMap) (gae.DSKey, error) { |
- buf := &bytes.Buffer{} |
- helper.WriteDSPropertyMap(buf, data, helper.WithoutContext) |
- dataBytes := buf.Bytes() |
+func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datastore.Key, error) { |
+ dataBytes, err := data.MarshalBinary() |
+ if err != nil { |
+ return nil, err |
+ } |
d.rwlock.Lock() |
defer d.rwlock.Unlock() |
@@ -162,57 +191,49 @@ func (d *dataStoreData) putInner(key gae.DSKey, data gae.DSPropertyMap) (gae.DSK |
return nil, err |
} |
- old := ents.Get(keyBytes(helper.WithoutContext, key)) |
- oldPM := gae.DSPropertyMap(nil) |
+ old := ents.Get(keyBytes(noNS, key)) |
+ oldPl := (*propertyList)(nil) |
if old != nil { |
- if oldPM, err = rpmWoCtx(old, key.Namespace()); err != nil { |
+ oldPl = &propertyList{} |
+ if err = oldPl.UnmarshalBinary(old); err != nil { |
return nil, err |
} |
} |
- if err = updateIndicies(d.store, key, oldPM, data); err != nil { |
+ if err = updateIndicies(d.store, key, oldPl, data); err != nil { |
return nil, err |
} |
- ents.Set(keyBytes(helper.WithoutContext, key), dataBytes) |
+ ents.Set(keyBytes(noNS, key), dataBytes) |
return key, nil |
} |
-func getInner(ns string, key gae.DSKey, dst interface{}, getColl func() (*memCollection, error)) error { |
- if helper.DSKeyIncomplete(key) || !helper.DSKeyValid(key, ns, true) { |
- return gae.ErrDSInvalidKey |
+func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl func(*datastore.Key) (*memCollection, error)) error { |
+ key := newKeyObj(ns, knr, dst) |
+ if !keyValid(ns, key, allowSpecialKeys) { |
+ return datastore.ErrInvalidKey |
} |
- ents, err := getColl() |
+ ents, err := getColl(key) |
if err != nil { |
return err |
} |
if ents == nil { |
- return gae.ErrDSNoSuchEntity |
+ return datastore.ErrNoSuchEntity |
} |
- pdata := ents.Get(keyBytes(helper.WithoutContext, key)) |
+ pdata := ents.Get(keyBytes(noNS, key)) |
if pdata == nil { |
- return gae.ErrDSNoSuchEntity |
- } |
- |
- pm, err := rpmWoCtx(pdata, ns) |
- if err != nil { |
- return err |
+ return datastore.ErrNoSuchEntity |
} |
- |
- pls, err := helper.GetPLS(dst) |
- if err != nil { |
+ pl := &propertyList{} |
+ if err = pl.UnmarshalBinary(pdata); err != nil { |
return err |
} |
- |
- // TODO(riannucci): should the Get API reveal conversion errors instead of |
- // swallowing them? |
- _, err = pls.Load(pm) |
- return err |
+ return fromPL(pl, dst) |
} |
-func (d *dataStoreData) get(ns string, key gae.DSKey, dst interface{}) error { |
- return getInner(ns, key, dst, func() (*memCollection, error) { |
+func (d *dataStoreData) get(ns string, dst interface{}) error { |
+ return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*memCollection, error) { |
d.rwlock.RLock() |
s := d.store.Snapshot() |
d.rwlock.RUnlock() |
@@ -221,12 +242,12 @@ func (d *dataStoreData) get(ns string, key gae.DSKey, dst interface{}) error { |
}) |
} |
-func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) { |
- if !helper.DSKeyValid(key, ns, false) { |
- return gae.ErrDSInvalidKey |
+func (d *dataStoreData) del(ns string, key *datastore.Key) error { |
+ if !keyValid(ns, key, userKeyOnly) { |
+ return datastore.ErrInvalidKey |
} |
- keyBuf := keyBytes(helper.WithoutContext, key) |
+ keyBuf := keyBytes(noNS, key) |
d.rwlock.Lock() |
defer d.rwlock.Unlock() |
@@ -235,18 +256,19 @@ func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) { |
if ents == nil { |
return nil |
} |
- if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil { |
- return |
+ if _, err := incrementLocked(ents, groupMetaKey(key)); err != nil { |
+ return err |
} |
old := ents.Get(keyBuf) |
- oldPM := gae.DSPropertyMap(nil) |
+ oldPl := (*propertyList)(nil) |
if old != nil { |
- if oldPM, err = rpmWoCtx(old, ns); err != nil { |
- return |
+ oldPl = &propertyList{} |
+ if err := oldPl.UnmarshalBinary(old); err != nil { |
+ return err |
} |
} |
- if err := updateIndicies(d.store, key, oldPM, nil); err != nil { |
+ if err := updateIndicies(d.store, key, oldPl, nil); err != nil { |
return err |
} |
@@ -262,11 +284,10 @@ func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
if len(muts) == 0 { // read-only |
continue |
} |
- k, err := helper.ReadDSKey(bytes.NewBufferString(rk), helper.WithContext, "", "") |
+ k, err := keyFromByteString(withNS, rk, "") |
if err != nil { |
panic(err) |
} |
- |
entKey := "ents:" + k.Namespace() |
mkey := groupMetaKey(k) |
entsHead := d.store.GetCollection(entKey) |
@@ -306,12 +327,13 @@ func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { |
} |
} |
-func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) (memContextObj, error) { |
+func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, error) { |
return &txnDataStoreData{ |
// alias to the main datastore's so that testing code can have primitive |
// access to break features inside of transactions. |
BrokenFeatures: &d.BrokenFeatures, |
parent: d, |
+ knrKeeper: knrKeeper{knrFunc: d.knrFunc}, |
isXG: o != nil && o.XG, |
snap: d.store.Snapshot(), |
muts: map[string][]txnMutation{}, |
@@ -323,12 +345,13 @@ func (d *dataStoreData) endTxn() {} |
/////////////////////////////// txnDataStoreData /////////////////////////////// |
type txnMutation struct { |
- key gae.DSKey |
- data gae.DSPropertyMap |
+ key *datastore.Key |
+ data *propertyList |
} |
type txnDataStoreData struct { |
- *gae.BrokenFeatures |
+ *wrapper.BrokenFeatures |
+ knrKeeper |
sync.Mutex |
parent *dataStoreData |
@@ -348,7 +371,8 @@ type txnDataStoreData struct { |
var ( |
_ = memContextObj((*txnDataStoreData)(nil)) |
_ = sync.Locker((*txnDataStoreData)(nil)) |
- _ = gae.Testable((*txnDataStoreData)(nil)) |
+ _ = wrapper.Testable((*txnDataStoreData)(nil)) |
+ _ = wrapper.DSKindSetter((*txnDataStoreData)(nil)) |
) |
const xgEGLimit = 25 |
@@ -363,17 +387,17 @@ func (td *txnDataStoreData) endTxn() { |
func (*txnDataStoreData) applyTxn(context.Context, memContextObj) { |
panic("txnDataStoreData cannot apply transactions") |
} |
-func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) (memContextObj, error) { |
+func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) { |
return nil, errors.New("datastore: nested transactions are not supported") |
} |
-func (td *txnDataStoreData) RunIfNotBroken(f func() error) error { |
+func (td *txnDataStoreData) IsBroken() error { |
// Slightly different from the SDK... datastore and taskqueue each implement |
// this here, where in the SDK only datastore.transaction.Call does. |
if atomic.LoadInt32(&td.closed) == 1 { |
return errors.New("datastore: transaction context has expired") |
} |
- return td.BrokenFeatures.RunIfNotBroken(f) |
+ return td.BrokenFeatures.IsBroken() |
} |
// writeMutation ensures that this transaction can support the given key/value |
@@ -387,8 +411,8 @@ func (td *txnDataStoreData) RunIfNotBroken(f func() error) error { |
// |
// Returns an error if this key causes the transaction to cross too many entity |
// groups. |
-func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae.DSPropertyMap) error { |
- rk := string(keyBytes(helper.WithContext, helper.DSKeyRoot(key))) |
+func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data *propertyList) error { |
+ rk := string(keyBytes(withNS, rootKey(key))) |
td.Lock() |
defer td.Unlock() |
@@ -403,7 +427,7 @@ func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae. |
if td.isXG { |
msg = "operating on too many entity groups in a single transaction" |
} |
- return errors.New(msg) |
+ return newDSError(pb.Error_BAD_REQUEST, msg) |
} |
td.muts[rk] = []txnMutation{} |
} |
@@ -414,8 +438,8 @@ func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae. |
return nil |
} |
-func (td *txnDataStoreData) put(ns string, key gae.DSKey, src interface{}) (gae.DSKey, error) { |
- pMap, err := putPrelim(ns, key, src) |
+func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, error) { |
+ key, plData, err := putPrelim(ns, td.KindNameResolver(), src) |
if err != nil { |
return nil, err |
} |
@@ -429,15 +453,15 @@ func (td *txnDataStoreData) put(ns string, key gae.DSKey, src interface{}) (gae. |
return nil, err |
} |
- if err = td.writeMutation(false, key, pMap); err != nil { |
+ if err = td.writeMutation(false, key, plData); err != nil { |
return nil, err |
} |
- return key, nil |
+ return key, goon_internal.SetStructKey(src, key, td.KindNameResolver()) |
} |
-func (td *txnDataStoreData) get(ns string, key gae.DSKey, dst interface{}) error { |
- return getInner(ns, key, dst, func() (*memCollection, error) { |
+func (td *txnDataStoreData) get(ns string, dst interface{}) error { |
+ return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key) (*memCollection, error) { |
if err := td.writeMutation(true, key, nil); err != nil { |
return nil, err |
} |
@@ -445,31 +469,9 @@ func (td *txnDataStoreData) get(ns string, key gae.DSKey, dst interface{}) error |
}) |
} |
-func (td *txnDataStoreData) del(ns string, key gae.DSKey) error { |
- if !helper.DSKeyValid(key, ns, false) { |
- return gae.ErrDSInvalidKey |
+func (td *txnDataStoreData) del(ns string, key *datastore.Key) error { |
+ if !keyValid(ns, key, userKeyOnly) { |
+ return datastore.ErrInvalidKey |
} |
return td.writeMutation(false, key, nil) |
} |
- |
-func keyCouldBeValid(k gae.DSKey, ns string, allowSpecial bool) bool { |
- // adds an id to k if it's incomplete. |
- if helper.DSKeyIncomplete(k) { |
- k = helper.NewDSKey(k.AppID(), k.Namespace(), k.Kind(), "", 1, k.Parent()) |
- } |
- return helper.DSKeyValid(k, ns, allowSpecial) |
-} |
- |
-func keyBytes(ctx helper.DSKeyContext, key gae.DSKey) []byte { |
- buf := &bytes.Buffer{} |
- helper.WriteDSKey(buf, ctx, key) |
- return buf.Bytes() |
-} |
- |
-func rpmWoCtx(data []byte, ns string) (gae.DSPropertyMap, error) { |
- return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithoutContext, globalAppID, ns) |
-} |
- |
-func rpm(data []byte) (gae.DSPropertyMap, error) { |
- return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithContext, "", "") |
-} |