Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(381)

Side by Side Diff: filter/txnBuf/state.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: fix comments Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 "bytes"
9 "sync"
10
11 "github.com/luci/gae/impl/memory"
12 "github.com/luci/gae/service/datastore"
13 "github.com/luci/gae/service/datastore/serialize"
14 "github.com/luci/gae/service/info"
15 "github.com/luci/luci-go/common/errors"
16 "github.com/luci/luci-go/common/stringset"
17 "golang.org/x/net/context"
18 )
19
20 // DefaultSizeBudget is the size budget for the root transaction.
21 //
22 // Because our estimation algorithm isn't entirely correct, we take 5% off
23 // the limit for encoding and estimate inaccuracies.
24 //
25 // 10MB taken on 2015/09/24:
26 // https://cloud.google.com/appengine/docs/go/datastore/#Go_Quotas_and_limits
27 const DefaultSizeBudget = int64((10 * 1000 * 1000) * 0.95)
28
29 // DefaultSizeThreshold prevents the root transaction from getting too close
30 // to the budget. If the code attempts to begin a transaction which would have
31 // less than this threshold for its budget, the transaction will immediately
32 // return ErrTransactionTooLarge.
33 const DefaultSizeThreshold = int64(10 * 1000)
34
35 // XGTransactionGroupLimit is the number of transaction groups to allow in an
36 // XG transaction.
37 //
38 // 25 taken on 2015/09/24:
39 // https://cloud.google.com/appengine/docs/go/datastore/transactions#Go_What_can _be_done_in_a_transaction
40 const XGTransactionGroupLimit = 25
41
42 type sizeTracker struct {
43 keyToSize map[string]int64
44 total int64
45 }
46
47 func (s *sizeTracker) set(key string, val int64) {
48 prev, existed := s.keyToSize[key]
49 if s.keyToSize == nil {
50 s.keyToSize = make(map[string]int64)
51 }
52 s.keyToSize[key] = val
53 s.total += val - prev
54 if !existed {
55 s.total += int64(len(key))
56 }
57 }
58
59 func (s *sizeTracker) get(key string) (int64, bool) {
60 size, has := s.keyToSize[key]
61 return size, has
62 }
63
64 func (s *sizeTracker) has(key string) bool {
65 _, has := s.keyToSize[key]
66 return has
67 }
68
69 func (s *sizeTracker) dup() *sizeTracker {
70 if len(s.keyToSize) == 0 {
71 return &sizeTracker{}
72 }
73 k2s := make(map[string]int64, len(s.keyToSize))
74 for k, v := range s.keyToSize {
75 k2s[k] = v
76 }
77 return &sizeTracker{k2s, s.total}
78 }
79
80 type txnBufState struct {
81 sync.Mutex
82
83 // encoded key -> size of entity. A size of 0 means that the entity is
84 // deleted.
85 entState *sizeTracker
86 memDS datastore.RawInterface
87
88 roots stringset.Set
89 rootLimit int
90
91 aid string
92 ns string
93 parentDS datastore.RawInterface
94 parentState *txnBufState
95
96 // sizeBudget is the number of bytes that this transaction has to operat e
97 // within. It's only used when attempting to apply() the transaction, an d
98 // it is the threshold for the delta of applying this transaction to the
99 // parent transaction. Note that a buffered transaction could actually h ave
100 // a negative delta if the parent transaction had many large entities wh ich
101 // the inner transaction deleted.
102 sizeBudget int64
103
104 siblingLock sync.Mutex
105 }
106
107 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas tore.TransactionOptions) error {
108 inf := info.Get(ctx)
109 ns := inf.GetNamespace()
110
111 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState)
112 roots := stringset.New(0)
113 rootLimit := 1
114 if opts != nil && opts.XG {
115 rootLimit = XGTransactionGroupLimit
116 }
117 sizeBudget := DefaultSizeBudget
118 if parentState != nil {
119 parentState.siblingLock.Lock()
120 defer parentState.siblingLock.Unlock()
121
122 // TODO(riannucci): this is a bit wonky since it means that a ch ild
123 // transaction declaring XG=true will only get to modify 25 grou ps IF
124 // they're same groups affected by the parent transactions. So i nstead of
125 // respecting opts.XG for inner transactions, we just dup everyt hing from
126 // the parent transaction.
127 roots = parentState.roots.Dup()
128 rootLimit = parentState.rootLimit
129
130 sizeBudget = parentState.sizeBudget - parentState.entState.total
131 if sizeBudget < DefaultSizeThreshold {
132 return ErrTransactionTooLarge
133 }
134 }
135
136 memDS := memory.NewDatastore(inf.FullyQualifiedAppID(), ns)
137 t := memDS.Testable()
138 t.Consistent(true)
139 t.AutoIndex(true)
140 t.DisableSpecialEntities(true)
141
142 state := &txnBufState{
143 entState: &sizeTracker{},
144 memDS: memDS,
145 roots: roots,
146 rootLimit: rootLimit,
147 ns: ns,
148 aid: inf.AppID(),
149 parentDS: datastore.Get(ctx).Raw(),
150 parentState: parentState,
151 sizeBudget: sizeBudget,
152 }
153 ctx = context.WithValue(ctx, dsTxnBufParent, state)
154 err := cb(ctx)
155 if err != nil {
156 return err
157 }
158 return state.apply()
159 }
160
161 type item struct {
162 key *datastore.Key
163 data datastore.PropertyMap
164 buffered bool
165
166 encKey string
167 cmpRow string
168 err error
169 }
170
171 func (i *item) getEncKey() string {
172 if i.encKey == "" {
173 i.encKey = string(serialize.ToBytes(i.key))
174 }
175 return i.encKey
176 }
177
178 func (i *item) getCmpRow(lower, upper []byte, order []datastore.IndexColumn) str ing {
179 if i.cmpRow == "" {
180 row, key := toComparableString(lower, upper, order, i.key, i.dat a)
181 i.cmpRow = string(row)
182 if i.encKey == "" {
183 i.encKey = string(key)
184 }
185 }
186 return i.cmpRow
187 }
188
189 func (t *txnBufState) updateRootsLocked(roots stringset.Set) error {
190 curRootLen := t.roots.Len()
191 proposedRoots := stringset.New(1)
192 roots.Iter(func(root string) bool {
193 if !t.roots.Has(root) {
194 proposedRoots.Add(root)
195 }
196 return proposedRoots.Len()+curRootLen <= t.rootLimit
197 })
198 if proposedRoots.Len()+curRootLen > t.rootLimit {
199 return errors.New("operating on too many entity groups in nested transaction")
200 }
201 // only need to update the roots if they did something that required upd ating
202 if proposedRoots.Len() > 0 {
203 proposedRoots.Iter(func(root string) bool {
204 t.roots.Add(root)
205 return true
206 })
207 }
208 return nil
209 }
210
211 func (t *txnBufState) getMulti(keys []*datastore.Key) ([]item, error) {
212 encKeys, roots := toEncoded(keys)
213 ret := make([]item, len(keys))
214
215 idxMap := []int(nil)
216 toGetKeys := []*datastore.Key(nil)
217
218 t.Lock()
219 defer t.Unlock()
220
221 if err := t.updateRootsLocked(roots); err != nil {
222 return nil, err
223 }
224
225 for i, key := range keys {
226 ret[i].key = key
227 ret[i].encKey = encKeys[i]
228 if size, ok := t.entState.get(ret[i].getEncKey()); ok {
229 ret[i].buffered = true
230 if size > 0 {
231 idxMap = append(idxMap, i)
232 toGetKeys = append(toGetKeys, key)
233 }
234 }
235 }
236
237 if len(toGetKeys) > 0 {
238 j := 0
239 t.memDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) {
240 impossible(err)
241 ret[idxMap[j]].data = pm
242 j++
243 })
244 }
245
246 return ret, nil
247 }
248
249 func (t *txnBufState) deleteMulti(keys []*datastore.Key) error {
250 encKeys, roots := toEncoded(keys)
251
252 t.Lock()
253 defer t.Unlock()
254
255 if err := t.updateRootsLocked(roots); err != nil {
256 return err
257 }
258
259 i := 0
260 err := t.memDS.DeleteMulti(keys, func(err error) {
261 impossible(err)
262 t.entState.set(encKeys[i], 0)
263 i++
264 })
265 impossible(err)
266 return nil
267 }
268
269 func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyM ap) error {
270 encKeys, roots := toEncoded(keys)
271
272 t.Lock()
273 defer t.Unlock()
274
275 if err := t.updateRootsLocked(roots); err != nil {
276 return err
277 }
278
279 i := 0
280 err := t.memDS.PutMulti(keys, vals, func(k *datastore.Key, err error) {
281 impossible(err)
282 t.entState.set(encKeys[i], vals[i].EstimateSize())
283 i++
284 })
285 impossible(err)
286 return nil
287 }
288
289 func (t *txnBufState) apply() error {
290 t.Lock()
291 defer t.Unlock()
292
293 // if parentState is nil... just try to commit this anyway. The estimate s
294 // we're using here are just educated guesses. If it fits for real, then
295 // hooray. If not, then the underlying datastore will error.
296 if t.parentState != nil {
297 proposedState := t.parentState.entState.dup()
298 for k, v := range t.entState.keyToSize {
299 proposedState.set(k, v)
300 }
301 if proposedState.total > t.sizeBudget {
302 return ErrTransactionTooLarge
303 }
304 }
305
306 toPutKeys := []*datastore.Key(nil)
307 toPut := []datastore.PropertyMap(nil)
308 toDel := []*datastore.Key(nil)
309
310 // need to pull all items out of the in-memory datastore. Fortunately we have
311 // kindless queries, and we disabled all the special entities, so just
312 // run a kindless query without any filters and it will return all data
313 // currently in memDS :).
314 fq, err := datastore.NewQuery("").Finalize()
315 impossible(err)
316
317 err = t.memDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa p, _ datastore.CursorCB) bool {
318 toPutKeys = append(toPutKeys, key)
319 toPut = append(toPut, data)
320 return true
321 })
322 memoryCorruption(err)
323
324 for keyStr, size := range t.entState.keyToSize {
325 if size == 0 {
326 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr ), serialize.WithoutContext, t.aid, t.ns)
327 memoryCorruption(err)
328 toDel = append(toDel, k)
329 }
330 }
331
332 wg := sync.WaitGroup{}
333
334 pErr := error(nil)
335 dErr := error(nil)
336
337 ds := t.parentDS
iannucci 2015/09/29 03:32:22 parentDS is either the real datastore transaction
338 if toPut != nil {
339 wg.Add(1)
340 go func() {
341 defer wg.Done()
342 mErr := errors.NewLazyMultiError(len(toPut))
343 i := 0
344 pErr = ds.PutMulti(toPutKeys, toPut, func(_ *datastore.K ey, err error) {
345 i++
346 mErr.Assign(i, err)
347 })
348 pErr = mErr.Get()
349 }()
350 }
351
352 if toDel != nil {
353 wg.Add(1)
354 go func() {
355 defer wg.Done()
356 mErr := errors.NewLazyMultiError(len(toDel))
357 i := 0
358 dErr = ds.DeleteMulti(toDel, func(err error) {
iannucci 2015/09/29 03:32:22 This is where the actual deletion occurs in the pa
359 mErr.Assign(i, err)
360 i++
361 })
362 dErr = mErr.Get()
363 }()
364 }
365 wg.Wait()
366
367 if pErr != nil {
368 return pErr
369 }
370 return dErr
371 }
372
373 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) {
374 roots = stringset.New(len(keys))
375 full = make([]string, len(keys))
376 for i, k := range keys {
377 roots.Add(string(serialize.ToBytes(k.Root())))
378 full[i] = string(serialize.ToBytes(k))
379 }
380 return
381 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698