Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(70)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/datastore_data.go

Issue 1152383003: Simple memory testing for gae/wrapper (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@better_context_lite
Patch Set: fix capitalization Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "errors"
9 "infra/gae/libs/wrapper"
10 goon_internal "infra/gae/libs/wrapper/memory/internal/goon"
11 "math/rand"
12 "sync"
13 "sync/atomic"
14
15 "github.com/mjibson/goon"
16
17 "appengine/datastore"
18 pb "appengine_internal/datastore"
19 )
20
21 ////////////////////////////////// knrKeeper ///////////////////////////////////
22
23 type knrKeeper struct {
24 knrLock sync.Mutex
25 knrFunc goon.KindNameResolver
26 }
27
28 func (k *knrKeeper) KindNameResolver() goon.KindNameResolver {
29 k.knrLock.Lock()
30 defer k.knrLock.Unlock()
31 ret := k.knrFunc
32 if ret == nil {
33 ret = goon.DefaultKindName
34 k.knrFunc = ret
35 }
36 return ret
37 }
38
39 func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) {
40 k.knrLock.Lock()
41 defer k.knrLock.Unlock()
42 if knr == nil {
43 knr = goon.DefaultKindName
44 }
45 k.knrFunc = knr
46 }
47
48 //////////////////////////////// dataStoreData /////////////////////////////////
49
50 type dataStoreData struct {
51 wrapper.BrokenFeatures
52 knrKeeper
53
54 rwlock sync.RWMutex
55 store *memStore
56 snap *memStore
57
58 /* collections:
59 * ents:ns -> key -> value
60 * (rootkind, rootid, __entity_group__,1 ) -> {__version__: int}
61 * (rootkind, rootid, __entity_group_ids __,1) -> {__version__: int}
62 * (__entity_group_ids__,1) -> {__versio n__: int}
63 * idx -> kind,A?,[-?prop]*
64 * idx:ns:kind -> key = nil
65 * idx:ns:kind|prop -> propval|key = [prev val]
66 * idx:ns:kind|-prop -> -propval|key = [next val]
67 * idx:ns:kind|A|?prop|?prop -> A|propval|propval|key = [prev/next va l]|[prev/next val]
68 * idx:ns:kind|?prop|?prop -> propval|propval|key = [prev/next val] |[prev/next val]
69 */
70 }
71
72 ////////////////////////////// New(dataStoreData) //////////////////////////////
73
74 func newDataStoreData() *dataStoreData {
75 store := newMemStore()
76 return &dataStoreData{
77 BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError( pb.Error_INTERNAL_ERROR)},
78 store: store,
79 snap: store.Snapshot(), // empty but better than a nil pointer.
80 }
81 }
82
83 ////////////////////////////// Locker(Datastore) ///////////////////////////////
84
85 func (d *dataStoreData) Lock() {
86 d.rwlock.Lock()
87 }
88
89 func (d *dataStoreData) Unlock() {
90 d.rwlock.Unlock()
91 }
92
93 func groupMetaKey(key *datastore.Key) []byte {
94 return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key) ))
95 }
96
97 func groupIDsKey(key *datastore.Key) []byte {
98 return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey( key)))
99 }
100
101 func rootIDsKey(kind string) []byte {
102 return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil))
103 }
104
105 func curVersion(ents *memCollection, key []byte) (int64, error) {
106 var err error
107 v := ents.Get(key)
108 num := int64(0)
109 numData := &propertyList{}
110 if v != nil {
111 err = numData.UnmarshalBinary(v)
112 num = (*numData)[0].Value.(int64)
113 }
114 return num, err
115 }
116
117 func plusPlusLocked(ents *memCollection, key []byte) (int64, error) {
M-A Ruel 2015/05/27 20:14:47 s/plusPlus/increment/g
iannucci 2015/05/27 21:36:22 done
118 v := ents.Get(key)
119
120 num := int64(0)
121 numData := &propertyList{}
122 if v == nil {
123 num++
124 *numData = append(*numData, datastore.Property{Name: "__version_ _"})
125 } else {
126 err := numData.UnmarshalBinary(v)
127 if err != nil {
128 return 0, err
129 }
130 num = (*numData)[0].Value.(int64)
131 num++
132 }
133 (*numData)[0].Value = num
134 incData, err := numData.MarshalBinary()
135 if err != nil {
136 return 0, err
137 }
138 ents.Set(key, incData)
139
140 return num, nil
141 }
142
143 func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *data store.Key, error) {
144 coll := "ents:" + key.Namespace()
145 ents := d.store.GetCollection(coll)
146 if ents == nil {
147 ents = d.store.SetCollection(coll, nil)
148 }
149
150 if key.Incomplete() {
151 var idKey []byte
152 if key.Parent() == nil {
153 idKey = rootIDsKey(key.Kind())
154 } else {
155 idKey = groupIDsKey(key)
156 }
157
158 id, err := plusPlusLocked(ents, idKey)
159 if err != nil {
160 return nil, nil, err
161 }
162 key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent())
163 }
164
165 return ents, key, nil
166 }
167
168 func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastor e.Key, *propertyList, error) {
169 key := newKeyObj(ns, knr, src)
170 if !KeyCouldBeValid(ns, key, UserKeyOnly) {
171 // TODO(riannucci): different error for Put-ing to reserved Keys ?
172 return nil, nil, datastore.ErrInvalidKey
173 }
174
175 data, err := toPL(src)
176 return key, data, err
177 }
178
179 func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error) {
180 key, plData, err := putPrelim(ns, d.KindNameResolver(), src)
181 if err != nil {
182 return nil, err
183 }
184 key, err = d.putInner(key, plData)
185 if err != nil {
186 return nil, err
187 }
188 return key, goon_internal.SetStructKey(src, key, d.KindNameResolver())
189 }
190
191 func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datas tore.Key, error) {
192 dataBytes, err := data.MarshalBinary()
193 if err != nil {
194 return nil, err
195 }
196
197 d.rwlock.Lock()
198 defer d.Unlock()
199
200 ents, key, err := d.entsKeyLocked(key)
201 if err != nil {
202 return nil, err
203 }
204
205 if _, err = plusPlusLocked(ents, groupMetaKey(key)); err != nil {
206 return nil, err
207 }
208
209 ents.Set(keyBytes(noNS, key), dataBytes)
210
211 return key, nil
212 }
213
214 func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl fun c(*datastore.Key) (*memCollection, error)) error {
215 key := newKeyObj(ns, knr, dst)
216 if !KeyValid(ns, key, AllowSpecialKeys) {
217 return datastore.ErrInvalidKey
218 }
219
220 ents, err := getColl(key)
221 if err != nil {
222 return err
223 }
224 if ents == nil {
225 return datastore.ErrNoSuchEntity
226 }
227 pdata := ents.Get(keyBytes(noNS, key))
228 if pdata == nil {
229 return datastore.ErrNoSuchEntity
230 }
231 pl := &propertyList{}
232 err = pl.UnmarshalBinary(pdata)
233 if err != nil {
234 return err
235 }
236 return fromPL(pl, dst)
237 }
238
239 func (d *dataStoreData) get(ns string, dst interface{}) error {
240 return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*me mCollection, error) {
241 d.rwlock.RLock()
242 s := d.store.Snapshot()
243 d.rwlock.RUnlock()
244
245 return s.GetCollection("ents:" + ns), nil
246 })
247 }
248
249 func (d *dataStoreData) del(ns string, key *datastore.Key) error {
250 if !KeyValid(ns, key, UserKeyOnly) {
251 return datastore.ErrInvalidKey
252 }
253
254 keyBuf := keyBytes(noNS, key)
255
256 d.rwlock.Lock()
257 defer d.Unlock()
258
259 ents := d.store.GetCollection("ents:" + ns)
260 if ents == nil {
261 return nil
262 }
263
264 _, err := plusPlusLocked(ents, groupMetaKey(key))
265 if err != nil {
266 return err
267 }
268
269 ents.Delete(keyBuf)
270 return nil
271 }
272
273 ///////////////////////// memContextObj(dataStoreData) /////////////////////////
274
275 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
276 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
277
278 txn := obj.(*txnDataStoreData)
279 for rk, muts := range txn.muts {
280 if len(muts) == 0 { // read-only
281 continue
282 }
283 k, err := keyFromByteString(withNS, rk)
284 if err != nil {
285 panic(err)
286 }
287 entKey := "ents:" + k.Namespace()
288 mkey := groupMetaKey(k)
289 entsHead := d.store.GetCollection(entKey)
290 entsSnap := txn.snap.GetCollection(entKey)
291 vHead, err := curVersion(entsHead, mkey)
292 if err != nil {
293 panic(err)
294 }
295 vSnap, err := curVersion(entsSnap, mkey)
296 if err != nil {
297 panic(err)
298 }
299 if vHead != vSnap {
300 return false
301 }
302 }
303 return true
304 }
305
306 func (d *dataStoreData) applyTxn(r *rand.Rand, obj memContextObj) {
307 txn := obj.(*txnDataStoreData)
308 for _, muts := range txn.muts {
309 if len(muts) == 0 { // read-only
310 continue
311 }
312 for _, m := range muts {
313 var err error
314 if m.data == nil {
315 err = d.del(m.key.Namespace(), m.key)
316 } else {
317 _, err = d.putInner(m.key, m.data)
318 }
319 if err != nil {
320 panic(err)
321 }
322 }
323 }
324 }
325
326 func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, e rror) {
327 return &txnDataStoreData{
328 // alias to the main datastore's so that testing code can have p rimitive
329 // access to break features inside of transactions.
330 BrokenFeatures: &d.BrokenFeatures,
331 parent: d,
332 knrKeeper: knrKeeper{knrFunc: d.knrFunc},
333 isXG: o != nil && o.XG,
334 snap: d.store.Snapshot(),
335 muts: map[string][]txnMutation{},
336 }, nil
337 }
338
339 func (d *dataStoreData) endTxn() {}
340
341 /////////////////////////////// txnDataStoreData ///////////////////////////////
342 type txnMutation struct {
343 key *datastore.Key
344 data *propertyList
345 }
346
347 type txnDataStoreData struct {
348 *wrapper.BrokenFeatures
349 knrKeeper
350 sync.Mutex
351
352 parent *dataStoreData
353
354 // boolean 0 or 1, use atomic.*Int32 to access.
355 closed int32
356 isXG bool
357
358 snap *memStore
359
360 // string is the raw-bytes encoding of the entity root incl. namespace
361 muts map[string][]txnMutation
362 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing
363 // length of encoded keys + values.
364 }
365
366 const xgEGLimit = 25
367
368 /////////////////////// memContextObj(txnDataStoreData) ////////////////////////
369
370 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false }
371 func (td *txnDataStoreData) endTxn() {
372 if atomic.LoadInt32(&td.closed) == 1 {
373 panic("cannot end transaction twice")
374 }
375 atomic.StoreInt32(&td.closed, 1)
376 }
377 func (*txnDataStoreData) applyTxn(*rand.Rand, memContextObj) {
378 panic("txnDataStoreData cannot apply transactions")
379 }
380 func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, er ror) {
381 return nil, errors.New("datastore: nested transactions are not supported ")
382 }
383
384 /////////////////// wrapper.BrokenFeatures(txnDataStoreData) ///////////////////
385
386 func (td *txnDataStoreData) IsBroken() error {
387 // Slightly different from the SDK... datastore and taskqueue each imple ment
388 // this here, where in the SDK only datastore.transaction.Call does.
389 if atomic.LoadInt32(&td.closed) == 1 {
390 return errors.New("datastore: transaction context has expired")
391 }
392 return td.BrokenFeatures.IsBroken()
393 }
394
395 // writeMutation ensures that this transaction can support the given key/value
396 // mutation.
397 //
398 // if getOnly is true, don't record the actual mutation data, just ensure that
399 // the key is in an included entity group (or add an empty entry for tha t
400 // group).
401 //
402 // if !getOnly && data == nil, this counts as a deletion instead of a Put.
403 //
404 // Returns an error if this key causes the transaction to cross too many entity
405 // groups.
406 func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data *propertyList) error {
407 rk := string(keyBytes(withNS, rootKey(key)))
408
409 td.Lock()
410 defer td.Unlock()
411
412 if _, ok := td.muts[rk]; !ok {
413 limit := 1
414 if td.isXG {
415 limit = xgEGLimit
416 }
417 if len(td.muts)+1 > limit {
418 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)"
419 if td.isXG {
420 msg = "operating on too many entity groups in a single transaction"
421 }
422 return newDSError(pb.Error_BAD_REQUEST, msg)
423 }
424 td.muts[rk] = []txnMutation{}
425 }
426 if !getOnly {
427 td.muts[rk] = append(td.muts[rk], txnMutation{key, data})
428 }
429
430 return nil
431 }
432
433 func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, err or) {
434 key, plData, err := putPrelim(ns, td.KindNameResolver(), src)
435 if err != nil {
436 return nil, err
437 }
438
439 func() {
440 td.parent.Lock()
441 defer td.parent.Unlock()
442 _, key, err = td.parent.entsKeyLocked(key)
443 }()
444 if err != nil {
445 return nil, err
446 }
447
448 if err = td.writeMutation(false, key, plData); err != nil {
449 return nil, err
450 }
451
452 return key, goon_internal.SetStructKey(src, key, td.KindNameResolver())
453 }
454
455 func (td *txnDataStoreData) get(ns string, dst interface{}) error {
456 return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key) (*memCollection, error) {
457 if err := td.writeMutation(true, key, nil); err != nil {
458 return nil, err
459 }
460 return td.snap.GetCollection("ents:" + ns), nil
461 })
462 }
463
464 func (td *txnDataStoreData) del(ns string, key *datastore.Key) error {
465 if !KeyValid(ns, key, UserKeyOnly) {
466 return datastore.ErrInvalidKey
467 }
468 return td.writeMutation(false, key, nil)
469 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698