Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(196)

Side by Side Diff: filter/txnBuf/state.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: change pcg timeout and codereview server to crcr.as.com Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « filter/txnBuf/query_merger.go ('k') | filter/txnBuf/txnbuf_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 "bytes"
9 "sync"
10
11 "github.com/luci/gae/impl/memory"
12 "github.com/luci/gae/service/datastore"
13 "github.com/luci/gae/service/datastore/serialize"
14 "github.com/luci/gae/service/info"
15 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/parallel"
17 "github.com/luci/luci-go/common/stringset"
18 "golang.org/x/net/context"
19 )
20
const (
	// DefaultSizeBudget is the size budget, in bytes, given to the root
	// transaction.
	//
	// The size estimation algorithm is not exact, so 5% is shaved off the
	// datastore limit to absorb encoding overhead and estimation error.
	//
	// 10MB taken on 2015/09/24:
	// https://cloud.google.com/appengine/docs/go/datastore/#Go_Quotas_and_limits
	DefaultSizeBudget = int64((10 * 1000 * 1000) * 0.95)

	// DefaultSizeThreshold keeps the root transaction from getting too close
	// to its budget. Beginning a transaction whose budget would be smaller
	// than this threshold fails immediately with ErrTransactionTooLarge.
	DefaultSizeThreshold = int64(10 * 1000)

	// XGTransactionGroupLimit is the number of transaction groups allowed in
	// an XG transaction.
	//
	// 25 taken on 2015/09/24:
	// https://cloud.google.com/appengine/docs/go/datastore/transactions#Go_What_can_be_done_in_a_transaction
	XGTransactionGroupLimit = 25
)
42
// sizeTracker keeps a running byte estimate for a buffered transaction.
// The accounting rules are:
//   * every tracked key is charged len(key) bytes exactly once
//   * a delete contributes 0 bytes of data on top of its key
//   * a put contributes the caller-supplied size estimate of its data
type sizeTracker struct {
	keyToSize map[string]int64
	total     int64
}

// set records that key now maps to an entity whose data size is val.
// A val of 0 means "this key is being deleted".
func (s *sizeTracker) set(key string, val int64) {
	if s.keyToSize == nil {
		s.keyToSize = map[string]int64{}
	}
	old, seen := s.keyToSize[key]
	if !seen {
		// The key's own bytes are charged only the first time it is tracked.
		s.total += int64(len(key))
	}
	// Swap the previous data size (0 when unseen) for the new one.
	s.total += val - old
	s.keyToSize[key] = val
}

// get returns the size currently tracked for key, and whether or not key has
// any tracked value at all.
func (s *sizeTracker) get(key string) (size int64, tracked bool) {
	size, tracked = s.keyToSize[key]
	return size, tracked
}

// has returns true iff key has a tracked value.
func (s *sizeTracker) has(key string) bool {
	_, tracked := s.keyToSize[key]
	return tracked
}

// dup returns an independent copy of this sizeTracker; mutating the copy
// does not affect the receiver.
func (s *sizeTracker) dup() *sizeTracker {
	clone := &sizeTracker{}
	if len(s.keyToSize) == 0 {
		return clone
	}
	clone.total = s.total
	clone.keyToSize = make(map[string]int64, len(s.keyToSize))
	for k, v := range s.keyToSize {
		clone.keyToSize[k] = v
	}
	return clone
}
90
91 type txnBufState struct {
92 sync.Mutex
93
94 // encoded key -> size of entity. A size of 0 means that the entity is
95 // deleted.
96 entState *sizeTracker
97 memDS datastore.RawInterface
98
99 roots stringset.Set
100 rootLimit int
101
102 aid string
103 ns string
104 parentDS datastore.RawInterface
105 parentState *txnBufState
106
107 // sizeBudget is the number of bytes that this transaction has to operat e
108 // within. It's only used when attempting to apply() the transaction, an d
109 // it is the threshold for the delta of applying this transaction to the
110 // parent transaction. Note that a buffered transaction could actually h ave
111 // a negative delta if the parent transaction had many large entities wh ich
112 // the inner transaction deleted.
113 sizeBudget int64
114
115 // siblingLock is to prevent two nested transactions from running at the same
116 // time.
117 //
118 // Example:
119 // RunInTransaction() { // root
120 // RunInTransaction() // A
121 // RunInTransaction() // B
122 // }
123 //
124 // This will prevent A and B from running simulatneously.
125 siblingLock sync.Mutex
126 }
127
128 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas tore.TransactionOptions) error {
129 inf := info.Get(ctx)
130 ns := inf.GetNamespace()
131
132 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState)
133 roots := stringset.New(0)
134 rootLimit := 1
135 if opts != nil && opts.XG {
136 rootLimit = XGTransactionGroupLimit
137 }
138 sizeBudget := DefaultSizeBudget
139 if parentState != nil {
140 parentState.siblingLock.Lock()
141 defer parentState.siblingLock.Unlock()
142
143 // TODO(riannucci): this is a bit wonky since it means that a ch ild
144 // transaction declaring XG=true will only get to modify 25 grou ps IF
145 // they're same groups affected by the parent transactions. So i nstead of
146 // respecting opts.XG for inner transactions, we just dup everyt hing from
147 // the parent transaction.
148 roots = parentState.roots.Dup()
149 rootLimit = parentState.rootLimit
150
151 sizeBudget = parentState.sizeBudget - parentState.entState.total
152 if sizeBudget < DefaultSizeThreshold {
153 return ErrTransactionTooLarge
154 }
155 }
156
157 memDS, err := memory.NewDatastore(inf.FullyQualifiedAppID(), ns)
158 if err != nil {
159 return err
160 }
161
162 state := &txnBufState{
163 entState: &sizeTracker{},
164 memDS: memDS.Raw(),
165 roots: roots,
166 rootLimit: rootLimit,
167 ns: ns,
168 aid: inf.AppID(),
169 parentDS: datastore.Get(ctx).Raw(),
170 parentState: parentState,
171 sizeBudget: sizeBudget,
172 }
173 if err = cb(context.WithValue(ctx, dsTxnBufParent, state)); err != nil {
174 return err
175 }
176 return state.apply()
177 }
178
179 // item is a temporary object for representing key/entity pairs and their cache
180 // state (e.g. if they exist in the in-memory datastore buffer or not).
181 // Additionally item memoizes some common comparison strings. item objects
182 // must never be persisted outside of a single function/query context.
183 type item struct {
184 key *datastore.Key
185 data datastore.PropertyMap
186 buffered bool
187
188 encKey string
189
190 // cmpRow is used to hold the toComparableString value for this item dur ing
191 // a query.
192 cmpRow string
193
194 // err is a bit of a hack for passing back synchronized errors from
195 // queryToIter.
196 err error
197 }
198
199 func (i *item) getEncKey() string {
200 if i.encKey == "" {
201 i.encKey = string(serialize.ToBytes(i.key))
202 }
203 return i.encKey
204 }
205
206 func (i *item) getCmpRow(lower, upper []byte, order []datastore.IndexColumn) str ing {
207 if i.cmpRow == "" {
208 row, key := toComparableString(lower, upper, order, i.key, i.dat a)
209 i.cmpRow = string(row)
210 if i.encKey == "" {
211 i.encKey = string(key)
212 }
213 }
214 return i.cmpRow
215 }
216
217 func (t *txnBufState) updateRootsLocked(roots stringset.Set) error {
218 curRootLen := t.roots.Len()
219 proposedRoots := stringset.New(1)
220 roots.Iter(func(root string) bool {
221 if !t.roots.Has(root) {
222 proposedRoots.Add(root)
223 }
224 return proposedRoots.Len()+curRootLen <= t.rootLimit
225 })
226 if proposedRoots.Len()+curRootLen > t.rootLimit {
227 return ErrTooManyRoots
228 }
229 // only need to update the roots if they did something that required upd ating
230 if proposedRoots.Len() > 0 {
231 proposedRoots.Iter(func(root string) bool {
232 t.roots.Add(root)
233 return true
234 })
235 }
236 return nil
237 }
238
239 func (t *txnBufState) getMulti(keys []*datastore.Key) ([]item, error) {
240 encKeys, roots := toEncoded(keys)
241 ret := make([]item, len(keys))
242
243 idxMap := []int(nil)
244 toGetKeys := []*datastore.Key(nil)
245
246 t.Lock()
247 defer t.Unlock()
248
249 if err := t.updateRootsLocked(roots); err != nil {
250 return nil, err
251 }
252
253 for i, key := range keys {
254 ret[i].key = key
255 ret[i].encKey = encKeys[i]
256 if size, ok := t.entState.get(ret[i].getEncKey()); ok {
257 ret[i].buffered = true
258 if size > 0 {
259 idxMap = append(idxMap, i)
260 toGetKeys = append(toGetKeys, key)
261 }
262 }
263 }
264
265 if len(toGetKeys) > 0 {
266 j := 0
267 t.memDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) {
268 impossible(err)
269 ret[idxMap[j]].data = pm
270 j++
271 })
272 }
273
274 return ret, nil
275 }
276
277 func (t *txnBufState) deleteMulti(keys []*datastore.Key) error {
278 encKeys, roots := toEncoded(keys)
279
280 t.Lock()
281 defer t.Unlock()
282
283 if err := t.updateRootsLocked(roots); err != nil {
284 return err
285 }
286
287 i := 0
288 err := t.memDS.DeleteMulti(keys, func(err error) {
289 impossible(err)
290 t.entState.set(encKeys[i], 0)
291 i++
292 })
293 impossible(err)
294 return nil
295 }
296
297 func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyM ap) error {
298 encKeys, roots := toEncoded(keys)
299
300 t.Lock()
301 defer t.Unlock()
302
303 if err := t.updateRootsLocked(roots); err != nil {
304 return err
305 }
306
307 i := 0
308 err := t.memDS.PutMulti(keys, vals, func(k *datastore.Key, err error) {
309 impossible(err)
310 t.entState.set(encKeys[i], vals[i].EstimateSize())
311 i++
312 })
313 impossible(err)
314 return nil
315 }
316
317 // apply actually takes the buffered transaction and applies it to the parent
318 // transaction. It will only return an error if the underlying 'real' datastore
319 // returns an error on PutMulti or DeleteMulti.
320 func (t *txnBufState) apply() error {
321 t.Lock()
322 defer t.Unlock()
323
324 // if parentState is nil... just try to commit this anyway. The estimate s
325 // we're using here are just educated guesses. If it fits for real, then
326 // hooray. If not, then the underlying datastore will error.
327 if t.parentState != nil {
328 t.parentState.Lock()
329 proposedState := t.parentState.entState.dup()
330 t.parentState.Unlock()
331 for k, v := range t.entState.keyToSize {
332 proposedState.set(k, v)
333 }
334 if proposedState.total > t.sizeBudget {
335 return ErrTransactionTooLarge
336 }
337 }
338
339 toPutKeys := []*datastore.Key(nil)
340 toPut := []datastore.PropertyMap(nil)
341 toDel := []*datastore.Key(nil)
342
343 // need to pull all items out of the in-memory datastore. Fortunately we have
344 // kindless queries, and we disabled all the special entities, so just
345 // run a kindless query without any filters and it will return all data
346 // currently in memDS :).
347 fq, err := datastore.NewQuery("").Finalize()
348 impossible(err)
349
350 err = t.memDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa p, _ datastore.CursorCB) bool {
351 toPutKeys = append(toPutKeys, key)
352 toPut = append(toPut, data)
353 return true
354 })
355 memoryCorruption(err)
356
357 for keyStr, size := range t.entState.keyToSize {
358 if size == 0 {
359 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr ), serialize.WithoutContext, t.aid, t.ns)
360 memoryCorruption(err)
361 toDel = append(toDel, k)
362 }
363 }
364
365 ds := t.parentDS
366
367 return parallel.FanOutIn(func(ch chan<- func() error) {
368 if len(toPut) > 0 {
369 ch <- func() error {
370 mErr := errors.NewLazyMultiError(len(toPut))
371 i := 0
372 err := ds.PutMulti(toPutKeys, toPut, func(_ *dat astore.Key, err error) {
373 mErr.Assign(i, err)
374 i++
375 })
376 if err == nil {
377 err = mErr.Get()
378 }
379 return err
380 }
381 }
382 if len(toDel) > 0 {
383 ch <- func() error {
384 mErr := errors.NewLazyMultiError(len(toDel))
385 i := 0
386 err := ds.DeleteMulti(toDel, func(err error) {
387 mErr.Assign(i, err)
388 i++
389 })
390 if err == nil {
391 err = mErr.Get()
392 }
393 return err
394 }
395 }
396 })
397 }
398
399 // toEncoded returns a list of all of the serialized versions of these keys,
400 // plus a stringset of all the encoded root keys that `keys` represents.
401 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) {
402 roots = stringset.New(len(keys))
403 full = make([]string, len(keys))
404 for i, k := range keys {
405 roots.Add(string(serialize.ToBytes(k.Root())))
406 full[i] = string(serialize.ToBytes(k))
407 }
408 return
409 }
OLDNEW
« no previous file with comments | « filter/txnBuf/query_merger.go ('k') | filter/txnBuf/txnbuf_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698