OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 "sync" | 9 "sync" |
10 | 10 |
(...skipping 117 matching lines...) Loading... | |
128 // | 128 // |
129 // It really should have been appengine.Context.RunInTransaction(func(tc...)), | 129 // It really should have been appengine.Context.RunInTransaction(func(tc...)), |
130 // but because it's not, this method is on dsImpl instead to mirror the official | 130 // but because it's not, this method is on dsImpl instead to mirror the official |
131 // API. | 131 // API. |
132 // | 132 // |
133 // The fake implementation also differs from the real implementation because the | 133 // The fake implementation also differs from the real implementation because the |
134 // fake TaskQueue is NOT backed by the fake Datastore. This is done to make the | 134 // fake TaskQueue is NOT backed by the fake Datastore. This is done to make the |
135 // test-access API for TaskQueue better (instead of trying to reconstitute the | 135 // test-access API for TaskQueue better (instead of trying to reconstitute the |
136 // state of the task queue from a bunch of datastore accesses). | 136 // state of the task queue from a bunch of datastore accesses). |
137 func (d *dsImpl) RunInTransaction(f func(context.Context) error, o *ds.Transacti onOptions) error { | 137 func (d *dsImpl) RunInTransaction(f func(context.Context) error, o *ds.Transacti onOptions) error { |
138 » curMC := cur(d.c) | 138 » errNope := errors.New("nope") |
139 | 139 |
140 » txnMC := curMC.mkTxn(o) | 140 » // Keep in separate function for defers. |
141 » loopBody := func(applyForReal bool) error { | |
142 » » curMC := cur(d.c) | |
141 | 143 |
142 » defer func() { | 144 » » txnMC := curMC.mkTxn(o) |
145 | |
146 » » defer func() { | |
147 » » » txnMC.Lock() | |
148 » » » defer txnMC.Unlock() | |
149 | |
150 » » » txnMC.endTxn() | |
151 » » }() | |
152 | |
153 » » if err := f(context.WithValue(d.c, memContextKey, txnMC)); err ! = nil { | |
154 » » » return err | |
155 » » } | |
156 | |
143 txnMC.Lock() | 157 txnMC.Lock() |
144 defer txnMC.Unlock() | 158 defer txnMC.Unlock() |
145 | 159 |
146 » » txnMC.endTxn() | 160 » » if !applyForReal { |
147 » }() | 161 » » » // Be defensive and expect that f may return ErrConcurre ntTransaction. So |
162 » » » // use some other error to signal that we skipped commit . | |
163 » » » return errNope | |
164 » » } | |
148 | 165 |
149 » if err := f(context.WithValue(d.c, memContextKey, txnMC)); err != nil { | 166 » » if curMC.canApplyTxn(txnMC) { |
150 » » return err | 167 » » » curMC.applyTxn(d.c, txnMC) |
168 » » } else { | |
169 » » » return ds.ErrConcurrentTransaction | |
170 » » } | |
171 » » return nil | |
151 } | 172 } |
152 | 173 |
153 » txnMC.Lock() | 174 » // From GAE docs for TransactionOptions: "If omitted, it defaults to 3." |
154 » defer txnMC.Unlock() | 175 » maxAttempts := 3 |
155 | 176 » if o != nil && o.Attempts != 0 { |
156 » if curMC.canApplyTxn(txnMC) { | 177 » » maxAttempts = o.Attempts |
157 » » curMC.applyTxn(d.c, txnMC) | |
158 » } else { | |
159 » » return ds.ErrConcurrentTransaction | |
160 } | 178 } |
161 » return nil | 179 » for attempt := 0; attempt < maxAttempts; attempt++ { |
180 » » err := loopBody(attempt >= d.txnFakeRetry) | |
181 » » if err != errNope { | |
182 » » » return err | |
183 » » } | |
184 » } | |
185 » return ds.ErrConcurrentTransaction | |
iannucci
2015/09/11 22:31:11
Can we return a different error when the retries d
Vadim Sh.
2015/09/14 01:07:20
Real GAE returns ErrConcurrentTransaction: https:/
| |
162 } | 186 } |
OLD | NEW |