Chromium Code Reviews| Index: go/src/infra/gae/libs/wrapper/memory/datastoreData.go |
| diff --git a/go/src/infra/gae/libs/wrapper/memory/datastoreData.go b/go/src/infra/gae/libs/wrapper/memory/datastoreData.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..b41fcc4c61619b0140ef3d19a9eaa2c93d8d4a79 |
| --- /dev/null |
| +++ b/go/src/infra/gae/libs/wrapper/memory/datastoreData.go |
| @@ -0,0 +1,480 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
|
Vadim Sh.
2015/05/24 19:43:26
datastore_data.go?
iannucci
2015/05/24 20:33:54
done
|
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package memory |
| + |
| +import ( |
| + "errors" |
| + "math/rand" |
| + "sync" |
| + "sync/atomic" |
| + |
| + "appengine/datastore" |
| + pb "appengine_internal/datastore" |
| + |
| + "github.com/mjibson/goon" |
| + |
| + "infra/gae/libs/wrapper" |
| +) |
| + |
| +////////////////////////////////// knrKeeper /////////////////////////////////// |
| + |
| +type knrKeeper struct { |
| + lock sync.RWMutex |
| + fun goon.KindNameResolver |
| +} |
| + |
| +func newKnrKeeper(fun goon.KindNameResolver) *knrKeeper { |
| + if fun == nil { |
| + fun = goon.DefaultKindName |
| + } |
| + return &knrKeeper{fun: fun} |
| +} |
| + |
| +func (k *knrKeeper) KindNameResolver() goon.KindNameResolver { |
| + k.lock.RLock() |
| + defer k.lock.RUnlock() |
| + return k.fun |
| +} |
| +func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) { |
|
Vadim Sh.
2015/05/24 19:43:26
looks like you didn't autoformat your code
iannucci
2015/05/24 20:33:54
I did, but gofmt will let methods stick together.
|
| + k.lock.Lock() |
| + defer k.lock.Unlock() |
| + if knr == nil { |
| + knr = goon.DefaultKindName |
| + } |
| + k.fun = knr |
| +} |
| + |
| +//////////////////////////////// dataStoreData ///////////////////////////////// |
| + |
| +type dataStoreData struct { |
| + *wrapper.BrokenFeatures |
| + *knrKeeper |
| + |
| + sync.RWMutex |
| + store *boomStore |
| + snap *boomStore |
| + |
| + /* collections: |
| + * ents:ns -> key -> value |
| + * (rootkind, rootid, __entity_group__,1) -> {__version__: int} |
| + * (rootkind, rootid, __entity_group_ids__,1) -> {__version__: int} |
| + * (__entity_group_ids__,1) -> {__version__: int} |
| + * idx -> kind,A?,[-?prop]* |
| + * idx:ns:kind -> key = nil |
| + * idx:ns:kind|prop -> propval|key = [prev val] |
| + * idx:ns:kind|-prop -> -propval|key = [next val] |
| + * idx:ns:kind|A|?prop|?prop -> A|propval|propval|key = [prev/next val]|[prev/next val] |
| + * idx:ns:kind|?prop|?prop -> propval|propval|key = [prev/next val]|[prev/next val] |
| + */ |
| +} |
| + |
| +////////////////////////////// New(dataStoreData) ////////////////////////////// |
| + |
| +func newDataStoreData() *dataStoreData { |
| + store := newBoomStore() |
| + return &dataStoreData{ |
| + BrokenFeatures: wrapper.NewBrokenFeatures(newDSError(pb.Error_INTERNAL_ERROR)), |
| + knrKeeper: newKnrKeeper(nil), |
| + store: store, |
| + snap: store.Snapshot(), // empty but better than a nil pointer. |
| + } |
| +} |
| + |
| +func groupMetaKey(key *datastore.Key) []byte { |
| + return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key))) |
| +} |
| + |
| +func groupIDsKey(key *datastore.Key) []byte { |
| + return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey(key))) |
| +} |
| + |
| +func rootIDsKey(kind string) []byte { |
| + return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil)) |
| +} |
| + |
| +func curval(ents *boomCollection, key []byte) (int64, error) { |
| + var err error |
| + v := ents.Get(key) |
| + num := int64(0) |
| + numData := &propertyList{} |
| + if v != nil { |
| + err = numData.UnmarshalBinary(v) |
| + num = (*numData)[0].Value.(int64) |
| + } |
| + return num, err |
| +} |
| + |
| +func plusPlusLocked(ents *boomCollection, key []byte) (int64, error) { |
| + v := ents.Get(key) |
| + |
| + num := int64(0) |
| + numData := &propertyList{} |
| + if v == nil { |
| + num++ |
| + *numData = append(*numData, datastore.Property{Name: "__version__"}) |
| + } else { |
| + err := numData.UnmarshalBinary(v) |
| + if err != nil { |
| + return 0, err |
| + } |
| + num = (*numData)[0].Value.(int64) |
| + num++ |
| + } |
| + (*numData)[0].Value = num |
| + incData, err := numData.MarshalBinary() |
| + if err != nil { |
| + return 0, err |
| + } |
| + ents.Set(key, incData) |
| + |
| + return num, nil |
| +} |
| + |
| +func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*boomCollection, *datastore.Key, error) { |
| + coll := "ents:" + key.Namespace() |
| + ents := d.store.GetCollection(coll) |
| + if ents == nil { |
| + ents = d.store.SetCollection(coll, nil) |
| + } |
| + |
| + if key.Incomplete() { |
| + var idKey []byte |
| + if key.Parent() == nil { |
| + idKey = rootIDsKey(key.Kind()) |
| + } else { |
| + idKey = groupIDsKey(key) |
| + } |
| + |
| + id, err := plusPlusLocked(ents, idKey) |
| + if err != nil { |
| + return nil, nil, err |
| + } |
| + key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent()) |
| + } |
| + |
| + return ents, key, nil |
| +} |
| + |
| +func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (key *datastore.Key, data *propertyList, err error) { |
| + key = newKeyObj(ns, knr, src) |
| + if !KeyCouldBeValid(ns, key, UserKeyOnly) { |
| + // TODO(riannucci): different error for Put-ing to reserved Keys? |
| + return nil, nil, datastore.ErrInvalidKey |
| + } |
| + |
| + data, err = toPL(src) |
| + return |
| +} |
| + |
| +func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error) { |
| + key, plData, err := putPrelim(ns, d.KindNameResolver(), src) |
| + if err != nil { |
| + return nil, err |
| + } |
| + key, err = d.putInner(key, plData) |
| + if err != nil { |
| + return nil, err |
| + } |
| + return key, setStructKey(src, key, d.KindNameResolver()) |
| +} |
| + |
| +func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datastore.Key, error) { |
| + dataBytes, err := data.MarshalBinary() |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + d.Lock() |
| + defer d.Unlock() |
| + |
| + ents, key, err := d.entsKeyLocked(key) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + _, err = plusPlusLocked(ents, groupMetaKey(key)) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + old := ents.Get(keyBytes(noNS, key)) |
| + oldPl := (*propertyList)(nil) |
| + if old != nil { |
| + oldPl = &propertyList{} |
| + err = oldPl.UnmarshalBinary(old) |
| + if err != nil { |
| + return nil, err |
| + } |
| + } |
| + ents.Set(keyBytes(noNS, key), dataBytes) |
| + |
| + return key, nil |
| +} |
| + |
| +func getInner(ns string, knr goon.KindNameResolver, dst interface{}, f func(*datastore.Key) (*boomCollection, error)) error { |
| + key := newKeyObj(ns, knr, dst) |
| + if !KeyValid(ns, key, AllowSpecialKeys) { |
| + return datastore.ErrInvalidKey |
| + } |
| + |
| + ents, err := f(key) |
| + if err != nil { |
| + return err |
| + } |
| + if ents == nil { |
| + return datastore.ErrNoSuchEntity |
| + } |
| + pdata := ents.Get(keyBytes(noNS, key)) |
| + if pdata == nil { |
| + return datastore.ErrNoSuchEntity |
| + } |
| + pl := &propertyList{} |
| + err = pl.UnmarshalBinary(pdata) |
| + if err != nil { |
| + return err |
| + } |
| + return fromPL(pl, dst) |
| +} |
| + |
| +func (d *dataStoreData) get(ns string, dst interface{}) error { |
| + return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*boomCollection, error) { |
| + d.RLock() |
| + defer d.RUnlock() |
| + return d.store.Snapshot().GetCollection("ents:" + ns), nil |
| + }) |
| +} |
| + |
| +func (d *dataStoreData) del(ns string, key *datastore.Key) error { |
| + if !KeyValid(ns, key, UserKeyOnly) { |
| + return datastore.ErrInvalidKey |
| + } |
| + |
| + keyBuf := keyBytes(noNS, key) |
| + |
| + d.Lock() |
| + defer d.Unlock() |
| + |
| + ents := d.store.GetCollection("ents:" + ns) |
| + if ents == nil { |
| + return nil |
| + } |
| + |
| + _, err := plusPlusLocked(ents, groupMetaKey(key)) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + old := ents.Get(keyBuf) |
| + oldPl := (*propertyList)(nil) |
| + if old != nil { |
| + oldPl = &propertyList{} |
| + err = oldPl.UnmarshalBinary(old) |
| + if err != nil { |
| + return err |
| + } |
| + } |
| + |
| + ents.Delete(keyBuf) |
| + return nil |
| +} |
| + |
| +///////////////////////// memContextObj(dataStoreData) ///////////////////////// |
| + |
| +func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { |
| + // TODO(riannucci): implement with Flush/FlushRevert for persistance. |
| + |
| + txn := obj.(*txnDataStoreData) |
| + for rk, muts := range txn.muts { |
| + if len(muts) == 0 { // read-only |
| + continue |
| + } |
| + k, err := keyFromByteString(withNS, rk) |
| + if err != nil { |
| + panic(err) |
| + } |
| + entKey := "ents:" + k.Namespace() |
| + mkey := groupMetaKey(k) |
| + entsHead := d.store.GetCollection(entKey) |
| + entsSnap := txn.snap.GetCollection(entKey) |
| + vHead, err := curval(entsHead, mkey) |
| + if err != nil { |
| + panic(err) |
| + } |
| + vSnap, err := curval(entsSnap, mkey) |
| + if err != nil { |
| + panic(err) |
| + } |
| + if vHead != vSnap { |
| + return false |
| + } |
| + } |
| + return true |
| +} |
| + |
| +func (d *dataStoreData) applyTxn(r *rand.Rand, obj memContextObj) { |
| + txn := obj.(*txnDataStoreData) |
| + for _, muts := range txn.muts { |
| + if len(muts) == 0 { // read-only |
| + continue |
| + } |
| + for _, m := range muts { |
| + var err error |
| + if m.data == nil { |
| + err = d.del(m.key.Namespace(), m.key) |
| + } else { |
| + _, err = d.putInner(m.key, m.data) |
| + } |
| + if err != nil { |
| + panic(err) |
| + } |
| + } |
| + } |
| +} |
| + |
| +func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, error) { |
| + return &txnDataStoreData{ |
| + BrokenFeatures: d.BrokenFeatures, |
| + parent: d, |
| + knrKeeper: newKnrKeeper(d.KindNameResolver()), |
| + isXG: o != nil && o.XG, |
| + snap: d.store.Snapshot(), |
| + muts: map[string][]txnMutation{}, |
| + }, nil |
| +} |
| + |
| +func (d *dataStoreData) endTxn() {} |
| + |
| +/////////////////////////////// txnDataStoreData /////////////////////////////// |
| +type txnMutation struct { |
| + key *datastore.Key |
| + data *propertyList |
| +} |
| + |
| +type txnDataStoreData struct { |
| + *wrapper.BrokenFeatures |
| + *knrKeeper |
| + sync.Mutex |
| + |
| + parent *dataStoreData |
| + |
| + // boolean 0 or 1, use atomic.*Int32 to access. |
| + closed int32 |
| + isXG bool |
| + |
| + snap *boomStore |
| + |
| + // string is the raw-bytes encoding of the entity root incl. namespace |
| + muts map[string][]txnMutation |
| + // TODO(riannucci): account for 'transaction size' limit of 10MB by summing |
| + // length of encoded keys + values. |
| +} |
| + |
| +const xgEGLimit = 25 |
| + |
| +/////////////////////// memContextObj(txnDataStoreData) //////////////////////// |
| + |
| +func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false } |
| +func (td *txnDataStoreData) endTxn() { |
| + if atomic.LoadInt32(&td.closed) == 1 { |
| + panic("cannot end transaction twice") |
| + } |
| + atomic.StoreInt32(&td.closed, 1) |
| +} |
| +func (*txnDataStoreData) applyTxn(*rand.Rand, memContextObj) { |
| + panic("txnDataStoreData cannot apply transactions") |
| +} |
| +func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, error) { |
| + return nil, errors.New("datastore: nested transactions are not supported") |
| +} |
| + |
| +/////////////////// wrapper.BrokenFeatures(txnDataStoreData) /////////////////// |
| + |
| +func (td *txnDataStoreData) IsBroken() error { |
| + // Slightly different from the SDK... datastore and taskqueue each implement |
| + // this here, where in the SDK only datastore.transaction.Call does. |
| + if atomic.LoadInt32(&td.closed) == 1 { |
| + return errors.New("datastore: transaction context has expired") |
| + } |
| + return td.parent.IsBroken() |
| +} |
| + |
| +// writeMutation ensures that this transaction can support the given key/value |
| +// mutation. |
| +// |
| +// if getOnly is true, don't record the actual mutation data, just ensure that |
| +// the key is in an included entity group (or add an empty entry for that |
| +// group). |
| +// |
| +// if !getOnly && data == nil, this counts as a deletion instead of a Put. |
| +// |
| +// will return an error if we're crossing too many entity groups. |
| +func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data *propertyList) error { |
| + rk := string(keyBytes(withNS, rootKey(key))) |
| + |
| + td.Lock() |
| + defer td.Unlock() |
| + |
| + if _, ok := td.muts[rk]; !ok { |
| + limit := 1 |
| + if td.isXG { |
| + limit = xgEGLimit |
| + } |
| + if len(td.muts)+1 > limit { |
| + msg := "cross-group transaction need to be explicitly specified (xg=True)" |
| + if td.isXG { |
| + msg = "operating on too many entity groups in a single transaction" |
| + } |
| + return newDSError(pb.Error_BAD_REQUEST, msg) |
| + } |
| + td.muts[rk] = []txnMutation{} |
| + } |
| + if !getOnly { |
| + td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) |
| + } |
| + |
| + return nil |
| +} |
| + |
| +func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, error) { |
| + key, plData, err := putPrelim(ns, td.KindNameResolver(), src) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + key, err = func(key *datastore.Key) (*datastore.Key, error) { |
| + td.parent.Lock() |
| + defer td.parent.Unlock() |
| + _, key, err := td.parent.entsKeyLocked(key) |
| + return key, err |
| + }(key) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + err = td.writeMutation(false, key, plData) |
| + if err != nil { |
| + return nil, err |
| + } |
| + |
| + return key, setStructKey(src, key, td.KindNameResolver()) |
| +} |
| + |
| +func (td *txnDataStoreData) get(ns string, dst interface{}) error { |
| + return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key) (*boomCollection, error) { |
| + err := td.writeMutation(true, key, nil) |
| + if err != nil { |
| + return nil, err |
| + } |
| + return td.snap.GetCollection("ents:" + ns), nil |
| + }) |
| +} |
| + |
| +func (td *txnDataStoreData) del(ns string, key *datastore.Key) error { |
| + if !KeyValid(ns, key, UserKeyOnly) { |
| + return datastore.ErrInvalidKey |
| + } |
| + |
| + return td.writeMutation(false, key, nil) |
| +} |