OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package txnBuf | 5 package txnBuf |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "sync" | 9 "sync" |
10 | 10 |
11 "github.com/luci/gae/impl/memory" | 11 "github.com/luci/gae/impl/memory" |
12 "github.com/luci/gae/service/datastore" | 12 "github.com/luci/gae/service/datastore" |
13 "github.com/luci/gae/service/datastore/serialize" | 13 "github.com/luci/gae/service/datastore/serialize" |
14 "github.com/luci/gae/service/info" | 14 "github.com/luci/gae/service/info" |
15 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
16 "github.com/luci/luci-go/common/parallel" | 16 "github.com/luci/luci-go/common/parallel" |
17 "github.com/luci/luci-go/common/stringset" | 17 "github.com/luci/luci-go/common/stringset" |
18 "golang.org/x/net/context" | 18 "golang.org/x/net/context" |
19 ) | 19 ) |
20 | 20 |
21 // DefaultSizeBudget is the size budget for the root transaction. | 21 // DefaultSizeBudget is the size budget for the root transaction. |
22 // | 22 // |
23 // Because our estimation algorithm isn't entirely correct, we take 5% off | 23 // Because our estimation algorithm isn't entirely correct, we take 5% off |
24 // the limit for encoding and estimate inaccuracies. | 24 // the limit for encoding and estimate inaccuracies. |
25 // | 25 // |
26 // 10MB taken on 2015/09/24: | 26 // 10MB taken on 2015/09/24: |
27 // https://cloud.google.com/appengine/docs/go/datastore/#Go_Quotas_and_limits | 27 // https://cloud.google.com/appengine/docs/go/datastore/#Go_Quotas_and_limits |
28 const DefaultSizeBudget = int64((10 * 1000 * 1000) * 0.95) | 28 const DefaultSizeBudget = int64((10 * 1000 * 1000) * 0.95) |
29 | 29 |
30 // DefaultSizeThreshold prevents the root transaction from getting too close | 30 // DefaultWriteCountBudget is the maximum number of entities that can be written |
31 // to the budget. If the code attempts to begin a transaction which would have | 31 // in a single call. |
32 // less than this threshold for its budget, the transaction will immediately | 32 // |
33 // return ErrTransactionTooLarge. | 33 // This is not known to be documented, and has instead been extracted from a |
34 const DefaultSizeThreshold = int64(10 * 1000) | 34 // datastore error message. |
| 35 const DefaultWriteCountBudget = 500 |
35 | 36 |
36 // XGTransactionGroupLimit is the number of transaction groups to allow in an | 37 // XGTransactionGroupLimit is the number of transaction groups to allow in an |
37 // XG transaction. | 38 // XG transaction. |
38 // | 39 // |
39 // 25 taken on 2015/09/24: | 40 // 25 taken on 2015/09/24: |
40 // https://cloud.google.com/appengine/docs/go/datastore/transactions#Go_What_can
_be_done_in_a_transaction | 41 // https://cloud.google.com/appengine/docs/go/datastore/transactions#Go_What_can
_be_done_in_a_transaction |
41 const XGTransactionGroupLimit = 25 | 42 const XGTransactionGroupLimit = 25 |
42 | 43 |
43 // sizeTracker tracks the size of a buffered transaction. The rules are simple: | 44 // sizeTracker tracks the size of a buffered transaction. The rules are simple: |
44 // * deletes count for the size of their key, but 0 data | 45 // * deletes count for the size of their key, but 0 data |
(...skipping 24 matching lines...) Expand all Loading... |
69 size, has := s.keyToSize[key] | 70 size, has := s.keyToSize[key] |
70 return size, has | 71 return size, has |
71 } | 72 } |
72 | 73 |
73 // has returns true iff key has a tracked value. | 74 // has returns true iff key has a tracked value. |
74 func (s *sizeTracker) has(key string) bool { | 75 func (s *sizeTracker) has(key string) bool { |
75 _, has := s.keyToSize[key] | 76 _, has := s.keyToSize[key] |
76 return has | 77 return has |
77 } | 78 } |
78 | 79 |
| 80 // numWrites returns the number of tracked write operations. |
| 81 func (s *sizeTracker) numWrites() int { |
| 82 return len(s.keyToSize) |
| 83 } |
| 84 |
79 // dup returns a duplicate sizeTracker. | 85 // dup returns a duplicate sizeTracker. |
80 func (s *sizeTracker) dup() *sizeTracker { | 86 func (s *sizeTracker) dup() *sizeTracker { |
81 if len(s.keyToSize) == 0 { | 87 if len(s.keyToSize) == 0 { |
82 return &sizeTracker{} | 88 return &sizeTracker{} |
83 } | 89 } |
84 k2s := make(map[string]int64, len(s.keyToSize)) | 90 k2s := make(map[string]int64, len(s.keyToSize)) |
85 for k, v := range s.keyToSize { | 91 for k, v := range s.keyToSize { |
86 k2s[k] = v | 92 k2s[k] = v |
87 } | 93 } |
88 return &sizeTracker{k2s, s.total} | 94 return &sizeTracker{k2s, s.total} |
(...skipping 14 matching lines...) Expand all Loading... |
103 ns string | 109 ns string |
104 parentDS datastore.RawInterface | 110 parentDS datastore.RawInterface |
105 | 111 |
106 // sizeBudget is the number of bytes that this transaction has to operat
e | 112 // sizeBudget is the number of bytes that this transaction has to operat
e |
107 // within. It's only used when attempting to apply() the transaction, an
d | 113 // within. It's only used when attempting to apply() the transaction, an
d |
108 // it is the threshold for the delta of applying this transaction to the | 114 // it is the threshold for the delta of applying this transaction to the |
109 // parent transaction. Note that a buffered transaction could actually h
ave | 115 // parent transaction. Note that a buffered transaction could actually h
ave |
110 // a negative delta if the parent transaction had many large entities wh
ich | 116 // a negative delta if the parent transaction had many large entities wh
ich |
111 // the inner transaction deleted. | 117 // the inner transaction deleted. |
112 sizeBudget int64 | 118 sizeBudget int64 |
| 119 // countBudget is the number of entity writes that this transaction has
to |
| 120 // operate in. |
| 121 writeCountBudget int |
113 } | 122 } |
114 | 123 |
115 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
tore.TransactionOptions) error { | 124 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas
tore.TransactionOptions) error { |
116 inf := info.Get(ctx) | 125 inf := info.Get(ctx) |
117 ns := inf.GetNamespace() | 126 ns := inf.GetNamespace() |
118 | 127 |
119 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) | 128 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) |
120 roots := stringset.New(0) | 129 roots := stringset.New(0) |
121 rootLimit := 1 | 130 rootLimit := 1 |
122 if opts != nil && opts.XG { | 131 if opts != nil && opts.XG { |
123 rootLimit = XGTransactionGroupLimit | 132 rootLimit = XGTransactionGroupLimit |
124 } | 133 } |
125 » sizeBudget := DefaultSizeBudget | 134 » sizeBudget, writeCountBudget := DefaultSizeBudget, DefaultWriteCountBudg
et |
126 if parentState != nil { | 135 if parentState != nil { |
127 // TODO(riannucci): this is a bit wonky since it means that a ch
ild | 136 // TODO(riannucci): this is a bit wonky since it means that a ch
ild |
128 // transaction declaring XG=true will only get to modify 25 grou
ps IF | 137 // transaction declaring XG=true will only get to modify 25 grou
ps IF |
129 // they're same groups affected by the parent transactions. So i
nstead of | 138 // they're same groups affected by the parent transactions. So i
nstead of |
130 // respecting opts.XG for inner transactions, we just dup everyt
hing from | 139 // respecting opts.XG for inner transactions, we just dup everyt
hing from |
131 // the parent transaction. | 140 // the parent transaction. |
132 roots = parentState.roots.Dup() | 141 roots = parentState.roots.Dup() |
133 rootLimit = parentState.rootLimit | 142 rootLimit = parentState.rootLimit |
134 | 143 |
135 sizeBudget = parentState.sizeBudget - parentState.entState.total | 144 sizeBudget = parentState.sizeBudget - parentState.entState.total |
136 » » if sizeBudget < DefaultSizeThreshold { | 145 » » writeCountBudget = parentState.writeCountBudget - parentState.en
tState.numWrites() |
137 » » » return ErrTransactionTooLarge | |
138 » » } | |
139 } | 146 } |
140 | 147 |
141 bufDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns) | 148 bufDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns) |
142 if err != nil { | 149 if err != nil { |
143 return err | 150 return err |
144 } | 151 } |
145 | 152 |
146 state := &txnBufState{ | 153 state := &txnBufState{ |
147 » » entState: &sizeTracker{}, | 154 » » entState: &sizeTracker{}, |
148 » » bufDS: bufDS.Raw(), | 155 » » bufDS: bufDS.Raw(), |
149 » » roots: roots, | 156 » » roots: roots, |
150 » » rootLimit: rootLimit, | 157 » » rootLimit: rootLimit, |
151 » » ns: ns, | 158 » » ns: ns, |
152 » » aid: inf.AppID(), | 159 » » aid: inf.AppID(), |
153 » » parentDS: datastore.Get(context.WithValue(ctx, dsTxnBufHaveLoc
k, true)).Raw(), | 160 » » parentDS: datastore.Get(context.WithValue(ctx, dsTxnBufH
aveLock, true)).Raw(), |
154 » » sizeBudget: sizeBudget, | 161 » » sizeBudget: sizeBudget, |
| 162 » » writeCountBudget: writeCountBudget, |
155 } | 163 } |
156 if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil { | 164 if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil { |
157 return err | 165 return err |
158 } | 166 } |
159 | 167 |
160 // no reason to unlock this ever. At this point it's toast. | 168 // no reason to unlock this ever. At this point it's toast. |
161 state.Lock() | 169 state.Lock() |
162 | 170 |
163 if parentState == nil { | 171 if parentState == nil { |
164 return commitToReal(state) | 172 return commitToReal(state) |
(...skipping 320 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
485 | 493 |
486 return | 494 return |
487 } | 495 } |
488 | 496 |
489 func (t *txnBufState) canApplyLocked(s *txnBufState) error { | 497 func (t *txnBufState) canApplyLocked(s *txnBufState) error { |
490 proposedState := t.entState.dup() | 498 proposedState := t.entState.dup() |
491 | 499 |
492 for k, v := range s.entState.keyToSize { | 500 for k, v := range s.entState.keyToSize { |
493 proposedState.set(k, v) | 501 proposedState.set(k, v) |
494 } | 502 } |
495 » if proposedState.total > s.sizeBudget { | 503 » switch { |
| 504 » case proposedState.numWrites() > t.writeCountBudget: |
| 505 » » // The new net number of writes must be below the parent's write
count |
| 506 » » // cutoff. |
| 507 » » fallthrough |
| 508 |
| 509 » case proposedState.total > t.sizeBudget: |
| 510 » » // Make sure our new calculated size is within the parent's size
budget. |
| 511 » » // |
| 512 » » // We have: |
| 513 » » // - proposedState.total: The "new world" total bytes were this
child |
| 514 » » // transaction committed to the parent. |
| 515 » » // - t.sizeBudget: The maximum number of bytes that this parent
can |
| 516 » » // accommodate. |
496 return ErrTransactionTooLarge | 517 return ErrTransactionTooLarge |
497 } | 518 } |
| 519 |
498 return nil | 520 return nil |
499 } | 521 } |
500 | 522 |
501 func (t *txnBufState) commitLocked(s *txnBufState) { | 523 func (t *txnBufState) commitLocked(s *txnBufState) { |
502 toPut, toPutKeys, toDel := s.effect() | 524 toPut, toPutKeys, toDel := s.effect() |
503 | 525 |
504 if len(toPut) > 0 { | 526 if len(toPut) > 0 { |
505 impossible(t.putMulti(toPutKeys, toPut, | 527 impossible(t.putMulti(toPutKeys, toPut, |
506 func(_ *datastore.Key, err error) error { return err },
true)) | 528 func(_ *datastore.Key, err error) error { return err },
true)) |
507 } | 529 } |
508 | 530 |
509 if len(toDel) > 0 { | 531 if len(toDel) > 0 { |
510 impossible(t.deleteMulti(toDel, func(err error) error { return e
rr }, true)) | 532 impossible(t.deleteMulti(toDel, func(err error) error { return e
rr }, true)) |
511 } | 533 } |
512 } | 534 } |
513 | 535 |
514 // toEncoded returns a list of all of the serialized versions of these keys, | 536 // toEncoded returns a list of all of the serialized versions of these keys, |
515 // plus a stringset of all the encoded root keys that `keys` represents. | 537 // plus a stringset of all the encoded root keys that `keys` represents. |
516 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { | 538 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { |
517 roots = stringset.New(len(keys)) | 539 roots = stringset.New(len(keys)) |
518 full = make([]string, len(keys)) | 540 full = make([]string, len(keys)) |
519 for i, k := range keys { | 541 for i, k := range keys { |
520 roots.Add(string(serialize.ToBytes(k.Root()))) | 542 roots.Add(string(serialize.ToBytes(k.Root()))) |
521 full[i] = string(serialize.ToBytes(k)) | 543 full[i] = string(serialize.ToBytes(k)) |
522 } | 544 } |
523 return | 545 return |
524 } | 546 } |
OLD | NEW |