Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(320)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/datastore_data.go

Issue 1152383003: Simple memory testing for gae/wrapper (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@better_context_lite
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "errors"
9 "infra/gae/libs/wrapper"
10 goon_internal "infra/gae/libs/wrapper/memory/internal/goon"
11 "math/rand"
12 "sync"
13 "sync/atomic"
14
15 "github.com/mjibson/goon"
16
17 "appengine/datastore"
18 pb "appengine_internal/datastore"
19 )
20
21 ////////////////////////////////// knrKeeper ///////////////////////////////////
22
23 type knrKeeper struct {
24 knrLock sync.Mutex
25 knrFunc goon.KindNameResolver
26 }
27
28 func (k *knrKeeper) KindNameResolver() goon.KindNameResolver {
29 k.knrLock.Lock()
30 defer k.knrLock.Unlock()
31 if k.knrFunc == nil {
32 k.knrFunc = goon.DefaultKindName
33 }
34 return k.knrFunc
35 }
36
37 func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) {
38 k.knrLock.Lock()
39 defer k.knrLock.Unlock()
40 if knr == nil {
41 knr = goon.DefaultKindName
42 }
43 k.knrFunc = knr
44 }
45
46 //////////////////////////////// dataStoreData /////////////////////////////////
47
48 type dataStoreData struct {
49 // implements memContextObj.
50
51 wrapper.BrokenFeatures
52 knrKeeper
53
54 rwlock sync.RWMutex
55 // See README.md for store schema.
56 store *memStore
57 snap *memStore
58 }
59
60 ////////////////////////////// New(dataStoreData) //////////////////////////////
M-A Ruel 2015/05/29 18:16:52 FTR, I don't see value in these kinds of separator
61
62 func newDataStoreData() *dataStoreData {
M-A Ruel 2015/05/29 18:16:52 The thing I'd really like for each object is for y
63 store := newMemStore()
64 return &dataStoreData{
65 BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError( pb.Error_INTERNAL_ERROR)},
66 store: store,
67 snap: store.Snapshot(), // empty but better than a nil pointer.
68 }
69 }
70
71 ////////////////////////////// Locker(Datastore) ///////////////////////////////
72
73 func (d *dataStoreData) Lock() {
74 d.rwlock.Lock()
75 }
76
77 func (d *dataStoreData) Unlock() {
78 d.rwlock.Unlock()
79 }
80
81 func groupMetaKey(key *datastore.Key) []byte {
82 return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key) ))
83 }
84
85 func groupIDsKey(key *datastore.Key) []byte {
86 return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey( key)))
87 }
88
89 func rootIDsKey(kind string) []byte {
90 return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil))
91 }
92
93 func curVersion(ents *memCollection, key []byte) (int64, error) {
94 if v := ents.Get(key); v != nil {
95 numData := &propertyList{}
96 if err := numData.UnmarshalBinary(v); err != nil {
97 return 0, err
98 }
99 return (*numData)[0].Value.(int64), nil
100 }
101 return 0, nil
102 }
103
104 func incrementLocked(ents *memCollection, key []byte) (int64, error) {
105 num := int64(0)
106 numData := &propertyList{}
107 if v := ents.Get(key); v != nil {
108 if err := numData.UnmarshalBinary(v); err != nil {
109 return 0, err
110 }
111 num = (*numData)[0].Value.(int64)
112 } else {
113 *numData = append(*numData, datastore.Property{Name: "__version_ _"})
114 }
115 num++
116 (*numData)[0].Value = num
117 incData, err := numData.MarshalBinary()
118 if err != nil {
119 return 0, err
120 }
121 ents.Set(key, incData)
122
123 return num, nil
124 }
125
126 func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *data store.Key, error) {
127 coll := "ents:" + key.Namespace()
128 ents := d.store.GetCollection(coll)
129 if ents == nil {
130 ents = d.store.SetCollection(coll, nil)
131 }
132
133 if key.Incomplete() {
134 idKey := []byte(nil)
135 if key.Parent() == nil {
136 idKey = rootIDsKey(key.Kind())
137 } else {
138 idKey = groupIDsKey(key)
139 }
140 id, err := incrementLocked(ents, idKey)
141 if err != nil {
142 return nil, nil, err
143 }
144 key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent())
145 }
146
147 return ents, key, nil
148 }
149
150 func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastor e.Key, *propertyList, error) {
151 key := newKeyObj(ns, knr, src)
152 if !KeyCouldBeValid(ns, key, UserKeyOnly) {
153 // TODO(riannucci): different error for Put-ing to reserved Keys ?
154 return nil, nil, datastore.ErrInvalidKey
155 }
156
157 data, err := toPL(src)
158 return key, data, err
159 }
160
161 func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error) {
162 key, plData, err := putPrelim(ns, d.KindNameResolver(), src)
163 if err != nil {
164 return nil, err
165 }
166 if key, err = d.putInner(key, plData); err != nil {
167 return nil, err
168 }
169 return key, goon_internal.SetStructKey(src, key, d.KindNameResolver())
170 }
171
172 func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datas tore.Key, error) {
173 dataBytes, err := data.MarshalBinary()
174 if err != nil {
175 return nil, err
176 }
177
178 d.rwlock.Lock()
179 defer d.rwlock.Unlock()
180
181 ents, key, err := d.entsKeyLocked(key)
182 if err != nil {
183 return nil, err
184 }
185 if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil {
186 return nil, err
187 }
188
189 ents.Set(keyBytes(noNS, key), dataBytes)
190
191 return key, nil
192 }
193
194 func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl fun c(*datastore.Key) (*memCollection, error)) error {
195 key := newKeyObj(ns, knr, dst)
196 if !KeyValid(ns, key, AllowSpecialKeys) {
197 return datastore.ErrInvalidKey
198 }
199
200 ents, err := getColl(key)
201 if err != nil {
202 return err
203 }
204 if ents == nil {
205 return datastore.ErrNoSuchEntity
206 }
207 pdata := ents.Get(keyBytes(noNS, key))
208 if pdata == nil {
209 return datastore.ErrNoSuchEntity
210 }
211 pl := &propertyList{}
212 if err = pl.UnmarshalBinary(pdata); err != nil {
213 return err
214 }
215 return fromPL(pl, dst)
216 }
217
218 func (d *dataStoreData) get(ns string, dst interface{}) error {
219 return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*me mCollection, error) {
220 d.rwlock.RLock()
221 s := d.store.Snapshot()
222 d.rwlock.RUnlock()
223
224 return s.GetCollection("ents:" + ns), nil
225 })
226 }
227
228 func (d *dataStoreData) del(ns string, key *datastore.Key) error {
229 if !KeyValid(ns, key, UserKeyOnly) {
230 return datastore.ErrInvalidKey
231 }
232
233 keyBuf := keyBytes(noNS, key)
234
235 d.rwlock.Lock()
236 defer d.rwlock.Unlock()
237
238 ents := d.store.GetCollection("ents:" + ns)
239 if ents == nil {
240 return nil
241 }
242 if _, err := incrementLocked(ents, groupMetaKey(key)); err != nil {
243 return err
244 }
245
246 ents.Delete(keyBuf)
247 return nil
248 }
249
250 ///////////////////////// memContextObj(dataStoreData) /////////////////////////
251
252 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
253 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
254
255 txn := obj.(*txnDataStoreData)
256 for rk, muts := range txn.muts {
257 if len(muts) == 0 { // read-only
258 continue
259 }
260 k, err := keyFromByteString(withNS, rk)
261 if err != nil {
262 panic(err)
263 }
264 entKey := "ents:" + k.Namespace()
265 mkey := groupMetaKey(k)
266 entsHead := d.store.GetCollection(entKey)
267 entsSnap := txn.snap.GetCollection(entKey)
268 vHead, err := curVersion(entsHead, mkey)
269 if err != nil {
270 panic(err)
271 }
272 vSnap, err := curVersion(entsSnap, mkey)
273 if err != nil {
274 panic(err)
275 }
276 if vHead != vSnap {
277 return false
278 }
279 }
280 return true
281 }
282
283 func (d *dataStoreData) applyTxn(r *rand.Rand, obj memContextObj) {
284 txn := obj.(*txnDataStoreData)
285 for _, muts := range txn.muts {
286 if len(muts) == 0 { // read-only
287 continue
288 }
289 for _, m := range muts {
290 err := error(nil)
291 if m.data == nil {
292 err = d.del(m.key.Namespace(), m.key)
293 } else {
294 _, err = d.putInner(m.key, m.data)
295 }
296 if err != nil {
297 panic(err)
298 }
299 }
300 }
301 }
302
303 func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, e rror) {
304 return &txnDataStoreData{
305 // alias to the main datastore's so that testing code can have p rimitive
306 // access to break features inside of transactions.
307 BrokenFeatures: &d.BrokenFeatures,
308 parent: d,
309 knrKeeper: knrKeeper{knrFunc: d.knrFunc},
310 isXG: o != nil && o.XG,
311 snap: d.store.Snapshot(),
312 muts: map[string][]txnMutation{},
313 }, nil
314 }
315
316 func (d *dataStoreData) endTxn() {}
317
318 /////////////////////////////// txnDataStoreData ///////////////////////////////
M-A Ruel 2015/05/29 18:16:52 I don't know if you intended to insert a line in-b
319 type txnMutation struct {
320 key *datastore.Key
321 data *propertyList
322 }
323
324 type txnDataStoreData struct {
325 *wrapper.BrokenFeatures
326 knrKeeper
327 sync.Mutex
328
329 parent *dataStoreData
330
331 // boolean 0 or 1, use atomic.*Int32 to access.
332 closed int32
333 isXG bool
334
335 snap *memStore
336
337 // string is the raw-bytes encoding of the entity root incl. namespace
338 muts map[string][]txnMutation
339 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing
340 // length of encoded keys + values.
341 }
342
343 const xgEGLimit = 25
344
345 /////////////////////// memContextObj(txnDataStoreData) ////////////////////////
M-A Ruel 2015/05/29 18:16:52 I assume you mean that txnDataStoreData implements
346
347 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false }
348 func (td *txnDataStoreData) endTxn() {
349 if atomic.LoadInt32(&td.closed) == 1 {
350 panic("cannot end transaction twice")
351 }
352 atomic.StoreInt32(&td.closed, 1)
353 }
354 func (*txnDataStoreData) applyTxn(*rand.Rand, memContextObj) {
355 panic("txnDataStoreData cannot apply transactions")
356 }
357 func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, er ror) {
358 return nil, errors.New("datastore: nested transactions are not supported ")
359 }
360
361 /////////////////// wrapper.BrokenFeatures(txnDataStoreData) ///////////////////
362
363 func (td *txnDataStoreData) IsBroken() error {
364 // Slightly different from the SDK... datastore and taskqueue each imple ment
365 // this here, where in the SDK only datastore.transaction.Call does.
366 if atomic.LoadInt32(&td.closed) == 1 {
367 return errors.New("datastore: transaction context has expired")
368 }
369 return td.BrokenFeatures.IsBroken()
370 }
371
372 // writeMutation ensures that this transaction can support the given key/value
373 // mutation.
374 //
375 // if getOnly is true, don't record the actual mutation data, just ensure that
376 // the key is in an included entity group (or add an empty entry for tha t
377 // group).
378 //
379 // if !getOnly && data == nil, this counts as a deletion instead of a Put.
380 //
381 // Returns an error if this key causes the transaction to cross too many entity
382 // groups.
383 func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data *propertyList) error {
384 rk := string(keyBytes(withNS, rootKey(key)))
385
386 td.Lock()
387 defer td.Unlock()
388
389 if _, ok := td.muts[rk]; !ok {
390 limit := 1
391 if td.isXG {
392 limit = xgEGLimit
393 }
394 if len(td.muts)+1 > limit {
395 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)"
396 if td.isXG {
397 msg = "operating on too many entity groups in a single transaction"
398 }
399 return newDSError(pb.Error_BAD_REQUEST, msg)
400 }
401 td.muts[rk] = []txnMutation{}
402 }
403 if !getOnly {
404 td.muts[rk] = append(td.muts[rk], txnMutation{key, data})
405 }
406
407 return nil
408 }
409
410 func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, err or) {
411 key, plData, err := putPrelim(ns, td.KindNameResolver(), src)
412 if err != nil {
413 return nil, err
414 }
415
416 func() {
417 td.parent.Lock()
418 defer td.parent.Unlock()
419 _, key, err = td.parent.entsKeyLocked(key)
420 }()
421 if err != nil {
422 return nil, err
423 }
424
425 if err = td.writeMutation(false, key, plData); err != nil {
426 return nil, err
427 }
428
429 return key, goon_internal.SetStructKey(src, key, td.KindNameResolver())
430 }
431
432 func (td *txnDataStoreData) get(ns string, dst interface{}) error {
433 return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key) (*memCollection, error) {
434 if err := td.writeMutation(true, key, nil); err != nil {
435 return nil, err
436 }
437 return td.snap.GetCollection("ents:" + ns), nil
438 })
439 }
440
441 func (td *txnDataStoreData) del(ns string, key *datastore.Key) error {
442 if !KeyValid(ns, key, UserKeyOnly) {
443 return datastore.ErrInvalidKey
444 }
445 return td.writeMutation(false, key, nil)
446 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698