| Index: filter/txnBuf/state.go
|
| diff --git a/filter/txnBuf/state.go b/filter/txnBuf/state.go
|
| index 080a92adc65a740b167f92a2f2a6f2785ecef871..46762945225d61ec1a2bcc2534f25cd75cf23147 100644
|
| --- a/filter/txnBuf/state.go
|
| +++ b/filter/txnBuf/state.go
|
| @@ -94,15 +94,14 @@ type txnBufState struct {
|
| // encoded key -> size of entity. A size of 0 means that the entity is
|
| // deleted.
|
| entState *sizeTracker
|
| - memDS datastore.RawInterface
|
| + bufDS datastore.RawInterface
|
|
|
| roots stringset.Set
|
| rootLimit int
|
|
|
| - aid string
|
| - ns string
|
| - parentDS datastore.RawInterface
|
| - parentState *txnBufState
|
| + aid string
|
| + ns string
|
| + parentDS datastore.RawInterface
|
|
|
| // sizeBudget is the number of bytes that this transaction has to operate
|
| // within. It's only used when attempting to apply() the transaction, and
|
| @@ -111,18 +110,6 @@ type txnBufState struct {
|
| // a negative delta if the parent transaction had many large entities which
|
| // the inner transaction deleted.
|
| sizeBudget int64
|
| -
|
| - // siblingLock is to prevent two nested transactions from running at the same
|
| - // time.
|
| - //
|
| - // Example:
|
| - // RunInTransaction() { // root
|
| - // RunInTransaction() // A
|
| - // RunInTransaction() // B
|
| - // }
|
| - //
|
| - // This will prevent A and B from running simulatneously.
|
| - siblingLock sync.Mutex
|
| }
|
|
|
| func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datastore.TransactionOptions) error {
|
| @@ -137,9 +124,6 @@ func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
|
| }
|
| sizeBudget := DefaultSizeBudget
|
| if parentState != nil {
|
| - parentState.siblingLock.Lock()
|
| - defer parentState.siblingLock.Unlock()
|
| -
|
| // TODO(riannucci): this is a bit wonky since it means that a child
|
| // transaction declaring XG=true will only get to modify 25 groups IF
|
| // they're same groups affected by the parent transactions. So instead of
|
| @@ -154,26 +138,38 @@ func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
|
| }
|
| }
|
|
|
| - memDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns)
|
| + bufDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns)
|
| if err != nil {
|
| return err
|
| }
|
|
|
| state := &txnBufState{
|
| - entState: &sizeTracker{},
|
| - memDS: memDS.Raw(),
|
| - roots: roots,
|
| - rootLimit: rootLimit,
|
| - ns: ns,
|
| - aid: inf.AppID(),
|
| - parentDS: datastore.Get(ctx).Raw(),
|
| - parentState: parentState,
|
| - sizeBudget: sizeBudget,
|
| + entState: &sizeTracker{},
|
| + bufDS: bufDS.Raw(),
|
| + roots: roots,
|
| + rootLimit: rootLimit,
|
| + ns: ns,
|
| + aid: inf.AppID(),
|
| + parentDS: datastore.Get(context.WithValue(ctx, dsTxnBufHaveLock, true)).Raw(),
|
| + sizeBudget: sizeBudget,
|
| }
|
| if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil {
|
| return err
|
| }
|
| - return state.apply()
|
| +
|
| + // no reason to unlock this ever. At this point it's toast.
|
| + state.Lock()
|
| +
|
| + if parentState == nil {
|
| + return commitToReal(state)
|
| + }
|
| +
|
| + if err = parentState.canApplyLocked(state); err != nil {
|
| + return err
|
| + }
|
| +
|
| + parentState.commitLocked(state)
|
| + return nil
|
| }
|
|
|
| // item is a temporary object for representing key/entity pairs and their cache
|
| @@ -236,140 +232,200 @@ func (t *txnBufState) updateRootsLocked(roots stringset.Set) error {
|
| return nil
|
| }
|
|
|
| -func (t *txnBufState) getMulti(keys []*datastore.Key) ([]item, error) {
|
| +func (t *txnBufState) getMulti(keys []*datastore.Key, metas datastore.MultiMetaGetter, cb datastore.GetMultiCB, haveLock bool) error {
|
| encKeys, roots := toEncoded(keys)
|
| - ret := make([]item, len(keys))
|
| + data := make([]item, len(keys))
|
|
|
| idxMap := []int(nil)
|
| toGetKeys := []*datastore.Key(nil)
|
|
|
| - t.Lock()
|
| - defer t.Unlock()
|
| + lme := errors.NewLazyMultiError(len(keys))
|
| + err := func() error {
|
| + if !haveLock {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| + }
|
|
|
| - if err := t.updateRootsLocked(roots); err != nil {
|
| - return nil, err
|
| - }
|
| + if err := t.updateRootsLocked(roots); err != nil {
|
| + return err
|
| + }
|
|
|
| - for i, key := range keys {
|
| - ret[i].key = key
|
| - ret[i].encKey = encKeys[i]
|
| - if size, ok := t.entState.get(ret[i].getEncKey()); ok {
|
| - ret[i].buffered = true
|
| - if size > 0 {
|
| - idxMap = append(idxMap, i)
|
| - toGetKeys = append(toGetKeys, key)
|
| + for i, key := range keys {
|
| + data[i].key = key
|
| + data[i].encKey = encKeys[i]
|
| + if size, ok := t.entState.get(data[i].getEncKey()); ok {
|
| + data[i].buffered = true
|
| + if size > 0 {
|
| + idxMap = append(idxMap, i)
|
| + toGetKeys = append(toGetKeys, key)
|
| + }
|
| }
|
| }
|
| - }
|
| -
|
| - if len(toGetKeys) > 0 {
|
| - j := 0
|
| - t.memDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) {
|
| - impossible(err)
|
| - ret[idxMap[j]].data = pm
|
| - j++
|
| - })
|
| - }
|
|
|
| - return ret, nil
|
| -}
|
| + if len(toGetKeys) > 0 {
|
| + j := 0
|
| + t.bufDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) {
|
| + impossible(err)
|
| + data[idxMap[j]].data = pm
|
| + j++
|
| + })
|
| + }
|
|
|
| -func (t *txnBufState) deleteMulti(keys []*datastore.Key) error {
|
| - encKeys, roots := toEncoded(keys)
|
| + idxMap = nil
|
| + getKeys := []*datastore.Key(nil)
|
| + getMetas := datastore.MultiMetaGetter(nil)
|
|
|
| - t.Lock()
|
| - defer t.Unlock()
|
| + for i, itm := range data {
|
| + if !itm.buffered {
|
| + idxMap = append(idxMap, i)
|
| + getKeys = append(getKeys, itm.key)
|
| + getMetas = append(getMetas, metas.GetSingle(i))
|
| + }
|
| + }
|
|
|
| - if err := t.updateRootsLocked(roots); err != nil {
|
| + if len(idxMap) > 0 {
|
| + j := 0
|
| + err := t.parentDS.GetMulti(getKeys, getMetas, func(pm datastore.PropertyMap, err error) {
|
| + if err != datastore.ErrNoSuchEntity {
|
| + i := idxMap[j]
|
| + if !lme.Assign(i, err) {
|
| + data[i].data = pm
|
| + }
|
| + }
|
| + j++
|
| + })
|
| + if err != nil {
|
| + return err
|
| + }
|
| + }
|
| + return nil
|
| + }()
|
| + if err != nil {
|
| return err
|
| }
|
|
|
| - i := 0
|
| - err := t.memDS.DeleteMulti(keys, func(err error) {
|
| - impossible(err)
|
| - t.entState.set(encKeys[i], 0)
|
| - i++
|
| - })
|
| - impossible(err)
|
| + for i, itm := range data {
|
| + err := lme.GetOne(i)
|
| + if err != nil {
|
| + cb(nil, err)
|
| + } else if itm.data == nil {
|
| + cb(nil, datastore.ErrNoSuchEntity)
|
| + } else {
|
| + cb(itm.data, nil)
|
| + }
|
| + }
|
| return nil
|
| }
|
|
|
| -func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyMap) error {
|
| +func (t *txnBufState) deleteMulti(keys []*datastore.Key, cb datastore.DeleteMultiCB, haveLock bool) error {
|
| encKeys, roots := toEncoded(keys)
|
|
|
| - t.Lock()
|
| - defer t.Unlock()
|
| + err := func() error {
|
| + if !haveLock {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| + }
|
| +
|
| + if err := t.updateRootsLocked(roots); err != nil {
|
| + return err
|
| + }
|
|
|
| - if err := t.updateRootsLocked(roots); err != nil {
|
| + i := 0
|
| + err := t.bufDS.DeleteMulti(keys, func(err error) {
|
| + impossible(err)
|
| + t.entState.set(encKeys[i], 0)
|
| + i++
|
| + })
|
| + impossible(err)
|
| + return nil
|
| + }()
|
| + if err != nil {
|
| return err
|
| }
|
|
|
| - i := 0
|
| - err := t.memDS.PutMulti(keys, vals, func(k *datastore.Key, err error) {
|
| - impossible(err)
|
| - t.entState.set(encKeys[i], vals[i].EstimateSize())
|
| - i++
|
| - })
|
| - impossible(err)
|
| + for range keys {
|
| + cb(nil)
|
| + }
|
| +
|
| return nil
|
| }
|
|
|
| -// apply actually takes the buffered transaction and applies it to the parent
|
| -// transaction. It will only return an error if the underlying 'real' datastore
|
| -// returns an error on PutMulti or DeleteMulti.
|
| -func (t *txnBufState) apply() error {
|
| - t.Lock()
|
| - defer t.Unlock()
|
| -
|
| - // if parentState is nil... just try to commit this anyway. The estimates
|
| - // we're using here are just educated guesses. If it fits for real, then
|
| - // hooray. If not, then the underlying datastore will error.
|
| - if t.parentState != nil {
|
| - t.parentState.Lock()
|
| - proposedState := t.parentState.entState.dup()
|
| - t.parentState.Unlock()
|
| - for k, v := range t.entState.keyToSize {
|
| - proposedState.set(k, v)
|
| - }
|
| - if proposedState.total > t.sizeBudget {
|
| - return ErrTransactionTooLarge
|
| +func (t *txnBufState) fixKeys(keys []*datastore.Key) ([]*datastore.Key, error) {
|
| + lme := errors.NewLazyMultiError(len(keys))
|
| + realKeys := []*datastore.Key(nil)
|
| + for i, key := range keys {
|
| + if key.Incomplete() {
|
| + // intentionally call AllocateIDs without lock.
|
| + start, err := t.parentDS.AllocateIDs(key, 1)
|
| + if !lme.Assign(i, err) {
|
| + if realKeys == nil {
|
| + realKeys = make([]*datastore.Key, len(keys))
|
| + copy(realKeys, keys)
|
| + }
|
| +
|
| + aid, ns, toks := key.Split()
|
| + toks[len(toks)-1].IntID = start
|
| + realKeys[i] = datastore.NewKeyToks(aid, ns, toks)
|
| + }
|
| }
|
| }
|
| + err := lme.Get()
|
|
|
| - toPutKeys := []*datastore.Key(nil)
|
| - toPut := []datastore.PropertyMap(nil)
|
| - toDel := []*datastore.Key(nil)
|
| + if realKeys != nil {
|
| + return realKeys, err
|
| + }
|
| + return keys, err
|
| +}
|
|
|
| - // need to pull all items out of the in-memory datastore. Fortunately we have
|
| - // kindless queries, and we disabled all the special entities, so just
|
| - // run a kindless query without any filters and it will return all data
|
| - // currently in memDS :).
|
| - fq, err := datastore.NewQuery("").Finalize()
|
| - impossible(err)
|
| +func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyMap, cb datastore.PutMultiCB, haveLock bool) error {
|
| + keys, err := t.fixKeys(keys)
|
| + if err != nil {
|
| + for _, e := range err.(errors.MultiError) {
|
| + cb(nil, e)
|
| + }
|
| + return nil
|
| + }
|
|
|
| - err = t.memDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMap, _ datastore.CursorCB) bool {
|
| - toPutKeys = append(toPutKeys, key)
|
| - toPut = append(toPut, data)
|
| - return true
|
| - })
|
| - memoryCorruption(err)
|
| + encKeys, roots := toEncoded(keys)
|
|
|
| - for keyStr, size := range t.entState.keyToSize {
|
| - if size == 0 {
|
| - k, err := serialize.ReadKey(bytes.NewBufferString(keyStr), serialize.WithoutContext, t.aid, t.ns)
|
| - memoryCorruption(err)
|
| - toDel = append(toDel, k)
|
| + err = func() error {
|
| + if !haveLock {
|
| + t.Lock()
|
| + defer t.Unlock()
|
| + }
|
| +
|
| + if err := t.updateRootsLocked(roots); err != nil {
|
| + return err
|
| }
|
| +
|
| + i := 0
|
| + err := t.bufDS.PutMulti(keys, vals, func(k *datastore.Key, err error) {
|
| + impossible(err)
|
| + t.entState.set(encKeys[i], vals[i].EstimateSize())
|
| + i++
|
| + })
|
| + impossible(err)
|
| + return nil
|
| + }()
|
| + if err != nil {
|
| + return err
|
| }
|
|
|
| - ds := t.parentDS
|
| + for _, k := range keys {
|
| + cb(k, nil)
|
| + }
|
| + return nil
|
| +}
|
| +
|
| +func commitToReal(s *txnBufState) error {
|
| + toPut, toPutKeys, toDel := s.effect()
|
|
|
| return parallel.FanOutIn(func(ch chan<- func() error) {
|
| if len(toPut) > 0 {
|
| ch <- func() error {
|
| mErr := errors.NewLazyMultiError(len(toPut))
|
| i := 0
|
| - err := ds.PutMulti(toPutKeys, toPut, func(_ *datastore.Key, err error) {
|
| + err := s.parentDS.PutMulti(toPutKeys, toPut, func(_ *datastore.Key, err error) {
|
| mErr.Assign(i, err)
|
| i++
|
| })
|
| @@ -383,7 +439,7 @@ func (t *txnBufState) apply() error {
|
| ch <- func() error {
|
| mErr := errors.NewLazyMultiError(len(toDel))
|
| i := 0
|
| - err := ds.DeleteMulti(toDel, func(err error) {
|
| + err := s.parentDS.DeleteMulti(toDel, func(err error) {
|
| mErr.Assign(i, err)
|
| i++
|
| })
|
| @@ -396,6 +452,59 @@ func (t *txnBufState) apply() error {
|
| })
|
| }
|
|
|
| +func (t *txnBufState) effect() (toPut []datastore.PropertyMap, toPutKeys, toDel []*datastore.Key) {
|
| + // TODO(riannucci): preallocate return slices
|
| +
|
| + // need to pull all items out of the in-memory datastore. Fortunately we have
|
| + // kindless queries, and we disabled all the special entities, so just
|
| + // run a kindless query without any filters and it will return all data
|
| + // currently in bufDS :).
|
| + fq, err := datastore.NewQuery("").Finalize()
|
| + impossible(err)
|
| +
|
| + err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMap, _ datastore.CursorCB) bool {
|
| + toPutKeys = append(toPutKeys, key)
|
| + toPut = append(toPut, data)
|
| + return true
|
| + })
|
| + memoryCorruption(err)
|
| +
|
| + for keyStr, size := range t.entState.keyToSize {
|
| + if size == 0 {
|
| + k, err := serialize.ReadKey(bytes.NewBufferString(keyStr), serialize.WithoutContext, t.aid, t.ns)
|
| + memoryCorruption(err)
|
| + toDel = append(toDel, k)
|
| + }
|
| + }
|
| +
|
| + return
|
| +}
|
| +
|
| +func (t *txnBufState) canApplyLocked(s *txnBufState) error {
|
| + proposedState := t.entState.dup()
|
| +
|
| + for k, v := range s.entState.keyToSize {
|
| + proposedState.set(k, v)
|
| + }
|
| + if proposedState.total > s.sizeBudget {
|
| + return ErrTransactionTooLarge
|
| + }
|
| + return nil
|
| +}
|
| +
|
| +func (t *txnBufState) commitLocked(s *txnBufState) {
|
| + toPut, toPutKeys, toDel := s.effect()
|
| +
|
| + if len(toPut) > 0 {
|
| + impossible(t.putMulti(toPutKeys, toPut,
|
| + func(_ *datastore.Key, err error) { impossible(err) }, true))
|
| + }
|
| +
|
| + if len(toDel) > 0 {
|
| + impossible(t.deleteMulti(toDel, impossible, true))
|
| + }
|
| +}
|
| +
|
| // toEncoded returns a list of all of the serialized versions of these keys,
|
| // plus a stringset of all the encoded root keys that `keys` represents.
|
| func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) {
|
|
|