OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package txnBuf | |
6 | |
7 import ( | |
8 "bytes" | |
9 "sync" | |
10 | |
11 "github.com/luci/gae/impl/memory" | |
12 "github.com/luci/gae/service/datastore" | |
13 "github.com/luci/gae/service/datastore/serialize" | |
14 "github.com/luci/gae/service/info" | |
15 "github.com/luci/luci-go/common/errors" | |
16 "github.com/luci/luci-go/common/stringset" | |
17 "golang.org/x/net/context" | |
18 ) | |
19 | |
20 // DefaultSizeBudget is the size budget for the root transaction. | |
21 // | |
22 // Because our estimation algorithm isn't entirely correct, we take 5% off | |
23 // the limit for encoding and estimate inaccuracies. | |
24 // | |
25 // 10MB taken on 2015/09/24: | |
26 // https://cloud.google.com/appengine/docs/go/datastore/#Go_Quotas_and_limits | |
27 const DefaultSizeBudget = int64((10 * 1000 * 1000) * 0.95) | |
28 | |
29 // DefaultSizeThreshold prevents the root transaction from getting too close | |
30 // to the budget. If the code attempts to begin a transaction which would have | |
31 // less than this threshold for its budget, the transaction will immediately | |
32 // return ErrTransactionTooLarge. | |
33 const DefaultSizeThreshold = int64(10 * 1000) | |
34 | |
35 // XGTransactionGroupLimit is the number of transaction groups to allow in an | |
36 // XG transaction. | |
37 // | |
38 // 25 taken on 2015/09/24: | |
39 // https://cloud.google.com/appengine/docs/go/datastore/transactions#Go_What_can _be_done_in_a_transaction | |
40 const XGTransactionGroupLimit = 25 | |
41 | |
42 type sizeTracker struct { | |
43 keyToSize map[string]int64 | |
44 total int64 | |
45 } | |
46 | |
47 func (s *sizeTracker) set(key string, val int64) { | |
48 prev, existed := s.keyToSize[key] | |
49 if s.keyToSize == nil { | |
50 s.keyToSize = make(map[string]int64) | |
51 } | |
52 s.keyToSize[key] = val | |
53 s.total += val - prev | |
54 if !existed { | |
55 s.total += int64(len(key)) | |
56 } | |
57 } | |
58 | |
59 func (s *sizeTracker) get(key string) (int64, bool) { | |
60 size, has := s.keyToSize[key] | |
61 return size, has | |
62 } | |
63 | |
64 func (s *sizeTracker) has(key string) bool { | |
65 _, has := s.keyToSize[key] | |
66 return has | |
67 } | |
68 | |
69 func (s *sizeTracker) dup() *sizeTracker { | |
70 if len(s.keyToSize) == 0 { | |
71 return &sizeTracker{} | |
72 } | |
73 k2s := make(map[string]int64, len(s.keyToSize)) | |
74 for k, v := range s.keyToSize { | |
75 k2s[k] = v | |
76 } | |
77 return &sizeTracker{k2s, s.total} | |
78 } | |
79 | |
80 type txnBufState struct { | |
81 sync.Mutex | |
82 | |
83 // encoded key -> size of entity. A size of 0 means that the entity is | |
84 // deleted. | |
85 entState *sizeTracker | |
86 memDS datastore.RawInterface | |
87 | |
88 roots stringset.Set | |
89 rootLimit int | |
90 | |
91 aid string | |
92 ns string | |
93 parentDS datastore.RawInterface | |
94 parentState *txnBufState | |
95 | |
96 // sizeBudget is the number of bytes that this transaction has to operat e | |
97 // within. It's only used when attempting to apply() the transaction, an d | |
98 // it is the threshold for the delta of applying this transaction to the | |
99 // parent transaction. Note that a buffered transaction could actually h ave | |
100 // a negative delta if the parent transaction had many large entities wh ich | |
101 // the inner transaction deleted. | |
102 sizeBudget int64 | |
103 | |
104 siblingLock sync.Mutex | |
105 } | |
106 | |
107 func withTxnBuf(ctx context.Context, cb func(context.Context) error, opts *datas tore.TransactionOptions) error { | |
108 inf := info.Get(ctx) | |
109 ns := inf.GetNamespace() | |
110 | |
111 parentState, _ := ctx.Value(dsTxnBufParent).(*txnBufState) | |
112 roots := stringset.New(0) | |
113 rootLimit := 1 | |
114 if opts != nil && opts.XG { | |
115 rootLimit = XGTransactionGroupLimit | |
116 } | |
117 sizeBudget := DefaultSizeBudget | |
118 if parentState != nil { | |
119 parentState.siblingLock.Lock() | |
120 defer parentState.siblingLock.Unlock() | |
121 | |
122 // TODO(riannucci): this is a bit wonky since it means that a ch ild | |
123 // transaction declaring XG=true will only get to modify 25 grou ps IF | |
124 // they're same groups affected by the parent transactions. So i nstead of | |
125 // respecting opts.XG for inner transactions, we just dup everyt hing from | |
126 // the parent transaction. | |
127 roots = parentState.roots.Dup() | |
128 rootLimit = parentState.rootLimit | |
129 | |
130 sizeBudget = parentState.sizeBudget - parentState.entState.total | |
131 if sizeBudget < DefaultSizeThreshold { | |
132 return ErrTransactionTooLarge | |
133 } | |
134 } | |
135 | |
136 memDS := memory.NewDatastore(inf.FullyQualifiedAppID(), ns) | |
137 t := memDS.Testable() | |
138 t.Consistent(true) | |
139 t.AutoIndex(true) | |
140 t.DisableSpecialEntities(true) | |
141 | |
142 state := &txnBufState{ | |
143 entState: &sizeTracker{}, | |
144 memDS: memDS, | |
145 roots: roots, | |
146 rootLimit: rootLimit, | |
147 ns: ns, | |
148 aid: inf.AppID(), | |
149 parentDS: datastore.Get(ctx).Raw(), | |
150 parentState: parentState, | |
151 sizeBudget: sizeBudget, | |
152 } | |
153 ctx = context.WithValue(ctx, dsTxnBufParent, state) | |
154 err := cb(ctx) | |
155 if err != nil { | |
156 return err | |
157 } | |
158 return state.apply() | |
159 } | |
160 | |
161 type item struct { | |
162 key *datastore.Key | |
163 data datastore.PropertyMap | |
164 buffered bool | |
165 | |
166 encKey string | |
167 cmpRow string | |
168 err error | |
169 } | |
170 | |
171 func (i *item) getEncKey() string { | |
172 if i.encKey == "" { | |
173 i.encKey = string(serialize.ToBytes(i.key)) | |
174 } | |
175 return i.encKey | |
176 } | |
177 | |
178 func (i *item) getCmpRow(lower, upper []byte, order []datastore.IndexColumn) str ing { | |
179 if i.cmpRow == "" { | |
180 row, key := toComparableString(lower, upper, order, i.key, i.dat a) | |
181 i.cmpRow = string(row) | |
182 if i.encKey == "" { | |
183 i.encKey = string(key) | |
184 } | |
185 } | |
186 return i.cmpRow | |
187 } | |
188 | |
189 func (t *txnBufState) updateRootsLocked(roots stringset.Set) error { | |
190 curRootLen := t.roots.Len() | |
191 proposedRoots := stringset.New(1) | |
192 roots.Iter(func(root string) bool { | |
193 if !t.roots.Has(root) { | |
194 proposedRoots.Add(root) | |
195 } | |
196 return proposedRoots.Len()+curRootLen <= t.rootLimit | |
197 }) | |
198 if proposedRoots.Len()+curRootLen > t.rootLimit { | |
199 return errors.New("operating on too many entity groups in nested transaction") | |
200 } | |
201 // only need to update the roots if they did something that required upd ating | |
202 if proposedRoots.Len() > 0 { | |
203 proposedRoots.Iter(func(root string) bool { | |
204 t.roots.Add(root) | |
205 return true | |
206 }) | |
207 } | |
208 return nil | |
209 } | |
210 | |
211 func (t *txnBufState) getMulti(keys []*datastore.Key) ([]item, error) { | |
212 encKeys, roots := toEncoded(keys) | |
213 ret := make([]item, len(keys)) | |
214 | |
215 idxMap := []int(nil) | |
216 toGetKeys := []*datastore.Key(nil) | |
217 | |
218 t.Lock() | |
219 defer t.Unlock() | |
220 | |
221 if err := t.updateRootsLocked(roots); err != nil { | |
222 return nil, err | |
223 } | |
224 | |
225 for i, key := range keys { | |
226 ret[i].key = key | |
227 ret[i].encKey = encKeys[i] | |
228 if size, ok := t.entState.get(ret[i].getEncKey()); ok { | |
229 ret[i].buffered = true | |
230 if size > 0 { | |
231 idxMap = append(idxMap, i) | |
232 toGetKeys = append(toGetKeys, key) | |
233 } | |
234 } | |
235 } | |
236 | |
237 if len(toGetKeys) > 0 { | |
238 j := 0 | |
239 t.memDS.GetMulti(toGetKeys, nil, func(pm datastore.PropertyMap, err error) { | |
240 impossible(err) | |
241 ret[idxMap[j]].data = pm | |
242 j++ | |
243 }) | |
244 } | |
245 | |
246 return ret, nil | |
247 } | |
248 | |
249 func (t *txnBufState) deleteMulti(keys []*datastore.Key) error { | |
250 encKeys, roots := toEncoded(keys) | |
251 | |
252 t.Lock() | |
253 defer t.Unlock() | |
254 | |
255 if err := t.updateRootsLocked(roots); err != nil { | |
256 return err | |
257 } | |
258 | |
259 i := 0 | |
260 err := t.memDS.DeleteMulti(keys, func(err error) { | |
261 impossible(err) | |
262 t.entState.set(encKeys[i], 0) | |
263 i++ | |
264 }) | |
265 impossible(err) | |
266 return nil | |
267 } | |
268 | |
269 func (t *txnBufState) putMulti(keys []*datastore.Key, vals []datastore.PropertyM ap) error { | |
270 encKeys, roots := toEncoded(keys) | |
271 | |
272 t.Lock() | |
273 defer t.Unlock() | |
274 | |
275 if err := t.updateRootsLocked(roots); err != nil { | |
276 return err | |
277 } | |
278 | |
279 i := 0 | |
280 err := t.memDS.PutMulti(keys, vals, func(k *datastore.Key, err error) { | |
281 impossible(err) | |
282 t.entState.set(encKeys[i], vals[i].EstimateSize()) | |
283 i++ | |
284 }) | |
285 impossible(err) | |
286 return nil | |
287 } | |
288 | |
289 func (t *txnBufState) apply() error { | |
290 t.Lock() | |
291 defer t.Unlock() | |
292 | |
293 // if parentState is nil... just try to commit this anyway. The estimate s | |
294 // we're using here are just educated guesses. If it fits for real, then | |
295 // hooray. If not, then the underlying datastore will error. | |
296 if t.parentState != nil { | |
297 proposedState := t.parentState.entState.dup() | |
298 for k, v := range t.entState.keyToSize { | |
299 proposedState.set(k, v) | |
300 } | |
301 if proposedState.total > t.sizeBudget { | |
302 return ErrTransactionTooLarge | |
303 } | |
304 } | |
305 | |
306 toPutKeys := []*datastore.Key(nil) | |
307 toPut := []datastore.PropertyMap(nil) | |
308 toDel := []*datastore.Key(nil) | |
309 | |
310 // need to pull all items out of the in-memory datastore. Fortunately we have | |
311 // kindless queries, and we disabled all the special entities, so just | |
312 // run a kindless query without any filters and it will return all data | |
313 // currently in memDS :). | |
314 fq, err := datastore.NewQuery("").Finalize() | |
315 impossible(err) | |
316 | |
317 err = t.memDS.Run(fq, func(key *datastore.Key, data datastore.PropertyMa p, _ datastore.CursorCB) bool { | |
318 toPutKeys = append(toPutKeys, key) | |
319 toPut = append(toPut, data) | |
320 return true | |
321 }) | |
322 memoryCorruption(err) | |
323 | |
324 for keyStr, size := range t.entState.keyToSize { | |
325 if size == 0 { | |
326 k, err := serialize.ReadKey(bytes.NewBufferString(keyStr ), serialize.WithoutContext, t.aid, t.ns) | |
327 memoryCorruption(err) | |
328 toDel = append(toDel, k) | |
329 } | |
330 } | |
331 | |
332 wg := sync.WaitGroup{} | |
333 | |
334 pErr := error(nil) | |
335 dErr := error(nil) | |
336 | |
337 ds := t.parentDS | |
iannucci
2015/09/29 03:32:22
parentDS is either the real datastore transaction
| |
338 if toPut != nil { | |
339 wg.Add(1) | |
340 go func() { | |
341 defer wg.Done() | |
342 mErr := errors.NewLazyMultiError(len(toPut)) | |
343 i := 0 | |
344 pErr = ds.PutMulti(toPutKeys, toPut, func(_ *datastore.K ey, err error) { | |
345 i++ | |
346 mErr.Assign(i, err) | |
347 }) | |
348 pErr = mErr.Get() | |
349 }() | |
350 } | |
351 | |
352 if toDel != nil { | |
353 wg.Add(1) | |
354 go func() { | |
355 defer wg.Done() | |
356 mErr := errors.NewLazyMultiError(len(toDel)) | |
357 i := 0 | |
358 dErr = ds.DeleteMulti(toDel, func(err error) { | |
iannucci
2015/09/29 03:32:22
This is where the actual deletion occurs in the pa
| |
359 mErr.Assign(i, err) | |
360 i++ | |
361 }) | |
362 dErr = mErr.Get() | |
363 }() | |
364 } | |
365 wg.Wait() | |
366 | |
367 if pErr != nil { | |
368 return pErr | |
369 } | |
370 return dErr | |
371 } | |
372 | |
373 func toEncoded(keys []*datastore.Key) (full []string, roots stringset.Set) { | |
374 roots = stringset.New(len(keys)) | |
375 full = make([]string, len(keys)) | |
376 for i, k := range keys { | |
377 roots.Add(string(serialize.ToBytes(k.Root()))) | |
378 full[i] = string(serialize.ToBytes(k)) | |
379 } | |
380 return | |
381 } | |
OLD | NEW |