Index: filter/txnBuf/state.go |
diff --git a/filter/txnBuf/state.go b/filter/txnBuf/state.go |
index 080a92adc65a740b167f92a2f2a6f2785ecef871..68c207b7bd89178916588a8124280b8165c2e786 100644 |
--- a/filter/txnBuf/state.go |
+++ b/filter/txnBuf/state.go |
@@ -94,15 +94,14 @@ type txnBufState struct { |
// encoded key -> size of entity. A size of 0 means that the entity is |
// deleted. |
entState *sizeTracker |
- memDS datastore.RawInterface |
+ bufDS datastore.RawInterface |
roots stringset.Set |
rootLimit int |
- aid string |
- ns string |
- parentDS datastore.RawInterface |
- parentState *txnBufState |
+ aid string |
+ ns string |
+ parentDS datastore.RawInterface |
// sizeBudget is the number of bytes that this transaction has to operate |
// within. It's only used when attempting to apply() the transaction, and |
@@ -111,18 +110,6 @@ type txnBufState struct { |
// a negative delta if the parent transaction had many large entities which |
// the inner transaction deleted. |
sizeBudget int64 |
- |
- // siblingLock is to prevent two nested transactions from running at the same |
- // time. |
- // |
- // Example: |
- // RunInTransaction() { // root |
- // RunInTransaction() // A |
- // RunInTransaction() // B |
- // } |
- // |
- // This will prevent A and B from running simulatneously. |
- siblingLock sync.Mutex |
} |
func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datastore.TransactionOptions) error { |
@@ -137,9 +124,6 @@ func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas |
} |
sizeBudget := DefaultSizeBudget |
if parentState != nil { |
- parentState.siblingLock.Lock() |
- defer parentState.siblingLock.Unlock() |
- |
// TODO(riannucci): this is a bit wonky since it means that a child |
// transaction declaring XG=true will only get to modify 25 groups IF |
// they're same groups affected by the parent transactions. So instead of |
@@ -154,26 +138,38 @@ func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas |
} |
} |
- memDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns) |
+ bufDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns) |
if err != nil { |
return err |
} |
state := &txnBufState{ |
- entState: &sizeTracker{}, |
- memDS: memDS.Raw(), |
- roots: roots, |
- rootLimit: rootLimit, |
- ns: ns, |
- aid: inf.AppID(), |
- parentDS: datastore.Get(ctx).Raw(), |
- parentState: parentState, |
- sizeBudget: sizeBudget, |
+ entState: &sizeTracker{}, |
+ bufDS: bufDS.Raw(), |
+ roots: roots, |
+ rootLimit: rootLimit, |
+ ns: ns, |
+ aid: inf.AppID(), |
+ parentDS: datastore.Get(context.WithValue(ctx, dsTxnBufHaveLock, true)).Raw(), |
+ sizeBudget: sizeBudget, |
} |
if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil { |
return err |
} |
- return state.apply() |
+ |
+ // no reason to unlock this ever. At this point it's toast. |
+ state.Lock() |
+ |
+ if parentState == nil { |
+ return commitToReal(state) |
+ } |
+ |
+ if err = parentState.canApply(state); err != nil { |
+ return err |
+ } |
+ |
+ parentState.commit(state) |
+ return nil |
} |
// item is a temporary object for representing key/entity pairs and their cache |
@@ -236,140 +232,200 @@ func (t *txnBufState) updateRootsLocked(roots stringset.Set) error { |
return nil |
} |
-func (t *txnBufState) getMulti(keys []*datastore.Key) ([]item, error) { |
+func (t *txnBufState) getMulti(keys []*datastore.Key, metas datastore.MultiMetaGetter, cb datastore.GetMultiCB, haveLock bool) error { |
encKeys, roots := toEncoded(keys) |
- ret := make([]item, len(keys)) |
+ data := make([]item, len(keys)) |
idxMap := []int(nil) |
toGetKeys := []*datastore.Key(nil) |
- t.Lock() |
- defer t.Unlock() |
+ lme := errors.NewLazyMultiError(len(keys)) |
+ err := func() error { |
+ if !haveLock { |
+ t.Lock() |
+ defer t.Unlock() |
+ } |
- if err := t.updateRootsLocked(roots); err != nil { |
- return nil, err |
- } |
+ if err := t.updateRootsLocked(roots); err != nil { |
+ return err |
+ } |
- for i, key := range keys { |
- ret[i].key = key |
- ret[i].encKey = encKeys[i] |
- if size, ok := t.entState.get(ret[i].getEncKey()); ok { |
- ret[i].buffered = true |
- if size > 0 { |
- idxMap = append(idxMap, i) |
- toGetKeys = append(toGetKeys, key) |
+ for i, key := range keys { |
+ data[i].key = key |
+ data[i].encKey = encKeys[i] |
+ if size, ok := t.entState.get(data[i].getEncKey()); ok { |
+ data[i].buffered = true |
+ if size > 0 { |
+ idxMap = append(idxMap, i) |
+ toGetKeys = append(toGetKeys, key) |
+ } |
} |
} |
- } |
- |
- if len(toGetKeys) > 0 { |
- j := 0 |
- t.memDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) { |
- impossible(err) |
- ret[idxMap[j]].data = pm |
- j++ |
- }) |
- } |
- return ret, nil |
-} |
+ if len(toGetKeys) > 0 { |
+ j := 0 |
+ t.bufDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) { |
+ impossible(err) |
+ data[idxMap[j]].data = pm |
+ j++ |
+ }) |
+ } |
-func (t *txnBufState) deleteMulti(keys []*datastore.Key) error { |
- encKeys, roots := toEncoded(keys) |
+ idxMap = nil |
+ getKeys := []*datastore.Key(nil) |
+ getMetas := datastore.MultiMetaGetter(nil) |
- t.Lock() |
- defer t.Unlock() |
+ for i, itm := range data { |
+ if !itm.buffered { |
+ idxMap = append(idxMap, i) |
+ getKeys = append(getKeys, itm.key) |
+ getMetas = append(getMetas, metas.GetSingle(i)) |
+ } |
+ } |
- if err := t.updateRootsLocked(roots); err != nil { |
+ if len(idxMap) > 0 { |
+ j := 0 |
+ err := t.parentDS.GetMulti(getKeys, getMetas, func(pm datastore.PropertyMap, err error) { |
+ if err != datastore.ErrNoSuchEntity { |
+ i := idxMap[j] |
+ if !lme.Assign(i, err) { |
+ data[i].data = pm |
+ } |
+ } |
+ j++ |
+ }) |
+ if err != nil { |
+ return err |
+ } |
+ } |
+ return nil |
+ }() |
+ if err != nil { |
return err |
} |
- i := 0 |
- err := t.memDS.DeleteMulti(keys, func(err error) { |
- impossible(err) |
- t.entState.set(encKeys[i], 0) |
- i++ |
- }) |
- impossible(err) |
+ for i, itm := range data { |
+ err := lme.GetOne(i) |
+ if err != nil { |
+ cb(nil, err) |
+ } else if itm.data == nil { |
+ cb(nil, datastore.ErrNoSuchEntity) |
+ } else { |
+ cb(itm.data, nil) |
+ } |
+ } |
return nil |
} |
-func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyMap) error { |
+func (t *txnBufState) deleteMulti(keys []*datastore.Key, cb datastore.DeleteMultiCB, haveLock bool) error { |
encKeys, roots := toEncoded(keys) |
- t.Lock() |
- defer t.Unlock() |
+ err := func() error { |
+ if !haveLock { |
+ t.Lock() |
+ defer t.Unlock() |
+ } |
+ |
+ if err := t.updateRootsLocked(roots); err != nil { |
+ return err |
+ } |
- if err := t.updateRootsLocked(roots); err != nil { |
+ i := 0 |
+ err := t.bufDS.DeleteMulti(keys, func(err error) { |
+ impossible(err) |
+ t.entState.set(encKeys[i], 0) |
+ i++ |
+ }) |
+ impossible(err) |
+ return nil |
+ }() |
+ if err != nil { |
return err |
} |
- i := 0 |
- err := t.memDS.PutMulti(keys, vals, func(k *datastore.Key, err error) { |
- impossible(err) |
- t.entState.set(encKeys[i], vals[i].EstimateSize()) |
- i++ |
- }) |
- impossible(err) |
+ for range keys { |
+ cb(nil) |
+ } |
+ |
return nil |
} |
-// apply actually takes the buffered transaction and applies it to the parent |
-// transaction. It will only return an error if the underlying 'real' datastore |
-// returns an error on PutMulti or DeleteMulti. |
-func (t *txnBufState) apply() error { |
- t.Lock() |
- defer t.Unlock() |
- |
- // if parentState is nil... just try to commit this anyway. The estimates |
- // we're using here are just educated guesses. If it fits for real, then |
- // hooray. If not, then the underlying datastore will error. |
- if t.parentState != nil { |
- t.parentState.Lock() |
- proposedState := t.parentState.entState.dup() |
- t.parentState.Unlock() |
- for k, v := range t.entState.keyToSize { |
- proposedState.set(k, v) |
- } |
- if proposedState.total > t.sizeBudget { |
- return ErrTransactionTooLarge |
+func (t *txnBufState) fixKeys(keys []*datastore.Key) ([]*datastore.Key, error) { |
+ lme := errors.NewLazyMultiError(len(keys)) |
+ realKeys := []*datastore.Key(nil) |
+ for i, key := range keys { |
+ if key.Incomplete() { |
+ // intentionally call AllocateIDs without lock. |
+ start, err := t.parentDS.AllocateIDs(key, 1) |
+ if !lme.Assign(i, err) { |
+ if realKeys == nil { |
+ realKeys = make([]*datastore.Key, len(keys)) |
+ copy(realKeys, keys) |
+ } |
+ |
+ aid, ns, toks := key.Split() |
+ toks[len(toks)-1].IntID = start |
+ realKeys[i] = datastore.NewKeyToks(aid, ns, toks) |
+ } |
} |
} |
+ err := lme.Get() |
- toPutKeys := []*datastore.Key(nil) |
- toPut := []datastore.PropertyMap(nil) |
- toDel := []*datastore.Key(nil) |
+ if realKeys != nil { |
+ return realKeys, err |
+ } |
+ return keys, err |
+} |
- // need to pull all items out of the in-memory datastore. Fortunately we have |
- // kindless queries, and we disabled all the special entities, so just |
- // run a kindless query without any filters and it will return all data |
- // currently in memDS :). |
- fq, err := datastore.NewQuery("").Finalize() |
- impossible(err) |
+func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyMap, cb datastore.PutMultiCB, haveLock bool) error { |
+ keys, err := t.fixKeys(keys) |
+ if err != nil { |
+ for _, e := range err.(errors.MultiError) { |
+ cb(nil, e) |
+ } |
+ return nil |
+ } |
- err = t.memDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMap, _ datastore.CursorCB) bool { |
- toPutKeys = append(toPutKeys, key) |
- toPut = append(toPut, data) |
- return true |
- }) |
- memoryCorruption(err) |
+ encKeys, roots := toEncoded(keys) |
- for keyStr, size := range t.entState.keyToSize { |
- if size == 0 { |
- k, err := serialize.ReadKey(bytes.NewBufferString(keyStr), serialize.WithoutContext, t.aid, t.ns) |
- memoryCorruption(err) |
- toDel = append(toDel, k) |
+ err = func() error { |
+ if !haveLock { |
+ t.Lock() |
+ defer t.Unlock() |
+ } |
+ |
+ if err := t.updateRootsLocked(roots); err != nil { |
+ return err |
} |
+ |
+ i := 0 |
+ err := t.bufDS.PutMulti(keys, vals, func(k *datastore.Key, err error) { |
+ impossible(err) |
+ t.entState.set(encKeys[i], vals[i].EstimateSize()) |
+ i++ |
+ }) |
+ impossible(err) |
+ return nil |
+ }() |
+ if err != nil { |
+ return err |
} |
- ds := t.parentDS |
+ for _, k := range keys { |
+ cb(k, nil) |
+ } |
+ return nil |
+} |
+ |
+func commitToReal(s *txnBufState) error { |
+ toPut, toPutKeys, toDel := s.effect() |
return parallel.FanOutIn(func(ch chan<- func() error) { |
if len(toPut) > 0 { |
ch <- func() error { |
mErr := errors.NewLazyMultiError(len(toPut)) |
i := 0 |
- err := ds.PutMulti(toPutKeys, toPut, func(_ *datastore.Key, err error) { |
+ err := s.parentDS.PutMulti(toPutKeys, toPut, func(_ *datastore.Key, err error) { |
mErr.Assign(i, err) |
i++ |
}) |
@@ -383,7 +439,7 @@ func (t *txnBufState) apply() error { |
ch <- func() error { |
mErr := errors.NewLazyMultiError(len(toDel)) |
i := 0 |
- err := ds.DeleteMulti(toDel, func(err error) { |
+ err := s.parentDS.DeleteMulti(toDel, func(err error) { |
mErr.Assign(i, err) |
i++ |
}) |
@@ -396,6 +452,59 @@ func (t *txnBufState) apply() error { |
}) |
} |
+func (t *txnBufState) effect() (toPut []datastore.PropertyMap, toPutKeys, toDel []*datastore.Key) { |
+ // TODO(riannucci): preallocate return slices |
+ |
+ // need to pull all items out of the in-memory datastore. Fortunately we have |
+ // kindless queries, and we disabled all the special entities, so just |
+ // run a kindless query without any filters and it will return all data |
+ // currently in bufDS :). |
+ fq, err := datastore.NewQuery("").Finalize() |
+ impossible(err) |
+ |
+ err = t.bufDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMap, _ datastore.CursorCB) bool { |
+ toPutKeys = append(toPutKeys, key) |
+ toPut = append(toPut, data) |
+ return true |
+ }) |
+ memoryCorruption(err) |
+ |
+ for keyStr, size := range t.entState.keyToSize { |
+ if size == 0 { |
+ k, err := serialize.ReadKey(bytes.NewBufferString(keyStr), serialize.WithoutContext, t.aid, t.ns) |
+ memoryCorruption(err) |
+ toDel = append(toDel, k) |
+ } |
+ } |
+ |
+ return |
+} |
+ |
+func (t *txnBufState) canApply(s *txnBufState) error { |
dnj
2015/11/11 16:08:28
canApplyLocked?
iannucci
2015/11/11 18:06:40
done
|
+ proposedState := t.entState.dup() |
+ |
+ for k, v := range s.entState.keyToSize { |
+ proposedState.set(k, v) |
+ } |
+ if proposedState.total > s.sizeBudget { |
+ return ErrTransactionTooLarge |
+ } |
+ return nil |
+} |
+ |
+func (t *txnBufState) commit(s *txnBufState) { |
dnj
2015/11/11 16:08:28
commitLocked?
iannucci
2015/11/11 18:06:40
done
|
+ toPut, toPutKeys, toDel := s.effect() |
+ |
+ if len(toPut) > 0 { |
+ impossible(t.putMulti(toPutKeys, toPut, |
+ func(_ *datastore.Key, err error) { impossible(err) }, true)) |
+ } |
+ |
+ if len(toDel) > 0 { |
+ impossible(t.deleteMulti(toDel, impossible, true)) |
+ } |
+} |
+ |
// toEncoded returns a list of all of the serialized versions of these keys, |
// plus a stringset of all the encoded root keys that `keys` represents. |
func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { |