Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(93)

Side by Side Diff: filter/txnBuf/state.go

Issue 1776833002: txnbuf: Add write count budget, size budget. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/gae@master
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | filter/txnBuf/txnbuf_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package txnBuf 5 package txnBuf
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "sync" 9 "sync"
10 10
11 "github.com/luci/gae/impl/memory" 11 "github.com/luci/gae/impl/memory"
12 "github.com/luci/gae/service/datastore" 12 "github.com/luci/gae/service/datastore"
13 "github.com/luci/gae/service/datastore/serialize" 13 "github.com/luci/gae/service/datastore/serialize"
14 "github.com/luci/gae/service/info" 14 "github.com/luci/gae/service/info"
15 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/parallel" 16 "github.com/luci/luci-go/common/parallel"
17 "github.com/luci/luci-go/common/stringset" 17 "github.com/luci/luci-go/common/stringset"
18 "golang.org/x/net/context" 18 "golang.org/x/net/context"
19 ) 19 )
20 20
21 // DefaultSizeBudget is the size budget for the root transaction. 21 // DefaultSizeBudget is the size budget for the root transaction.
22 // 22 //
23 // Because our estimation algorithm isn't entirely correct, we take 5% off 23 // Because our estimation algorithm isn't entirely correct, we take 5% off
24 // the limit for encoding and estimate inaccuracies. 24 // the limit for encoding and estimate inaccuracies.
25 // 25 //
26 // 10MB taken on 2015/09/24: 26 // 10MB taken on 2015/09/24:
27 // https://cloud.google.com/appengine/docs/go/datastore/#Go_Quotas_and_limits 27 // https://cloud.google.com/appengine/docs/go/datastore/#Go_Quotas_and_limits
28 const DefaultSizeBudget = int64((10 * 1000 * 1000) * 0.95) 28 const DefaultSizeBudget = int64((10 * 1000 * 1000) * 0.95)
29 29
30 // DefaultSizeThreshold prevents the root transaction from getting too close 30 // DefaultWriteCountBudget is the maximum number of entities that can be written
31 // to the budget. If the code attempts to begin a transaction which would have 31 // in a single call.
32 // less than this threshold for its budget, the transaction will immediately 32 //
33 // return ErrTransactionTooLarge. 33 // This is not known to be documented, and has instead been extracted from a
34 const DefaultSizeThreshold = int64(10 * 1000) 34 // datastore error message.
35 const DefaultWriteCountBudget = 500
35 36
36 // XGTransactionGroupLimit is the number of transaction groups to allow in an 37 // XGTransactionGroupLimit is the number of transaction groups to allow in an
37 // XG transaction. 38 // XG transaction.
38 // 39 //
39 // 25 taken on 2015/09/24: 40 // 25 taken on 2015/09/24:
40 // https://cloud.google.com/appengine/docs/go/datastore/transactions#Go_What_can _be_done_in_a_transaction 41 // https://cloud.google.com/appengine/docs/go/datastore/transactions#Go_What_can _be_done_in_a_transaction
41 const XGTransactionGroupLimit = 25 42 const XGTransactionGroupLimit = 25
42 43
43 // sizeTracker tracks the size of a buffered transaction. The rules are simple: 44 // sizeTracker tracks the size of a buffered transaction. The rules are simple:
44 // * deletes count for the size of their key, but 0 data 45 // * deletes count for the size of their key, but 0 data
(...skipping 24 matching lines...) Expand all
69 size, has := s.keyToSize[key] 70 size, has := s.keyToSize[key]
70 return size, has 71 return size, has
71 } 72 }
72 73
73 // has returns true iff key has a tracked value. 74 // has returns true iff key has a tracked value.
74 func (s *sizeTracker) has(key string) bool { 75 func (s *sizeTracker) has(key string) bool {
75 _, has := s.keyToSize[key] 76 _, has := s.keyToSize[key]
76 return has 77 return has
77 } 78 }
78 79
80 // numWrites returns the number of tracked write operations.
81 func (s *sizeTracker) numWrites() int {
82 return len(s.keyToSize)
83 }
84
79 // dup returns a duplicate sizeTracker. 85 // dup returns a duplicate sizeTracker.
80 func (s *sizeTracker) dup() *sizeTracker { 86 func (s *sizeTracker) dup() *sizeTracker {
81 if len(s.keyToSize) == 0 { 87 if len(s.keyToSize) == 0 {
82 return &sizeTracker{} 88 return &sizeTracker{}
83 } 89 }
84 k2s := make(map[string]int64, len(s.keyToSize)) 90 k2s := make(map[string]int64, len(s.keyToSize))
85 for k, v := range s.keyToSize { 91 for k, v := range s.keyToSize {
86 k2s[k] = v 92 k2s[k] = v
87 } 93 }
88 return &sizeTracker{k2s, s.total} 94 return &sizeTracker{k2s, s.total}
(...skipping 14 matching lines...) Expand all
103 ns string 109 ns string
104 parentDS datastore.RawInterface 110 parentDS datastore.RawInterface
105 111
106 // sizeBudget is the number of bytes that this transaction has to operat e 112 // sizeBudget is the number of bytes that this transaction has to operat e
107 // within. It's only used when attempting to apply() the transaction, an d 113 // within. It's only used when attempting to apply() the transaction, an d
108 // it is the threshold for the delta of applying this transaction to the 114 // it is the threshold for the delta of applying this transaction to the
109 // parent transaction. Note that a buffered transaction could actually h ave 115 // parent transaction. Note that a buffered transaction could actually h ave
110 // a negative delta if the parent transaction had many large entities wh ich 116 // a negative delta if the parent transaction had many large entities wh ich
111 // the inner transaction deleted. 117 // the inner transaction deleted.
112 sizeBudget int64 118 sizeBudget int64
119 // countBudget is the number of entity writes that this transaction has to
120 // operate in.
121 writeCountBudget int
113 } 122 }
114 123
115 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas tore.TransactionOptions) error { 124 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas tore.TransactionOptions) error {
116 inf := info.Get(ctx) 125 inf := info.Get(ctx)
117 ns := inf.GetNamespace() 126 ns := inf.GetNamespace()
118 127
119 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) 128 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState)
120 roots := stringset.New(0) 129 roots := stringset.New(0)
121 rootLimit := 1 130 rootLimit := 1
122 if opts != nil && opts.XG { 131 if opts != nil && opts.XG {
123 rootLimit = XGTransactionGroupLimit 132 rootLimit = XGTransactionGroupLimit
124 } 133 }
125 » sizeBudget := DefaultSizeBudget 134 » sizeBudget, writeCountBudget := DefaultSizeBudget, DefaultWriteCountBudg et
126 if parentState != nil { 135 if parentState != nil {
127 // TODO(riannucci): this is a bit wonky since it means that a ch ild 136 // TODO(riannucci): this is a bit wonky since it means that a ch ild
128 // transaction declaring XG=true will only get to modify 25 grou ps IF 137 // transaction declaring XG=true will only get to modify 25 grou ps IF
129 // they're same groups affected by the parent transactions. So i nstead of 138 // they're same groups affected by the parent transactions. So i nstead of
130 // respecting opts.XG for inner transactions, we just dup everyt hing from 139 // respecting opts.XG for inner transactions, we just dup everyt hing from
131 // the parent transaction. 140 // the parent transaction.
132 roots = parentState.roots.Dup() 141 roots = parentState.roots.Dup()
133 rootLimit = parentState.rootLimit 142 rootLimit = parentState.rootLimit
134 143
135 sizeBudget = parentState.sizeBudget - parentState.entState.total 144 sizeBudget = parentState.sizeBudget - parentState.entState.total
136 » » if sizeBudget < DefaultSizeThreshold { 145 » » writeCountBudget = parentState.writeCountBudget - parentState.en tState.numWrites()
137 » » » return ErrTransactionTooLarge
138 » » }
139 } 146 }
140 147
141 bufDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns) 148 bufDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns)
142 if err != nil { 149 if err != nil {
143 return err 150 return err
144 } 151 }
145 152
146 state := &txnBufState{ 153 state := &txnBufState{
147 » » entState: &sizeTracker{}, 154 » » entState: &sizeTracker{},
148 » » bufDS: bufDS.Raw(), 155 » » bufDS: bufDS.Raw(),
149 » » roots: roots, 156 » » roots: roots,
150 » » rootLimit: rootLimit, 157 » » rootLimit: rootLimit,
151 » » ns: ns, 158 » » ns: ns,
152 » » aid: inf.AppID(), 159 » » aid: inf.AppID(),
153 » » parentDS: datastore.Get(context.WithValue(ctx, dsTxnBufHaveLoc k, true)).Raw(), 160 » » parentDS: datastore.Get(context.WithValue(ctx, dsTxnBufH aveLock, true)).Raw(),
154 » » sizeBudget: sizeBudget, 161 » » sizeBudget: sizeBudget,
162 » » writeCountBudget: writeCountBudget,
155 } 163 }
156 if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil { 164 if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil {
157 return err 165 return err
158 } 166 }
159 167
160 // no reason to unlock this ever. At this point it's toast. 168 // no reason to unlock this ever. At this point it's toast.
161 state.Lock() 169 state.Lock()
162 170
163 if parentState == nil { 171 if parentState == nil {
164 return commitToReal(state) 172 return commitToReal(state)
(...skipping 320 matching lines...) Expand 10 before | Expand all | Expand 10 after
485 493
486 return 494 return
487 } 495 }
488 496
489 func (t *txnBufState) canApplyLocked(s *txnBufState) error { 497 func (t *txnBufState) canApplyLocked(s *txnBufState) error {
490 proposedState := t.entState.dup() 498 proposedState := t.entState.dup()
491 499
492 for k, v := range s.entState.keyToSize { 500 for k, v := range s.entState.keyToSize {
493 proposedState.set(k, v) 501 proposedState.set(k, v)
494 } 502 }
495 » if proposedState.total > s.sizeBudget { 503 » switch {
504 » case proposedState.numWrites() > t.writeCountBudget:
505 » » // The new net number of writes must be below the parent's write count
506 » » // cutoff.
507 » » fallthrough
508
509 » case proposedState.total > t.sizeBudget:
510 » » // Make sure our new calculated size is within the parent's size budget.
511 » » //
512 » » // We have:
513 » » // - proposedState.total: The "new world" total bytes were this child
514 » » // transaction committed to the parent.
515 » » // - t.sizeBudget: The maximum number of bytes that this parent can
516 » » // accommodate.
496 return ErrTransactionTooLarge 517 return ErrTransactionTooLarge
497 } 518 }
519
498 return nil 520 return nil
499 } 521 }
500 522
501 func (t *txnBufState) commitLocked(s *txnBufState) { 523 func (t *txnBufState) commitLocked(s *txnBufState) {
502 toPut, toPutKeys, toDel := s.effect() 524 toPut, toPutKeys, toDel := s.effect()
503 525
504 if len(toPut) > 0 { 526 if len(toPut) > 0 {
505 impossible(t.putMulti(toPutKeys, toPut, 527 impossible(t.putMulti(toPutKeys, toPut,
506 func(_ *datastore.Key, err error) error { return err }, true)) 528 func(_ *datastore.Key, err error) error { return err }, true))
507 } 529 }
508 530
509 if len(toDel) > 0 { 531 if len(toDel) > 0 {
510 impossible(t.deleteMulti(toDel, func(err error) error { return e rr }, true)) 532 impossible(t.deleteMulti(toDel, func(err error) error { return e rr }, true))
511 } 533 }
512 } 534 }
513 535
514 // toEncoded returns a list of all of the serialized versions of these keys, 536 // toEncoded returns a list of all of the serialized versions of these keys,
515 // plus a stringset of all the encoded root keys that `keys` represents. 537 // plus a stringset of all the encoded root keys that `keys` represents.
516 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { 538 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) {
517 roots = stringset.New(len(keys)) 539 roots = stringset.New(len(keys))
518 full = make([]string, len(keys)) 540 full = make([]string, len(keys))
519 for i, k := range keys { 541 for i, k := range keys {
520 roots.Add(string(serialize.ToBytes(k.Root()))) 542 roots.Add(string(serialize.ToBytes(k.Root())))
521 full[i] = string(serialize.ToBytes(k)) 543 full[i] = string(serialize.ToBytes(k))
522 } 544 }
523 return 545 return
524 } 546 }
OLDNEW
« no previous file with comments | « no previous file | filter/txnBuf/txnbuf_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698