Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(50)

Side by Side Diff: filter/txnBuf/state.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: one more test Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 "bytes"
9 "sync"
10
11 "github.com/luci/gae/impl/memory"
12 "github.com/luci/gae/service/datastore"
13 "github.com/luci/gae/service/datastore/serialize"
14 "github.com/luci/gae/service/info"
15 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/stringset"
17 "golang.org/x/net/context"
18 )
19
20 // DefaultSizeBudget is the size budget for the root transaction.
21 //
22 // Because our estimation algorithm isn't entirely correct, we take 5% off
23 // the limit for encoding and estimate inaccuracies.
24 //
25 // 10MB taken on 2015/09/24:
26 // https://cloud.google.com/appengine/docs/go/datastore/#Go_Quotas_and_limits
27 const DefaultSizeBudget = int64((10 * 1000 * 1000) * 0.95)
28
29 // DefaultSizeThreshold prevents the root transaction from getting too close
30 // to the budget. If the code attempts to begin a transaction which would have
31 // less than this threshold for its budget, the transaction will immediately
32 // return ErrTransactionTooLarge.
33 const DefaultSizeThreshold = int64(10 * 1000)
34
35 // XGTransactionGroupLimit is the number of transaction groups to allow in an
36 // XG transaction.
37 //
38 // 25 taken on 2015/09/24:
39 // https://cloud.google.com/appengine/docs/go/datastore/transactions#Go_What_can _be_done_in_a_transaction
40 const XGTransactionGroupLimit = 25
41
42 type sizeTracker struct {
43 keyToSize map[string]int64
44 total int64
45 }
46
47 func (s *sizeTracker) set(key string, val int64) {
48 prev, existed := s.keyToSize[key]
49 if s.keyToSize == nil {
50 s.keyToSize = make(map[string]int64)
51 }
52 s.keyToSize[key] = val
53 s.total += val - prev
54 if !existed {
55 s.total += int64(len(key))
56 }
57 }
58
59 func (s *sizeTracker) get(key string) (int64, bool) {
60 size, has := s.keyToSize[key]
61 return size, has
62 }
63
64 func (s *sizeTracker) has(key string) bool {
65 _, has := s.keyToSize[key]
66 return has
67 }
68
69 func (s *sizeTracker) dup() *sizeTracker {
70 if len(s.keyToSize) == 0 {
71 return &sizeTracker{}
72 }
73 k2s := make(map[string]int64, len(s.keyToSize))
74 for k, v := range s.keyToSize {
75 k2s[k] = v
76 }
77 return &sizeTracker{k2s, s.total}
78 }
79
80 type txnBufState struct {
81 sync.Mutex
82
83 // encoded key -> size of entity. A size of 0 means that the entity is
84 // deleted.
85 entState *sizeTracker
86 memDS datastore.RawInterface
87
88 roots stringset.Set
89 rootLimit int
90
91 aid string
92 ns string
93 parentDS datastore.RawInterface
94 parentState *txnBufState
95
96 // sizeBudget is the number of bytes that this transaction has to operat e
97 // within. It's only used when attempting to apply() the transaction, an d
98 // it is the threshold for the delta of applying this transaction to the
99 // parent transaction. Note that a buffered transaction could actually h ave
100 // a negative delta if the parent transaction had many large entities wh ich
101 // the inner transaction deleted.
102 sizeBudget int64
103
104 siblingLock sync.Mutex
105 }
106
107 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas tore.TransactionOptions) error {
108 inf := info.Get(ctx)
109 ns := inf.GetNamespace()
110
111 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState)
112 roots := stringset.New(0)
113 rootLimit := 1
114 if opts != nil && opts.XG {
115 rootLimit = XGTransactionGroupLimit
116 }
117 sizeBudget := DefaultSizeBudget
118 if parentState != nil {
119 parentState.siblingLock.Lock()
120 defer parentState.siblingLock.Unlock()
121
122 // TODO(riannucci): this is a bit wonky since it means that a ch ild
123 // transaction declaring XG=true will only get to modify 25 grou ps IF
124 // they're same groups affected by the parent transactions. So i nstead of
125 // respecting opts.XG for inner transactions, we just dup everyt hing from
126 // the parent transaction.
127 roots = parentState.roots.Dup()
128 rootLimit = parentState.rootLimit
129
130 sizeBudget = parentState.sizeBudget - parentState.entState.total
131 if sizeBudget < DefaultSizeThreshold {
132 return ErrTransactionTooLarge
133 }
134 }
135
136 memDS := memory.NewDatastore(ns)
137 {
Vadim Sh. 2015/09/28 18:52:56 Remove...
iannucci 2015/09/29 03:21:38 k
138 t := memDS.Testable()
139 t.Consistent(true)
140 t.AutoIndex(true)
141 t.DisableSpecialEntities(true)
142 }
143 state := &txnBufState{
144 entState: &sizeTracker{},
145 memDS: memDS,
146 roots: roots,
147 rootLimit: rootLimit,
148 ns: ns,
149 aid: inf.AppID(),
150 parentDS: datastore.Get(ctx).Raw(),
151 parentState: parentState,
152 sizeBudget: sizeBudget,
153 }
154 ctx = context.WithValue(ctx, dsTxnBufParent, state)
155 err := cb(ctx)
156 if err != nil {
157 return err
158 }
159 return state.apply()
160 }
161
162 type item struct {
163 key *datastore.Key
164 data datastore.PropertyMap
165 buffered bool
166
167 encKey string
168 cmpRow string
169 err error
170 }
171
172 func (i *item) getEncKey() string {
173 if i.encKey == "" {
174 i.encKey = string(serialize.ToBytes(i.key))
175 }
176 return i.encKey
177 }
178
179 func (i *item) getCmpRow(lower, upper []byte, order []datastore.IndexColumn) str ing {
180 if i.cmpRow == "" {
181 row, key := toComparableString(lower, upper, order, i.key, i.dat a)
182 i.cmpRow = string(row)
183 if i.encKey == "" {
184 i.encKey = string(key)
185 }
186 }
187 return i.cmpRow
188 }
189
190 func (t *txnBufState) updateRootsLocked(roots stringset.Set) error {
191 curRootLen := t.roots.Len()
192 proposedRoots := stringset.New(1)
193 roots.Iter(func(root string) bool {
194 if !t.roots.Has(root) {
195 proposedRoots.Add(root)
196 }
197 return proposedRoots.Len()+curRootLen <= t.rootLimit
198 })
199 if proposedRoots.Len()+curRootLen > t.rootLimit {
200 return errors.New("operating on too many entity groups in nested transaction")
201 }
202 // only need to update the roots if they did something that required upd ating
203 if proposedRoots.Len() > 0 {
204 proposedRoots.Iter(func(root string) bool {
205 t.roots.Add(root)
206 return true
207 })
208 }
209 return nil
210 }
211
212 func (t *txnBufState) getMulti(keys []*datastore.Key) ([]item, error) {
213 encKeys, roots := toEncoded(keys)
214 ret := make([]item, len(keys))
215
216 idxMap := []int(nil)
217 toGetKeys := []*datastore.Key(nil)
218
219 t.Lock()
220 defer t.Unlock()
221
222 if err := t.updateRootsLocked(roots); err != nil {
223 return nil, err
224 }
225
226 for i, key := range keys {
227 ret[i].key = key
228 ret[i].encKey = encKeys[i]
229 if size, ok := t.entState.get(ret[i].getEncKey()); ok {
230 ret[i].buffered = true
231 if size > 0 {
232 idxMap = append(idxMap, i)
233 toGetKeys = append(toGetKeys, key)
234 }
235 }
236 }
237
238 if len(toGetKeys) > 0 {
239 j := 0
240 t.memDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) {
241 idx := idxMap[j]
242 impossible(err)
Vadim Sh. 2015/09/28 18:52:56 nit: move it up one line
iannucci 2015/09/29 03:21:38 done
243 ret[idx].data = pm
244 j++
245 })
246 }
247
248 return ret, nil
249 }
250
251 func (t *txnBufState) deleteMulti(keys []*datastore.Key) error {
252 encKeys, roots := toEncoded(keys)
253
254 t.Lock()
255 defer t.Unlock()
256
257 if err := t.updateRootsLocked(roots); err != nil {
258 return err
259 }
260
261 i := 0
262 err := t.memDS.DeleteMulti(keys, func(err error) {
263 impossible(err)
264 t.entState.set(encKeys[i], 0)
265 i++
266 })
267 impossible(err)
268 return nil
269 }
270
271 func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyM ap) error {
272 encKeys, roots := toEncoded(keys)
273
274 t.Lock()
275 defer t.Unlock()
276
277 if err := t.updateRootsLocked(roots); err != nil {
278 return err
279 }
280
281 i := 0
282 err := t.memDS.PutMulti(keys, vals, func(k *datastore.Key, err error) {
283 impossible(err)
284 t.entState.set(encKeys[i], vals[i].EstimateSize())
285 i++
286 })
287 impossible(err)
288 return nil
289 }
290
291 func (t *txnBufState) apply() error {
292 t.Lock()
293 defer t.Unlock()
294
295 // if parentState is nil... just try to commit this anyway. The estimate s
296 // we're using here are just educated guesses. If it fits for real, then
297 // hooray. If not, then the underlying datastore will error.
298 if t.parentState != nil {
299 proposedState := t.parentState.entState.dup()
300 for k, v := range t.entState.keyToSize {
301 proposedState.set(k, v)
302 }
303 if proposedState.total > t.sizeBudget {
304 return ErrTransactionTooLarge
305 }
306 }
307
308 toPutKeys := []*datastore.Key(nil)
309 toPut := []datastore.PropertyMap(nil)
310 toDel := []*datastore.Key(nil)
311
312 fq, err := datastore.NewQuery("").Finalize()
Vadim Sh. 2015/09/28 18:52:56 wat?
iannucci 2015/09/29 03:21:38 added comment
313 impossible(err)
314
315 err = t.memDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa p, _ datastore.CursorCB) bool {
316 toPutKeys = append(toPutKeys, key)
317 toPut = append(toPut, data)
318 return true
319 })
320 memoryCorruption(err)
321
322 for keyStr, size := range t.entState.keyToSize {
323 if size == 0 {
324 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr ), serialize.WithoutContext, t.aid, t.ns)
325 memoryCorruption(err)
326 toDel = append(toDel, k)
327 }
328 }
329
330 wg := sync.WaitGroup{}
331
332 pErr := error(nil)
333 dErr := error(nil)
334
335 ds := t.parentDS
336 if toPut != nil {
337 wg.Add(1)
338 go func() {
339 defer wg.Done()
340 mErr := errors.NewLazyMultiError(len(toPut))
341 i := 0
342 pErr = ds.PutMulti(toPutKeys, toPut, func(_ *datastore.K ey, err error) {
343 i++
344 mErr.Assign(i, err)
345 })
346 pErr = mErr.Get()
347 }()
348 }
349
350 if toDel != nil {
351 wg.Add(1)
352 go func() {
353 defer wg.Done()
354 mErr := errors.NewLazyMultiError(len(toDel))
355 i := 0
356 dErr = ds.DeleteMulti(toDel, func(err error) {
357 mErr.Assign(i, err)
358 i++
359 })
360 dErr = mErr.Get()
361 }()
362 }
363 wg.Wait()
364
365 if pErr != nil {
366 return pErr
367 }
368 return dErr
369 }
370
371 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) {
372 roots = stringset.New(len(keys))
373 full = make([]string, len(keys))
374 for i, k := range keys {
375 roots.Add(string(serialize.ToBytes(k.Root())))
376 full[i] = string(serialize.ToBytes(k))
377 }
378 return
379 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698