| Index: impl/memory/datastore_data.go
|
| diff --git a/impl/memory/datastore_data.go b/impl/memory/datastore_data.go
|
| index ad8515ee62919e1cf7a84fe942870127170a4043..ed7e56b0bd5c729577e5a0ae163f37b80633791c 100644
|
| --- a/impl/memory/datastore_data.go
|
| +++ b/impl/memory/datastore_data.go
|
| @@ -9,11 +9,11 @@ import (
|
| "fmt"
|
| "strings"
|
| "sync"
|
| - "sync/atomic"
|
|
|
| ds "github.com/luci/gae/service/datastore"
|
| "github.com/luci/gae/service/datastore/serialize"
|
| "github.com/luci/luci-go/common/errors"
|
| +
|
| "golang.org/x/net/context"
|
| )
|
|
|
| @@ -176,15 +176,15 @@ func (d *dataStoreData) namespaces() []string {
|
| /////////////////////////// indexes(dataStoreData) ////////////////////////////
|
|
|
| func groupMetaKey(key *ds.Key) []byte {
|
| - return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()))
|
| + return keyBytes(ds.KeyContext{}.NewKey("__entity_group__", "", 1, key.Root()))
|
| }
|
|
|
| func groupIDsKey(key *ds.Key) []byte {
|
| - return keyBytes(ds.NewKey("", "", "__entity_group_ids__", "", 1, key.Root()))
|
| + return keyBytes(ds.KeyContext{}.NewKey("__entity_group_ids__", "", 1, key.Root()))
|
| }
|
|
|
| func rootIDsKey(kind string) []byte {
|
| - return keyBytes(ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil))
|
| + return keyBytes(ds.KeyContext{}.NewKey("__entity_root_ids__", kind, 0, nil))
|
| }
|
|
|
| func curVersion(ents memCollection, key []byte) int64 {
|
| @@ -279,7 +279,7 @@ func (d *dataStoreData) fixKeyLocked(ents memCollection, key *ds.Key) (*ds.Key,
|
| if err != nil {
|
| return key, err
|
| }
|
| - key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id, key.Parent())
|
| + key = key.KeyContext().NewKey(key.Kind(), "", id, key.Parent())
|
| }
|
| return key, nil
|
| }
|
| @@ -412,7 +412,7 @@ func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
|
| if len(muts) == 0 { // read-only
|
| continue
|
| }
|
| - prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), serialize.WithContext, "", "")
|
| + prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), serialize.WithContext, ds.KeyContext{})
|
| memoryCorruption(err)
|
|
|
| k := prop.Value().(*ds.Key)
|
| @@ -455,9 +455,11 @@ func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj {
|
| // alias to the main datastore's so that testing code can have primitive
|
| // access to break features inside of transactions.
|
| parent: d,
|
| - isXG: o != nil && o.XG,
|
| - snap: d.head.Snapshot(),
|
| - muts: map[string][]txnMutation{},
|
| + txn: &transactionImpl{
|
| + isXG: o != nil && o.XG,
|
| + },
|
| + snap: d.head.Snapshot(),
|
| + muts: map[string][]txnMutation{},
|
| }
|
| }
|
|
|
| @@ -475,9 +477,7 @@ type txnDataStoreData struct {
|
|
|
| parent *dataStoreData
|
|
|
| - // boolean 0 or 1, use atomic.*Int32 to access.
|
| - closed int32
|
| - isXG bool
|
| + txn *transactionImpl
|
|
|
| snap memStore
|
|
|
| @@ -492,11 +492,11 @@ var _ memContextObj = (*txnDataStoreData)(nil)
|
| const xgEGLimit = 25
|
|
|
| func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false }
|
| +
|
| func (td *txnDataStoreData) endTxn() {
|
| - if atomic.LoadInt32(&td.closed) == 1 {
|
| - panic("cannot end transaction twice")
|
| + if err := td.txn.close(); err != nil {
|
| + panic(err)
|
| }
|
| - atomic.StoreInt32(&td.closed, 1)
|
| }
|
| func (*txnDataStoreData) applyTxn(context.Context, memContextObj) {
|
| impossible(fmt.Errorf("cannot create a recursive transaction"))
|
| @@ -509,8 +509,8 @@ func (*txnDataStoreData) mkTxn(*ds.TransactionOptions) memContextObj {
|
| func (td *txnDataStoreData) run(f func() error) error {
|
| // Slightly different from the SDK... datastore and taskqueue each implement
|
| // this here, where in the SDK only datastore.transaction.Call does.
|
| - if atomic.LoadInt32(&td.closed) == 1 {
|
| - return errors.New("datastore: transaction context has expired")
|
| + if err := td.txn.valid(); err != nil {
|
| + return err
|
| }
|
| return f()
|
| }
|
| @@ -534,12 +534,12 @@ func (td *txnDataStoreData) writeMutation(getOnly bool, key *ds.Key, data ds.Pro
|
|
|
| if _, ok := td.muts[rk]; !ok {
|
| limit := 1
|
| - if td.isXG {
|
| + if td.txn.isXG {
|
| limit = xgEGLimit
|
| }
|
| if len(td.muts)+1 > limit {
|
| msg := "cross-group transaction need to be explicitly specified (xg=True)"
|
| - if td.isXG {
|
| + if td.txn.isXG {
|
| msg = "operating on too many entity groups in a single transaction"
|
| }
|
| return errors.New(msg)
|
| @@ -603,7 +603,7 @@ func keyBytes(key *ds.Key) []byte {
|
|
|
| func rpm(data []byte) (ds.PropertyMap, error) {
|
| return serialize.ReadPropertyMap(bytes.NewBuffer(data),
|
| - serialize.WithContext, "", "")
|
| + serialize.WithContext, ds.KeyContext{})
|
| }
|
|
|
| func namespaces(store memStore) []string {
|
|
|