OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 "strings" | 9 "strings" |
10 "sync" | 10 "sync" |
11 | 11 |
12 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
13 "github.com/luci/luci-go/common/logging/memlogger" | 13 "github.com/luci/luci-go/common/logging/memlogger" |
| 14 |
14 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
15 ) | 16 ) |
16 | 17 |
17 var serializationDeterministic = false | 18 var serializationDeterministic = false |
18 | 19 |
19 type memContextObj interface { | 20 type memContextObj interface { |
20 sync.Locker | 21 sync.Locker |
21 canApplyTxn(m memContextObj) bool | 22 canApplyTxn(m memContextObj) bool |
22 applyTxn(c context.Context, m memContextObj) | 23 applyTxn(c context.Context, m memContextObj) |
23 | 24 |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
121 } | 122 } |
122 c = memlogger.Use(c) | 123 c = memlogger.Use(c) |
123 | 124 |
124 fqAppID := aid | 125 fqAppID := aid |
125 if parts := strings.SplitN(fqAppID, "~", 2); len(parts) == 2 { | 126 if parts := strings.SplitN(fqAppID, "~", 2); len(parts) == 2 { |
126 aid = parts[1] | 127 aid = parts[1] |
127 } | 128 } |
128 | 129 |
129 memctx := newMemContext(fqAppID) | 130 memctx := newMemContext(fqAppID) |
130 c = context.WithValue(c, memContextKey, memctx) | 131 c = context.WithValue(c, memContextKey, memctx) |
131 c = context.WithValue(c, memContextNoTxnKey, memctx) | |
132 c = useGID(c, func(mod *globalInfoData) { | 132 c = useGID(c, func(mod *globalInfoData) { |
133 mod.appID = aid | 133 mod.appID = aid |
134 mod.fqAppID = fqAppID | 134 mod.fqAppID = fqAppID |
135 }) | 135 }) |
136 return useMod(useMail(useUser(useTQ(useRDS(useMC(useGI(c))))))) | 136 return useMod(useMail(useUser(useTQ(useRDS(useMC(useGI(c))))))) |
137 } | 137 } |
138 | 138 |
139 func cur(c context.Context) (p *memContext) { | 139 func cur(c context.Context) (*memContext, bool) { |
140 » p, _ = c.Value(memContextKey).(*memContext) | 140 » if txn := c.Value(currentTxnKey); txn != nil { |
141 » return | 141 » » // We are in a Transaction. |
142 } | 142 » » return txn.(*memContext), true |
143 | 143 » } |
144 func curNoTxn(c context.Context) (p *memContext) { | 144 » return c.Value(memContextKey).(*memContext), false |
145 » p, _ = c.Value(memContextNoTxnKey).(*memContext) | |
146 » return | |
147 } | 145 } |
148 | 146 |
149 type memContextKeyType int | 147 type memContextKeyType int |
150 | 148 |
151 var ( | 149 var ( |
152 » memContextKey memContextKeyType | 150 » memContextKey memContextKeyType |
153 » memContextNoTxnKey memContextKeyType = 1 | 151 » currentTxnKey = 1 |
154 ) | 152 ) |
155 | 153 |
156 // weird stuff | 154 // weird stuff |
157 | 155 |
158 // RunInTransaction is here because it's really a service-wide transaction, not | 156 // RunInTransaction is here because it's really a service-wide transaction, not |
159 // just in the datastore. TaskQueue behaves differently in a transaction in | 157 // just in the datastore. TaskQueue behaves differently in a transaction in |
160 // a couple ways, for example. | 158 // a couple ways, for example. |
161 // | 159 // |
162 // It really should have been appengine.Context.RunInTransaction(func(tc...)), | 160 // It really should have been appengine.Context.RunInTransaction(func(tc...)), |
163 // but because it's not, this method is on dsImpl instead to mirror the official | 161 // but because it's not, this method is on dsImpl instead to mirror the official |
164 // API. | 162 // API. |
165 // | 163 // |
166 // The fake implementation also differs from the real implementation because the | 164 // The fake implementation also differs from the real implementation because the |
167 // fake TaskQueue is NOT backed by the fake Datastore. This is done to make the | 165 // fake TaskQueue is NOT backed by the fake Datastore. This is done to make the |
168 // test-access API for TaskQueue better (instead of trying to reconstitute the | 166 // test-access API for TaskQueue better (instead of trying to reconstitute the |
169 // state of the task queue from a bunch of datastore accesses). | 167 // state of the task queue from a bunch of datastore accesses). |
170 func (d *dsImpl) RunInTransaction(f func(context.Context) error, o *ds.Transacti
onOptions) error { | 168 func (d *dsImpl) RunInTransaction(f func(context.Context) error, o *ds.Transacti
onOptions) error { |
171 if d.data.getDisableSpecialEntities() { | 169 if d.data.getDisableSpecialEntities() { |
172 return errors.New("special entities are disabled. no transaction
s for you") | 170 return errors.New("special entities are disabled. no transaction
s for you") |
173 } | 171 } |
174 | 172 |
175 // Keep in separate function for defers. | 173 // Keep in separate function for defers. |
176 loopBody := func(applyForReal bool) error { | 174 loopBody := func(applyForReal bool) error { |
177 » » curMC := cur(d.c) | 175 » » curMC, inTxn := cur(d) |
| 176 » » if inTxn { |
| 177 » » » return errors.New("datastore: nested transactions are no
t supported") |
| 178 » » } |
178 | 179 |
179 txnMC := curMC.mkTxn(o) | 180 txnMC := curMC.mkTxn(o) |
180 | 181 |
181 defer func() { | 182 defer func() { |
182 txnMC.Lock() | 183 txnMC.Lock() |
183 defer txnMC.Unlock() | 184 defer txnMC.Unlock() |
184 | 185 |
185 txnMC.endTxn() | 186 txnMC.endTxn() |
186 }() | 187 }() |
187 | 188 |
188 » » if err := f(context.WithValue(d.c, memContextKey, txnMC)); err !
= nil { | 189 » » if err := f(context.WithValue(d, currentTxnKey, txnMC)); err !=
nil { |
189 return err | 190 return err |
190 } | 191 } |
191 | 192 |
192 txnMC.Lock() | 193 txnMC.Lock() |
193 defer txnMC.Unlock() | 194 defer txnMC.Unlock() |
194 | 195 |
195 if applyForReal && curMC.canApplyTxn(txnMC) { | 196 if applyForReal && curMC.canApplyTxn(txnMC) { |
196 » » » curMC.applyTxn(d.c, txnMC) | 197 » » » curMC.applyTxn(d, txnMC) |
197 } else { | 198 } else { |
198 return ds.ErrConcurrentTransaction | 199 return ds.ErrConcurrentTransaction |
199 } | 200 } |
200 return nil | 201 return nil |
201 } | 202 } |
202 | 203 |
203 // From GAE docs for TransactionOptions: "If omitted, it defaults to 3." | 204 // From GAE docs for TransactionOptions: "If omitted, it defaults to 3." |
204 attempts := 3 | 205 attempts := 3 |
205 if o != nil && o.Attempts != 0 { | 206 if o != nil && o.Attempts != 0 { |
206 attempts = o.Attempts | 207 attempts = o.Attempts |
207 } | 208 } |
208 for attempt := 0; attempt < attempts; attempt++ { | 209 for attempt := 0; attempt < attempts; attempt++ { |
209 if err := loopBody(attempt >= d.data.txnFakeRetry); err != ds.Er
rConcurrentTransaction { | 210 if err := loopBody(attempt >= d.data.txnFakeRetry); err != ds.Er
rConcurrentTransaction { |
210 return err | 211 return err |
211 } | 212 } |
212 } | 213 } |
213 return ds.ErrConcurrentTransaction | 214 return ds.ErrConcurrentTransaction |
214 } | 215 } |
OLD | NEW |