Index: go/src/infra/gae/libs/wrapper/memory/context.go |
diff --git a/go/src/infra/gae/libs/wrapper/memory/context.go b/go/src/infra/gae/libs/wrapper/memory/context.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..2f8a841b6b606840e9f3738bbbce48a9997bc813 |
--- /dev/null |
+++ b/go/src/infra/gae/libs/wrapper/memory/context.go |
@@ -0,0 +1,161 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package memory |
+ |
+import ( |
+ "fmt" |
+ "math/rand" |
+ "sync" |
+ |
+ "golang.org/x/net/context" |
+ |
+ "appengine/datastore" |
+ |
+ "infra/gae/libs/wrapper" |
+) |
+ |
+type memContextObj interface { |
+ sync.Locker |
+ canApplyTxn(m memContextObj) bool |
+ applyTxn(rnd *rand.Rand, m memContextObj) |
+ |
+ endTxn() |
+ mkTxn(*datastore.TransactionOptions) (memContextObj, error) |
+} |
+ |
+type memContext []memContextObj |
Vadim Sh.
2015/05/24 19:43:26
memContext implements memContextObj too?
iannucci
2015/05/24 20:33:54
yeah it does (plus some extra stuff)
|
+ |
+func newMemContext() memContext { |
+ return memContext{ |
+ newTaskQueueData(), |
+ newDataStoreData(), |
+ } |
+} |
+ |
+var memContextIndices = map[string]int{ |
+ "TQ": 0, |
+ "DS": 1, |
+} |
+ |
+func (m memContext) Get(item string) memContextObj { |
+ if i, ok := memContextIndices[item]; !ok { |
+ panic(fmt.Errorf("wrapper/memory: cannot get context item %q", item)) |
+ } else { |
+ return m[i] |
+ } |
+} |
+ |
+func (m memContext) Lock() { |
+ for _, itm := range m { |
+ itm.Lock() |
+ } |
+} |
+ |
+func (m memContext) Unlock() { |
+ for i := len(m) - 1; i >= 0; i-- { |
+ m[i].Unlock() |
+ } |
+} |
+ |
+func (m memContext) endTxn() { |
+ for _, itm := range m { |
+ itm.endTxn() |
+ } |
+} |
+ |
+func (m memContext) mkTxn(o *datastore.TransactionOptions) (ret memContext, err error) { |
+ for _, itm := range m { |
+ newItm, err := itm.mkTxn(o) |
+ if err != nil { |
+ return nil, err |
+ } |
+ ret = append(ret, newItm) |
+ } |
+ return ret, nil |
+} |
+ |
+func (m memContext) canApplyTxn(txnCtx memContext) bool { |
+ for i := range m { |
+ if !m[i].canApplyTxn(txnCtx[i]) { |
+ return false |
+ } |
+ } |
+ return true |
+} |
+ |
+func (m memContext) applyTxn(rnd *rand.Rand, txnCtx memContext) { |
+ for i := range m { |
+ m[i].applyTxn(rnd, txnCtx[i]) |
+ } |
+} |
+ |
+// Enable adds a new memory context to c. This new memory context will have |
+// a zeroed state. |
+func Enable(c context.Context) context.Context { |
+ return context.WithValue( |
+ context.WithValue(c, memContextKey, newMemContext()), |
+ giContextKey, newGlobalInfoData()) |
+} |
+ |
+// Use calls ALL of this packages Use* methods on c. This enables all |
+// wrapper/memory Get* api's. |
+func Use(c context.Context) context.Context { |
+ return UseTQ(UseDS(UseMC(UseGI(c)))) |
+} |
+ |
+func cur(c context.Context) (p memContext) { |
+ p, _ = c.Value(memContextKey).(memContext) |
+ return |
+} |
+ |
+type memContextKeyType int |
+ |
+var memContextKey memContextKeyType |
+ |
+// weird stuff |
+ |
+// RunInTransaction is here because it's really a service-wide transaction, not just in |
+// the datastore. TaskQueue behaves differently in a transaction in a couple ways, for |
+// example. |
+// |
+// It really should have been appengine.Context.RunInTransaction(func(tc...)), but |
+// because it's not, this method is on dsImpl instead to mirror the official API. |
+// |
+// The fake implementation also differs from the real implementation because |
+// the fake TaskQueue is NOT backed by the fake Datastore. This is done to make |
+// the test-access API for TaskQueue better (instead of trying to reconstitute |
+// the state of the task queue from a bunch of datastore accesses). |
+func (d *dsImpl) RunInTransaction(f func(context.Context) error, o *datastore.TransactionOptions) (err error) { |
+ curMC := cur(d.c) |
+ |
+ txnMC, err := curMC.mkTxn(o) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ defer func() { |
+ txnMC.Lock() |
+ defer txnMC.Unlock() |
+ |
+ txnMC.endTxn() |
+ }() |
+ |
+ err = f(context.WithValue(d.c, memContextKey, txnMC)) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ return func() error { |
+ txnMC.Lock() |
+ defer txnMC.Unlock() |
+ |
+ if curMC.canApplyTxn(txnMC) { |
+ curMC.applyTxn(wrapper.GetMathRand(d.c), txnMC) |
+ } else { |
+ return datastore.ErrConcurrentTransaction |
+ } |
+ return nil |
+ }() |
+} |