| Index: filter/txnBuf/state.go
|
| diff --git a/filter/txnBuf/state.go b/filter/txnBuf/state.go
|
| index be2c51370a0883f5fdf57c45815e370b94a9bba7..57286361a0340175bfe3d7d524ff3691bf64dd35 100644
|
| --- a/filter/txnBuf/state.go
|
| +++ b/filter/txnBuf/state.go
|
| @@ -27,11 +27,12 @@ import (
|
| // https://cloud.google.com/appengine/docs/go/datastore/#Go_Quotas_and_limits
|
| const DefaultSizeBudget = int64((10 * 1000 * 1000) * 0.95)
|
|
|
| -// DefaultSizeThreshold prevents the root transaction from getting too close
|
| -// to the budget. If the code attempts to begin a transaction which would have
|
| -// less than this threshold for its budget, the transaction will immediately
|
| -// return ErrTransactionTooLarge.
|
| -const DefaultSizeThreshold = int64(10 * 1000)
|
| +// DefaultWriteCountBudget is the maximum number of entities that can be written
|
| +// in a single call.
|
| +//
|
| +// This is not known to be documented, and has instead been extracted from a
|
| +// datastore error message.
|
| +const DefaultWriteCountBudget = 500
|
|
|
| // XGTransactionGroupLimit is the number of transaction groups to allow in an
|
| // XG transaction.
|
| @@ -76,6 +77,11 @@ func (s *sizeTracker) has(key string) bool {
|
| return has
|
| }
|
|
|
| +// numWrites returns the number of tracked write operations.
|
| +func (s *sizeTracker) numWrites() int {
|
| + return len(s.keyToSize)
|
| +}
|
| +
|
| // dup returns a duplicate sizeTracker.
|
| func (s *sizeTracker) dup() *sizeTracker {
|
| if len(s.keyToSize) == 0 {
|
| @@ -110,6 +116,9 @@ type txnBufState struct {
|
| // a negative delta if the parent transaction had many large entities which
|
| // the inner transaction deleted.
|
| sizeBudget int64
|
| + // countBudget is the number of entity writes that this transaction has to
|
| + // operate in.
|
| + writeCountBudget int
|
| }
|
|
|
| func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datastore.TransactionOptions) error {
|
| @@ -122,7 +131,7 @@ func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
|
| if opts != nil && opts.XG {
|
| rootLimit = XGTransactionGroupLimit
|
| }
|
| - sizeBudget := DefaultSizeBudget
|
| + sizeBudget, writeCountBudget := DefaultSizeBudget, DefaultWriteCountBudget
|
| if parentState != nil {
|
| // TODO(riannucci): this is a bit wonky since it means that a child
|
| // transaction declaring XG=true will only get to modify 25 groups IF
|
| @@ -133,9 +142,7 @@ func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
|
| rootLimit = parentState.rootLimit
|
|
|
| sizeBudget = parentState.sizeBudget - parentState.entState.total
|
| - if sizeBudget < DefaultSizeThreshold {
|
| - return ErrTransactionTooLarge
|
| - }
|
| + writeCountBudget = parentState.writeCountBudget - parentState.entState.numWrites()
|
| }
|
|
|
| bufDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns)
|
| @@ -144,14 +151,15 @@ func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
|
| }
|
|
|
| state := &txnBufState{
|
| - entState: &sizeTracker{},
|
| - bufDS: bufDS.Raw(),
|
| - roots: roots,
|
| - rootLimit: rootLimit,
|
| - ns: ns,
|
| - aid: inf.AppID(),
|
| - parentDS: datastore.Get(context.WithValue(ctx, dsTxnBufHaveLock, true)).Raw(),
|
| - sizeBudget: sizeBudget,
|
| + entState: &sizeTracker{},
|
| + bufDS: bufDS.Raw(),
|
| + roots: roots,
|
| + rootLimit: rootLimit,
|
| + ns: ns,
|
| + aid: inf.AppID(),
|
| + parentDS: datastore.Get(context.WithValue(ctx, dsTxnBufHaveLock, true)).Raw(),
|
| + sizeBudget: sizeBudget,
|
| + writeCountBudget: writeCountBudget,
|
| }
|
| if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil {
|
| return err
|
| @@ -492,9 +500,23 @@ func (t *txnBufState) canApplyLocked(s *txnBufState) error {
|
| for k, v := range s.entState.keyToSize {
|
| proposedState.set(k, v)
|
| }
|
| - if proposedState.total > s.sizeBudget {
|
| + switch {
|
| + case proposedState.numWrites() > t.writeCountBudget:
|
| + // The new net number of writes must be below the parent's write count
|
| + // cutoff.
|
| + fallthrough
|
| +
|
| + case proposedState.total > t.sizeBudget:
|
| + // Make sure our new calculated size is within the parent's size budget.
|
| + //
|
| + // We have:
|
| + // - proposedState.total: The "new world" total bytes were this child
|
| + // transaction committed to the parent.
|
| + // - t.sizeBudget: The maximum number of bytes that this parent can
|
| + // accommodate.
|
| return ErrTransactionTooLarge
|
| }
|
| +
|
| return nil
|
| }
|
|
|
|
|