Index: impl/memory/context.go |
diff --git a/impl/memory/context.go b/impl/memory/context.go |
index bb784ec540c1603c2194a1f5da2f36e42106285a..8859ebb15c491f7ef2372158f0c2fbd1ed095633 100644 |
--- a/impl/memory/context.go |
+++ b/impl/memory/context.go |
@@ -135,28 +135,43 @@ var memContextKey memContextKeyType |
// test-access API for TaskQueue better (instead of trying to reconstitute the |
// state of the task queue from a bunch of datastore accesses). |
func (d *dsImpl) RunInTransaction(f func(context.Context) error, o *ds.TransactionOptions) error { |
- curMC := cur(d.c) |
+ // Keep in separate function for defers. |
+ loopBody := func(applyForReal bool) error { |
+ curMC := cur(d.c) |
- txnMC := curMC.mkTxn(o) |
+ txnMC := curMC.mkTxn(o) |
+ |
+ defer func() { |
+ txnMC.Lock() |
+ defer txnMC.Unlock() |
+ |
+ txnMC.endTxn() |
+ }() |
+ |
+ if err := f(context.WithValue(d.c, memContextKey, txnMC)); err != nil { |
+ return err |
+ } |
- defer func() { |
txnMC.Lock() |
defer txnMC.Unlock() |
- txnMC.endTxn() |
- }() |
- |
- if err := f(context.WithValue(d.c, memContextKey, txnMC)); err != nil { |
- return err |
+ if applyForReal && curMC.canApplyTxn(txnMC) { |
+ curMC.applyTxn(d.c, txnMC) |
+ } else { |
+ return ds.ErrConcurrentTransaction |
+ } |
+ return nil |
} |
- txnMC.Lock() |
- defer txnMC.Unlock() |
- |
- if curMC.canApplyTxn(txnMC) { |
- curMC.applyTxn(d.c, txnMC) |
- } else { |
- return ds.ErrConcurrentTransaction |
+ // From GAE docs for TransactionOptions: "If omitted, it defaults to 3." |
+ attempts := 3 |
+ if o != nil && o.Attempts != 0 { |
+ attempts = o.Attempts |
+ } |
+ for attempt := 0; attempt < attempts; attempt++ { |
+ if err := loopBody(attempt >= d.txnFakeRetry); err != ds.ErrConcurrentTransaction { |
+ return err |
+ } |
} |
- return nil |
+ return ds.ErrConcurrentTransaction |
} |