| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "strings" | 9 "strings" |
| 10 "sync" | 10 "sync" |
| 11 | 11 |
| 12 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
| 13 "github.com/luci/luci-go/common/logging/memlogger" | 13 "github.com/luci/luci-go/common/logging/memlogger" |
| 14 |
| 14 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
| 15 ) | 16 ) |
| 16 | 17 |
| 17 var serializationDeterministic = false | 18 var serializationDeterministic = false |
| 18 | 19 |
| 19 type memContextObj interface { | 20 type memContextObj interface { |
| 20 sync.Locker | 21 sync.Locker |
| 21 canApplyTxn(m memContextObj) bool | 22 canApplyTxn(m memContextObj) bool |
| 22 applyTxn(c context.Context, m memContextObj) | 23 applyTxn(c context.Context, m memContextObj) |
| 23 | 24 |
| (...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 121 } | 122 } |
| 122 c = memlogger.Use(c) | 123 c = memlogger.Use(c) |
| 123 | 124 |
| 124 fqAppID := aid | 125 fqAppID := aid |
| 125 if parts := strings.SplitN(fqAppID, "~", 2); len(parts) == 2 { | 126 if parts := strings.SplitN(fqAppID, "~", 2); len(parts) == 2 { |
| 126 aid = parts[1] | 127 aid = parts[1] |
| 127 } | 128 } |
| 128 | 129 |
| 129 memctx := newMemContext(fqAppID) | 130 memctx := newMemContext(fqAppID) |
| 130 c = context.WithValue(c, memContextKey, memctx) | 131 c = context.WithValue(c, memContextKey, memctx) |
| 131 c = context.WithValue(c, memContextNoTxnKey, memctx) | |
| 132 c = useGID(c, func(mod *globalInfoData) { | 132 c = useGID(c, func(mod *globalInfoData) { |
| 133 mod.appID = aid | 133 mod.appID = aid |
| 134 mod.fqAppID = fqAppID | 134 mod.fqAppID = fqAppID |
| 135 }) | 135 }) |
| 136 return useMod(useMail(useUser(useTQ(useRDS(useMC(useGI(c))))))) | 136 return useMod(useMail(useUser(useTQ(useRDS(useMC(useGI(c))))))) |
| 137 } | 137 } |
| 138 | 138 |
| 139 func cur(c context.Context) (p *memContext) { | 139 func cur(c context.Context) (*memContext, bool) { |
| 140 » p, _ = c.Value(memContextKey).(*memContext) | 140 » if txn := c.Value(currentTxnKey); txn != nil { |
| 141 » return | 141 » » // We are in a Transaction. |
| 142 } | 142 » » return txn.(*memContext), true |
| 143 | 143 » } |
| 144 func curNoTxn(c context.Context) (p *memContext) { | 144 » return c.Value(memContextKey).(*memContext), false |
| 145 » p, _ = c.Value(memContextNoTxnKey).(*memContext) | |
| 146 » return | |
| 147 } | 145 } |
| 148 | 146 |
| 149 type memContextKeyType int | 147 type memContextKeyType int |
| 150 | 148 |
| 151 var ( | 149 var ( |
| 152 » memContextKey memContextKeyType | 150 » memContextKey memContextKeyType |
| 153 » memContextNoTxnKey memContextKeyType = 1 | 151 » currentTxnKey = 1 |
| 154 ) | 152 ) |
| 155 | 153 |
| 156 // weird stuff | 154 // weird stuff |
| 157 | 155 |
| 158 // RunInTransaction is here because it's really a service-wide transaction, not | 156 // RunInTransaction is here because it's really a service-wide transaction, not |
| 159 // just in the datastore. TaskQueue behaves differently in a transaction in | 157 // just in the datastore. TaskQueue behaves differently in a transaction in |
| 160 // a couple ways, for example. | 158 // a couple ways, for example. |
| 161 // | 159 // |
| 162 // It really should have been appengine.Context.RunInTransaction(func(tc...)), | 160 // It really should have been appengine.Context.RunInTransaction(func(tc...)), |
| 163 // but because it's not, this method is on dsImpl instead to mirror the official | 161 // but because it's not, this method is on dsImpl instead to mirror the official |
| 164 // API. | 162 // API. |
| 165 // | 163 // |
| 166 // The fake implementation also differs from the real implementation because the | 164 // The fake implementation also differs from the real implementation because the |
| 167 // fake TaskQueue is NOT backed by the fake Datastore. This is done to make the | 165 // fake TaskQueue is NOT backed by the fake Datastore. This is done to make the |
| 168 // test-access API for TaskQueue better (instead of trying to reconstitute the | 166 // test-access API for TaskQueue better (instead of trying to reconstitute the |
| 169 // state of the task queue from a bunch of datastore accesses). | 167 // state of the task queue from a bunch of datastore accesses). |
| 170 func (d *dsImpl) RunInTransaction(f func(context.Context) error, o *ds.Transacti
onOptions) error { | 168 func (d *dsImpl) RunInTransaction(f func(context.Context) error, o *ds.Transacti
onOptions) error { |
| 171 if d.data.getDisableSpecialEntities() { | 169 if d.data.getDisableSpecialEntities() { |
| 172 return errors.New("special entities are disabled. no transaction
s for you") | 170 return errors.New("special entities are disabled. no transaction
s for you") |
| 173 } | 171 } |
| 174 | 172 |
| 175 // Keep in separate function for defers. | 173 // Keep in separate function for defers. |
| 176 loopBody := func(applyForReal bool) error { | 174 loopBody := func(applyForReal bool) error { |
| 177 » » curMC := cur(d.c) | 175 » » curMC, inTxn := cur(d) |
| 176 » » if inTxn { |
| 177 » » » return errors.New("datastore: nested transactions are no
t supported") |
| 178 » » } |
| 178 | 179 |
| 179 txnMC := curMC.mkTxn(o) | 180 txnMC := curMC.mkTxn(o) |
| 180 | 181 |
| 181 defer func() { | 182 defer func() { |
| 182 txnMC.Lock() | 183 txnMC.Lock() |
| 183 defer txnMC.Unlock() | 184 defer txnMC.Unlock() |
| 184 | 185 |
| 185 txnMC.endTxn() | 186 txnMC.endTxn() |
| 186 }() | 187 }() |
| 187 | 188 |
| 188 » » if err := f(context.WithValue(d.c, memContextKey, txnMC)); err !
= nil { | 189 » » if err := f(context.WithValue(d, currentTxnKey, txnMC)); err !=
nil { |
| 189 return err | 190 return err |
| 190 } | 191 } |
| 191 | 192 |
| 192 txnMC.Lock() | 193 txnMC.Lock() |
| 193 defer txnMC.Unlock() | 194 defer txnMC.Unlock() |
| 194 | 195 |
| 195 if applyForReal && curMC.canApplyTxn(txnMC) { | 196 if applyForReal && curMC.canApplyTxn(txnMC) { |
| 196 » » » curMC.applyTxn(d.c, txnMC) | 197 » » » curMC.applyTxn(d, txnMC) |
| 197 } else { | 198 } else { |
| 198 return ds.ErrConcurrentTransaction | 199 return ds.ErrConcurrentTransaction |
| 199 } | 200 } |
| 200 return nil | 201 return nil |
| 201 } | 202 } |
| 202 | 203 |
| 203 // From GAE docs for TransactionOptions: "If omitted, it defaults to 3." | 204 // From GAE docs for TransactionOptions: "If omitted, it defaults to 3." |
| 204 attempts := 3 | 205 attempts := 3 |
| 205 if o != nil && o.Attempts != 0 { | 206 if o != nil && o.Attempts != 0 { |
| 206 attempts = o.Attempts | 207 attempts = o.Attempts |
| 207 } | 208 } |
| 208 for attempt := 0; attempt < attempts; attempt++ { | 209 for attempt := 0; attempt < attempts; attempt++ { |
| 209 if err := loopBody(attempt >= d.data.txnFakeRetry); err != ds.Er
rConcurrentTransaction { | 210 if err := loopBody(attempt >= d.data.txnFakeRetry); err != ds.Er
rConcurrentTransaction { |
| 210 return err | 211 return err |
| 211 } | 212 } |
| 212 } | 213 } |
| 213 return ds.ErrConcurrentTransaction | 214 return ds.ErrConcurrentTransaction |
| 214 } | 215 } |
| OLD | NEW |