Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(182)

Side by Side Diff: go/src/infra/gae/libs/wrapper/memory/datastore_data.go

Issue 1152383003: Simple memory testing for gae/wrapper (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@better_context_lite
Patch Set: be internally consistent Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "errors"
9 "infra/gae/libs/wrapper"
10 goon_internal "infra/gae/libs/wrapper/memory/internal/goon"
11 "math/rand"
12 "sync"
13 "sync/atomic"
14
15 "github.com/mjibson/goon"
16
17 "appengine/datastore"
18 pb "appengine_internal/datastore"
19 )
20
21 ////////////////////////////////// knrKeeper ///////////////////////////////////
22
23 type knrKeeper struct {
24 knrLock sync.Mutex
25 knrFunc goon.KindNameResolver
26 }
27
28 func (k *knrKeeper) KindNameResolver() goon.KindNameResolver {
29 k.knrLock.Lock()
30 defer k.knrLock.Unlock()
31 if k.knrFunc == nil {
32 k.knrFunc = goon.DefaultKindName
33 }
34 return k.knrFunc
35 }
36
37 func (k *knrKeeper) SetKindNameResolver(knr goon.KindNameResolver) {
38 k.knrLock.Lock()
39 defer k.knrLock.Unlock()
40 if knr == nil {
41 knr = goon.DefaultKindName
42 }
43 k.knrFunc = knr
44 }
45
46 //////////////////////////////// dataStoreData /////////////////////////////////
47
48 type dataStoreData struct {
M-A Ruel 2015/05/29 16:16:18 Could you describe why dataStoreData exposes sync.
iannucci 2015/05/29 16:33:14 added a note that this implements memContextObj.
49 wrapper.BrokenFeatures
50 knrKeeper
51
52 rwlock sync.RWMutex
53 // See README.md for store schema.
54 store *memStore
55 snap *memStore
56 }
57
58 ////////////////////////////// New(dataStoreData) //////////////////////////////
59
60 func newDataStoreData() *dataStoreData {
61 store := newMemStore()
62 return &dataStoreData{
63 BrokenFeatures: wrapper.BrokenFeatures{DefaultError: newDSError( pb.Error_INTERNAL_ERROR)},
64 store: store,
65 snap: store.Snapshot(), // empty but better than a nil pointer.
66 }
67 }
68
69 ////////////////////////////// Locker(Datastore) ///////////////////////////////
70
71 func (d *dataStoreData) Lock() {
72 d.rwlock.Lock()
73 }
74
75 func (d *dataStoreData) Unlock() {
76 d.rwlock.Unlock()
77 }
78
79 func groupMetaKey(key *datastore.Key) []byte {
80 return keyBytes(noNS, newKey("", "__entity_group__", "", 1, rootKey(key) ))
81 }
82
83 func groupIDsKey(key *datastore.Key) []byte {
84 return keyBytes(noNS, newKey("", "__entity_group_ids__", "", 1, rootKey( key)))
85 }
86
87 func rootIDsKey(kind string) []byte {
88 return keyBytes(noNS, newKey("", "__entity_root_ids__", kind, 0, nil))
89 }
90
91 func curVersion(ents *memCollection, key []byte) (int64, error) {
92 if v := ents.Get(key); v != nil {
93 numData := &propertyList{}
94 if err := numData.UnmarshalBinary(v); err != nil {
95 return 0, err
96 }
97 return (*numData)[0].Value.(int64), nil
98 }
99 return 0, nil
100 }
101
102 func incrementLocked(ents *memCollection, key []byte) (int64, error) {
103 num := int64(0)
104 numData := &propertyList{}
105 if v := ents.Get(key); v != nil {
106 if err := numData.UnmarshalBinary(v); err != nil {
107 return 0, err
108 }
109 num = (*numData)[0].Value.(int64)
110 } else {
111 *numData = append(*numData, datastore.Property{Name: "__version_ _"})
112 }
113 num++
114 (*numData)[0].Value = num
115 incData, err := numData.MarshalBinary()
116 if err != nil {
117 return 0, err
118 }
119 ents.Set(key, incData)
120
121 return num, nil
122 }
123
124 func (d *dataStoreData) entsKeyLocked(key *datastore.Key) (*memCollection, *data store.Key, error) {
125 coll := "ents:" + key.Namespace()
126 ents := d.store.GetCollection(coll)
127 if ents == nil {
128 ents = d.store.SetCollection(coll, nil)
129 }
130
131 if key.Incomplete() {
132 idKey := []byte(nil)
133 if key.Parent() == nil {
134 idKey = rootIDsKey(key.Kind())
135 } else {
136 idKey = groupIDsKey(key)
137 }
138 id, err := incrementLocked(ents, idKey)
139 if err != nil {
140 return nil, nil, err
141 }
142 key = newKey(key.Namespace(), key.Kind(), "", id, key.Parent())
143 }
144
145 return ents, key, nil
146 }
147
148 func putPrelim(ns string, knr goon.KindNameResolver, src interface{}) (*datastor e.Key, *propertyList, error) {
149 key := newKeyObj(ns, knr, src)
150 if !KeyCouldBeValid(ns, key, UserKeyOnly) {
151 // TODO(riannucci): different error for Put-ing to reserved Keys ?
152 return nil, nil, datastore.ErrInvalidKey
153 }
154
155 data, err := toPL(src)
156 return key, data, err
157 }
158
159 func (d *dataStoreData) put(ns string, src interface{}) (*datastore.Key, error) {
160 key, plData, err := putPrelim(ns, d.KindNameResolver(), src)
161 if err != nil {
162 return nil, err
163 }
164 if key, err = d.putInner(key, plData); err != nil {
165 return nil, err
166 }
167 return key, goon_internal.SetStructKey(src, key, d.KindNameResolver())
168 }
169
170 func (d *dataStoreData) putInner(key *datastore.Key, data *propertyList) (*datas tore.Key, error) {
171 dataBytes, err := data.MarshalBinary()
172 if err != nil {
173 return nil, err
174 }
175
176 d.Lock()
177 defer d.Unlock()
178
179 ents, key, err := d.entsKeyLocked(key)
180 if err != nil {
181 return nil, err
182 }
183 if _, err = incrementLocked(ents, groupMetaKey(key)); err != nil {
184 return nil, err
185 }
186
187 ents.Set(keyBytes(noNS, key), dataBytes)
188
189 return key, nil
190 }
191
192 func getInner(ns string, knr goon.KindNameResolver, dst interface{}, getColl fun c(*datastore.Key) (*memCollection, error)) error {
193 key := newKeyObj(ns, knr, dst)
194 if !KeyValid(ns, key, AllowSpecialKeys) {
195 return datastore.ErrInvalidKey
196 }
197
198 ents, err := getColl(key)
199 if err != nil {
200 return err
201 }
202 if ents == nil {
203 return datastore.ErrNoSuchEntity
204 }
205 pdata := ents.Get(keyBytes(noNS, key))
206 if pdata == nil {
207 return datastore.ErrNoSuchEntity
208 }
209 pl := &propertyList{}
210 if err = pl.UnmarshalBinary(pdata); err != nil {
211 return err
212 }
213 return fromPL(pl, dst)
214 }
215
216 func (d *dataStoreData) get(ns string, dst interface{}) error {
217 return getInner(ns, d.KindNameResolver(), dst, func(*datastore.Key) (*me mCollection, error) {
218 d.rwlock.RLock()
219 s := d.store.Snapshot()
220 d.rwlock.RUnlock()
221
222 return s.GetCollection("ents:" + ns), nil
223 })
224 }
225
226 func (d *dataStoreData) del(ns string, key *datastore.Key) error {
227 if !KeyValid(ns, key, UserKeyOnly) {
228 return datastore.ErrInvalidKey
229 }
230
231 keyBuf := keyBytes(noNS, key)
232
233 d.Lock()
M-A Ruel 2015/05/29 16:16:18 I'd prefer to always use d.rwlock.Lock() for consi
iannucci 2015/05/29 16:33:14 done
234 defer d.Unlock()
235
236 ents := d.store.GetCollection("ents:" + ns)
237 if ents == nil {
238 return nil
239 }
240 if _, err := incrementLocked(ents, groupMetaKey(key)); err != nil {
241 return err
242 }
243
244 ents.Delete(keyBuf)
245 return nil
246 }
247
248 ///////////////////////// memContextObj(dataStoreData) /////////////////////////
249
250 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
251 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
252
253 txn := obj.(*txnDataStoreData)
254 for rk, muts := range txn.muts {
255 if len(muts) == 0 { // read-only
256 continue
257 }
258 k, err := keyFromByteString(withNS, rk)
259 if err != nil {
260 panic(err)
261 }
262 entKey := "ents:" + k.Namespace()
263 mkey := groupMetaKey(k)
264 entsHead := d.store.GetCollection(entKey)
265 entsSnap := txn.snap.GetCollection(entKey)
266 vHead, err := curVersion(entsHead, mkey)
267 if err != nil {
268 panic(err)
269 }
270 vSnap, err := curVersion(entsSnap, mkey)
271 if err != nil {
272 panic(err)
273 }
274 if vHead != vSnap {
275 return false
276 }
277 }
278 return true
279 }
280
281 func (d *dataStoreData) applyTxn(r *rand.Rand, obj memContextObj) {
282 txn := obj.(*txnDataStoreData)
283 for _, muts := range txn.muts {
284 if len(muts) == 0 { // read-only
285 continue
286 }
287 for _, m := range muts {
288 err := error(nil)
289 if m.data == nil {
290 err = d.del(m.key.Namespace(), m.key)
291 } else {
292 _, err = d.putInner(m.key, m.data)
293 }
294 if err != nil {
295 panic(err)
296 }
297 }
298 }
299 }
300
301 func (d *dataStoreData) mkTxn(o *datastore.TransactionOptions) (memContextObj, e rror) {
302 return &txnDataStoreData{
303 // alias to the main datastore's so that testing code can have p rimitive
304 // access to break features inside of transactions.
305 BrokenFeatures: &d.BrokenFeatures,
306 parent: d,
307 knrKeeper: knrKeeper{knrFunc: d.knrFunc},
308 isXG: o != nil && o.XG,
309 snap: d.store.Snapshot(),
310 muts: map[string][]txnMutation{},
311 }, nil
312 }
313
314 func (d *dataStoreData) endTxn() {}
315
316 /////////////////////////////// txnDataStoreData ///////////////////////////////
317 type txnMutation struct {
318 key *datastore.Key
319 data *propertyList
320 }
321
322 type txnDataStoreData struct {
323 *wrapper.BrokenFeatures
324 knrKeeper
325 sync.Mutex
326
327 parent *dataStoreData
328
329 // boolean 0 or 1, use atomic.*Int32 to access.
330 closed int32
331 isXG bool
332
333 snap *memStore
334
335 // string is the raw-bytes encoding of the entity root incl. namespace
336 muts map[string][]txnMutation
337 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing
338 // length of encoded keys + values.
339 }
340
341 const xgEGLimit = 25
342
343 /////////////////////// memContextObj(txnDataStoreData) ////////////////////////
344
345 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false }
346 func (td *txnDataStoreData) endTxn() {
347 if atomic.LoadInt32(&td.closed) == 1 {
348 panic("cannot end transaction twice")
349 }
350 atomic.StoreInt32(&td.closed, 1)
351 }
352 func (*txnDataStoreData) applyTxn(*rand.Rand, memContextObj) {
353 panic("txnDataStoreData cannot apply transactions")
354 }
355 func (*txnDataStoreData) mkTxn(*datastore.TransactionOptions) (memContextObj, er ror) {
356 return nil, errors.New("datastore: nested transactions are not supported ")
357 }
358
359 /////////////////// wrapper.BrokenFeatures(txnDataStoreData) ///////////////////
360
361 func (td *txnDataStoreData) IsBroken() error {
362 // Slightly different from the SDK... datastore and taskqueue each imple ment
363 // this here, where in the SDK only datastore.transaction.Call does.
364 if atomic.LoadInt32(&td.closed) == 1 {
365 return errors.New("datastore: transaction context has expired")
366 }
367 return td.BrokenFeatures.IsBroken()
368 }
369
370 // writeMutation ensures that this transaction can support the given key/value
371 // mutation.
372 //
373 // if getOnly is true, don't record the actual mutation data, just ensure that
374 // the key is in an included entity group (or add an empty entry for tha t
375 // group).
376 //
377 // if !getOnly && data == nil, this counts as a deletion instead of a Put.
378 //
379 // Returns an error if this key causes the transaction to cross too many entity
380 // groups.
381 func (td *txnDataStoreData) writeMutation(getOnly bool, key *datastore.Key, data *propertyList) error {
382 rk := string(keyBytes(withNS, rootKey(key)))
383
384 td.Lock()
385 defer td.Unlock()
386
387 if _, ok := td.muts[rk]; !ok {
388 limit := 1
389 if td.isXG {
390 limit = xgEGLimit
391 }
392 if len(td.muts)+1 > limit {
393 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)"
394 if td.isXG {
395 msg = "operating on too many entity groups in a single transaction"
396 }
397 return newDSError(pb.Error_BAD_REQUEST, msg)
398 }
399 td.muts[rk] = []txnMutation{}
400 }
401 if !getOnly {
402 td.muts[rk] = append(td.muts[rk], txnMutation{key, data})
403 }
404
405 return nil
406 }
407
408 func (td *txnDataStoreData) put(ns string, src interface{}) (*datastore.Key, err or) {
409 key, plData, err := putPrelim(ns, td.KindNameResolver(), src)
410 if err != nil {
411 return nil, err
412 }
413
414 func() {
415 td.parent.Lock()
416 defer td.parent.Unlock()
417 _, key, err = td.parent.entsKeyLocked(key)
418 }()
419 if err != nil {
420 return nil, err
421 }
422
423 if err = td.writeMutation(false, key, plData); err != nil {
424 return nil, err
425 }
426
427 return key, goon_internal.SetStructKey(src, key, td.KindNameResolver())
428 }
429
430 func (td *txnDataStoreData) get(ns string, dst interface{}) error {
431 return getInner(ns, td.KindNameResolver(), dst, func(key *datastore.Key) (*memCollection, error) {
432 if err := td.writeMutation(true, key, nil); err != nil {
433 return nil, err
434 }
435 return td.snap.GetCollection("ents:" + ns), nil
436 })
437 }
438
439 func (td *txnDataStoreData) del(ns string, key *datastore.Key) error {
440 if !KeyValid(ns, key, UserKeyOnly) {
441 return datastore.ErrInvalidKey
442 }
443 return td.writeMutation(false, key, nil)
444 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698