Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(48)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/datastore_data.go

Issue 1152383003: Simple memory testing for gae/wrapper (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@better_context_lite
Patch Set: add go-slab dependency Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "errors"
9 "infra/gae/libs/wrapper"
10 goon_internal "infra/gae/libs/wrapper/memory/internal/goon"
11 "math/rand"
12 "sync"
13 "sync/atomic"
14
15 "github.com/mjibson/goon"
16
17 "appengine/datastore"
18 pb "appengine_internal/datastore"
19 )
20
21 ////////////////////////////////// knrKeeper ///////////////////////////////////
22
23 type knrKeeper struct {
24 knrLock sync.Mutex
25 knrFunc goon.KindNameResolver
26 }
27
28 var _ = wrapper.DSKindSetter((*knrKeeper)(nil))
29
30 func (k *knrKeeper) KindNameResolver() goon.KindNameResolver {
31 k.knrLock.Lock()
32 defer k.knrLock.Unlock()
33 if k.knrFunc == nil {
34 k.knrFunc = goon.DefaultKindName
35 }
36 return k.knrFunc
37 }
38
39 func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) {
40 k.knrLock.Lock()
41 defer k.knrLock.Unlock()
42 if knr == nil {
43 knr = goon.DefaultKindName
44 }
45 k.knrFunc = knr
46 }
47
48 //////////////////////////////// dataStoreData /////////////////////////////////
49
50 type dataStoreData struct {
51 wrapper.BrokenFeatures
52 knrKeeper
53
54 rwlock sync.RWMutex
55 // See README.md for store schema.
56 store *memStore
57 snap *memStore
58 }
59
60 var (
61 _ = memContextObj((*dataStoreData)(nil))
62 _ = sync.Locker((*dataStoreData)(nil))
63 _ = wrapper.Testable((*dataStoreData)(nil))
64 _ = wrapper.DSKindSetter((*dataStoreData)(nil))
65 )
66
67 func newDataStoreData() *dataStoreData {
68 store := newMemStore()
69 return &dataStoreData{
70 BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError( pb.Error_INTERNAL_ERROR)},
71 store: store,
72 snap: store.Snapshot(), // empty but better than a nil pointer.
73 }
74 }
75
76 func (d *dataStoreData) Lock() {
77 d.rwlock.Lock()
78 }
79
80 func (d *dataStoreData) Unlock() {
81 d.rwlock.Unlock()
82 }
83
84 func groupMetaKey(key *datastore.Key) []byte {
85 return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key) ))
86 }
87
88 func groupIDsKey(key *datastore.Key) []byte {
89 return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey( key)))
90 }
91
92 func rootIDsKey(kind string) []byte {
93 return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil))
94 }
95
96 func curVersion(ents *memCollection, key []byte) (int64, error) {
97 if v := ents.Get(key); v != nil {
98 numData := &propertyList{}
99 if err := numData.UnmarshalBinary(v); err != nil {
100 return 0, err
101 }
102 return (*numData)[0].Value.(int64), nil
103 }
104 return 0, nil
105 }
106
107 func incrementLocked(ents *memCollection, key []byte) (int64, error) {
108 num := int64(0)
109 numData := &propertyList{}
110 if v := ents.Get(key); v != nil {
111 if err := numData.UnmarshalBinary(v); err != nil {
112 return 0, err
113 }
114 num = (*numData)[0].Value.(int64)
115 } else {
116 *numData = append(*numData, datastore.Property{Name: "__version_ _"})
117 }
118 num++
119 (*numData)[0].Value = num
120 incData, err := numData.MarshalBinary()
121 if err != nil {
122 return 0, err
123 }
124 ents.Set(key, incData)
125
126 return num, nil
127 }
128
129 func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *data store.Key, error) {
130 coll := "ents:" + key.Namespace()
131 ents := d.store.GetCollection(coll)
132 if ents == nil {
133 ents = d.store.SetCollection(coll, nil)
134 }
135
136 if key.Incomplete() {
137 idKey := []byte(nil)
138 if key.Parent() == nil {
139 idKey = rootIDsKey(key.Kind())
140 } else {
141 idKey = groupIDsKey(key)
142 }
143 id, err := incrementLocked(ents, idKey)
144 if err != nil {
145 return nil, nil, err
146 }
147 key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent())
148 }
149
150 return ents, key, nil
151 }
152
153 func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastor e.Key, *propertyList, error) {
154 key := newKeyObj(ns, knr, src)
155 if !KeyCouldBeValid(ns, key, UserKeyOnly) {
156 // TODO(riannucci): different error for Put-ing to reserved Keys ?
157 return nil, nil, datastore.ErrInvalidKey
158 }
159
160 data, err := toPL(src)
161 return key, data, err
162 }
163
164 func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error) {
165 key, plData, err := putPrelim(ns, d.KindNameResolver(), src)
166 if err != nil {
167 return nil, err
168 }
169 if key, err = d.putInner(key, plData); err != nil {
170 return nil, err
171 }
172 return key, goon_internal.SetStructKey(src, key, d.KindNameResolver())
173 }
174
175 func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datas tore.Key, error) {
176 dataBytes, err := data.MarshalBinary()
177 if err != nil {
178 return nil, err
179 }
180
181 d.rwlock.Lock()
182 defer d.rwlock.Unlock()
183
184 ents, key, err := d.entsKeyLocked(key)
185 if err != nil {
186 return nil, err
187 }
188 if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil {
189 return nil, err
190 }
191
192 ents.Set(keyBytes(noNS, key), dataBytes)
193
194 return key, nil
195 }
196
197 func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl fun c(*datastore.Key) (*memCollection, error)) error {
198 key := newKeyObj(ns, knr, dst)
199 if !KeyValid(ns, key, AllowSpecialKeys) {
200 return datastore.ErrInvalidKey
201 }
202
203 ents, err := getColl(key)
204 if err != nil {
205 return err
206 }
207 if ents == nil {
208 return datastore.ErrNoSuchEntity
209 }
210 pdata := ents.Get(keyBytes(noNS, key))
211 if pdata == nil {
212 return datastore.ErrNoSuchEntity
213 }
214 pl := &propertyList{}
215 if err = pl.UnmarshalBinary(pdata); err != nil {
216 return err
217 }
218 return fromPL(pl, dst)
219 }
220
221 func (d *dataStoreData) get(ns string, dst interface{}) error {
222 return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*me mCollection, error) {
223 d.rwlock.RLock()
224 s := d.store.Snapshot()
225 d.rwlock.RUnlock()
226
227 return s.GetCollection("ents:" + ns), nil
228 })
229 }
230
231 func (d *dataStoreData) del(ns string, key *datastore.Key) error {
232 if !KeyValid(ns, key, UserKeyOnly) {
233 return datastore.ErrInvalidKey
234 }
235
236 keyBuf := keyBytes(noNS, key)
237
238 d.rwlock.Lock()
239 defer d.rwlock.Unlock()
240
241 ents := d.store.GetCollection("ents:" + ns)
242 if ents == nil {
243 return nil
244 }
245 if _, err := incrementLocked(ents, groupMetaKey(key)); err != nil {
246 return err
247 }
248
249 ents.Delete(keyBuf)
250 return nil
251 }
252
253 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
254 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
255
256 txn := obj.(*txnDataStoreData)
257 for rk, muts := range txn.muts {
258 if len(muts) == 0 { // read-only
259 continue
260 }
261 k, err := keyFromByteString(withNS, rk)
262 if err != nil {
263 panic(err)
264 }
265 entKey := "ents:" + k.Namespace()
266 mkey := groupMetaKey(k)
267 entsHead := d.store.GetCollection(entKey)
268 entsSnap := txn.snap.GetCollection(entKey)
269 vHead, err := curVersion(entsHead, mkey)
270 if err != nil {
271 panic(err)
272 }
273 vSnap, err := curVersion(entsSnap, mkey)
274 if err != nil {
275 panic(err)
276 }
277 if vHead != vSnap {
278 return false
279 }
280 }
281 return true
282 }
283
284 func (d *dataStoreData) applyTxn(r *rand.Rand, obj memContextObj) {
285 txn := obj.(*txnDataStoreData)
286 for _, muts := range txn.muts {
287 if len(muts) == 0 { // read-only
288 continue
289 }
290 for _, m := range muts {
291 err := error(nil)
292 if m.data == nil {
293 err = d.del(m.key.Namespace(), m.key)
294 } else {
295 _, err = d.putInner(m.key, m.data)
296 }
297 if err != nil {
298 panic(err)
299 }
300 }
301 }
302 }
303
304 func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, e rror) {
305 return &txnDataStoreData{
306 // alias to the main datastore's so that testing code can have p rimitive
307 // access to break features inside of transactions.
308 BrokenFeatures: &d.BrokenFeatures,
309 parent: d,
310 knrKeeper: knrKeeper{knrFunc: d.knrFunc},
311 isXG: o != nil && o.XG,
312 snap: d.store.Snapshot(),
313 muts: map[string][]txnMutation{},
314 }, nil
315 }
316
317 func (d *dataStoreData) endTxn() {}
318
319 /////////////////////////////// txnDataStoreData ///////////////////////////////
320
321 type txnMutation struct {
322 key *datastore.Key
323 data *propertyList
324 }
325
326 type txnDataStoreData struct {
327 *wrapper.BrokenFeatures
328 knrKeeper
329 sync.Mutex
330
331 parent *dataStoreData
332
333 // boolean 0 or 1, use atomic.*Int32 to access.
334 closed int32
335 isXG bool
336
337 snap *memStore
338
339 // string is the raw-bytes encoding of the entity root incl. namespace
340 muts map[string][]txnMutation
341 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing
342 // length of encoded keys + values.
343 }
344
345 var (
346 _ = memContextObj((*txnDataStoreData)(nil))
347 _ = sync.Locker((*txnDataStoreData)(nil))
348 _ = wrapper.Testable((*txnDataStoreData)(nil))
349 _ = wrapper.DSKindSetter((*txnDataStoreData)(nil))
350 )
351
352 const xgEGLimit = 25
353
354 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false }
355 func (td *txnDataStoreData) endTxn() {
356 if atomic.LoadInt32(&td.closed) == 1 {
357 panic("cannot end transaction twice")
358 }
359 atomic.StoreInt32(&td.closed, 1)
360 }
361 func (*txnDataStoreData) applyTxn(*rand.Rand, memContextObj) {
362 panic("txnDataStoreData cannot apply transactions")
363 }
364 func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, er ror) {
365 return nil, errors.New("datastore: nested transactions are not supported ")
366 }
367
368 func (td *txnDataStoreData) IsBroken() error {
369 // Slightly different from the SDK... datastore and taskqueue each imple ment
370 // this here, where in the SDK only datastore.transaction.Call does.
371 if atomic.LoadInt32(&td.closed) == 1 {
372 return errors.New("datastore: transaction context has expired")
373 }
374 return td.BrokenFeatures.IsBroken()
375 }
376
377 // writeMutation ensures that this transaction can support the given key/value
378 // mutation.
379 //
380 // if getOnly is true, don't record the actual mutation data, just ensure that
381 // the key is in an included entity group (or add an empty entry for tha t
382 // group).
383 //
384 // if !getOnly && data == nil, this counts as a deletion instead of a Put.
385 //
386 // Returns an error if this key causes the transaction to cross too many entity
387 // groups.
388 func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data *propertyList) error {
389 rk := string(keyBytes(withNS, rootKey(key)))
390
391 td.Lock()
392 defer td.Unlock()
393
394 if _, ok := td.muts[rk]; !ok {
395 limit := 1
396 if td.isXG {
397 limit = xgEGLimit
398 }
399 if len(td.muts)+1 > limit {
400 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)"
401 if td.isXG {
402 msg = "operating on too many entity groups in a single transaction"
403 }
404 return newDSError(pb.Error_BAD_REQUEST, msg)
405 }
406 td.muts[rk] = []txnMutation{}
407 }
408 if !getOnly {
409 td.muts[rk] = append(td.muts[rk], txnMutation{key, data})
410 }
411
412 return nil
413 }
414
415 func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, err or) {
416 key, plData, err := putPrelim(ns, td.KindNameResolver(), src)
417 if err != nil {
418 return nil, err
419 }
420
421 func() {
422 td.parent.Lock()
423 defer td.parent.Unlock()
424 _, key, err = td.parent.entsKeyLocked(key)
425 }()
426 if err != nil {
427 return nil, err
428 }
429
430 if err = td.writeMutation(false, key, plData); err != nil {
431 return nil, err
432 }
433
434 return key, goon_internal.SetStructKey(src, key, td.KindNameResolver())
435 }
436
437 func (td *txnDataStoreData) get(ns string, dst interface{}) error {
438 return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key) (*memCollection, error) {
439 if err := td.writeMutation(true, key, nil); err != nil {
440 return nil, err
441 }
442 return td.snap.GetCollection("ents:" + ns), nil
443 })
444 }
445
446 func (td *txnDataStoreData) del(ns string, key *datastore.Key) error {
447 if !KeyValid(ns, key, UserKeyOnly) {
448 return datastore.ErrInvalidKey
449 }
450 return td.writeMutation(false, key, nil)
451 }
OLDNEW
« no previous file with comments | « go/src/infra/gae/libs/wrapper/memory/datastore.go ('k') | go/src/infra/gae/libs/wrapper/memory/datastore_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698