| Index: impl/memory/context.go
|
| diff --git a/impl/memory/context.go b/impl/memory/context.go
|
| index bb784ec540c1603c2194a1f5da2f36e42106285a..8859ebb15c491f7ef2372158f0c2fbd1ed095633 100644
|
| --- a/impl/memory/context.go
|
| +++ b/impl/memory/context.go
|
| @@ -135,28 +135,43 @@ var memContextKey memContextKeyType
|
| // test-access API for TaskQueue better (instead of trying to reconstitute the
|
| // state of the task queue from a bunch of datastore accesses).
|
| func (d *dsImpl) RunInTransaction(f func(context.Context) error, o *ds.TransactionOptions) error {
|
| - curMC := cur(d.c)
|
| + // Keep in separate function for defers.
|
| + loopBody := func(applyForReal bool) error {
|
| + curMC := cur(d.c)
|
|
|
| - txnMC := curMC.mkTxn(o)
|
| + txnMC := curMC.mkTxn(o)
|
| +
|
| + defer func() {
|
| + txnMC.Lock()
|
| + defer txnMC.Unlock()
|
| +
|
| + txnMC.endTxn()
|
| + }()
|
| +
|
| + if err := f(context.WithValue(d.c, memContextKey, txnMC)); err != nil {
|
| + return err
|
| + }
|
|
|
| - defer func() {
|
| txnMC.Lock()
|
| defer txnMC.Unlock()
|
|
|
| - txnMC.endTxn()
|
| - }()
|
| -
|
| - if err := f(context.WithValue(d.c, memContextKey, txnMC)); err != nil {
|
| - return err
|
| + if applyForReal && curMC.canApplyTxn(txnMC) {
|
| + curMC.applyTxn(d.c, txnMC)
|
| + } else {
|
| + return ds.ErrConcurrentTransaction
|
| + }
|
| + return nil
|
| }
|
|
|
| - txnMC.Lock()
|
| - defer txnMC.Unlock()
|
| -
|
| - if curMC.canApplyTxn(txnMC) {
|
| - curMC.applyTxn(d.c, txnMC)
|
| - } else {
|
| - return ds.ErrConcurrentTransaction
|
| + // From GAE docs for TransactionOptions: "If omitted, it defaults to 3."
|
| + attempts := 3
|
| + if o != nil && o.Attempts != 0 {
|
| + attempts = o.Attempts
|
| + }
|
| + for attempt := 0; attempt < attempts; attempt++ {
|
| + if err := loopBody(attempt >= d.txnFakeRetry); err != ds.ErrConcurrentTransaction {
|
| + return err
|
| + }
|
| }
|
| - return nil
|
| + return ds.ErrConcurrentTransaction
|
| }
|
|
|