Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(69)

Side by Side Diff: memory/raw_datastore_data.go

Issue 1243323002: Refactor a bit. (Closed) Base URL: https://github.com/luci/gae.git@master
Patch Set: fix golint Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « memory/raw_datastore.go ('k') | memory/raw_datastore_query.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memory
6
7 import (
8 "bytes"
9 "errors"
10 "fmt"
11 "sync"
12 "sync/atomic"
13
14 "golang.org/x/net/context"
15
16 "github.com/luci/gae"
17 "github.com/luci/gae/helper"
18 )
19
20 //////////////////////////////// dataStoreData /////////////////////////////////
21
22 type dataStoreData struct {
23 rwlock sync.RWMutex
24 // See README.md for store schema.
25 store *memStore
26 snap *memStore
27 }
28
29 var (
30 _ = memContextObj((*dataStoreData)(nil))
31 _ = sync.Locker((*dataStoreData)(nil))
32 )
33
34 func newDataStoreData() *dataStoreData {
35 store := newMemStore()
36 return &dataStoreData{
37 store: store,
38 snap: store.Snapshot(), // empty but better than a nil pointer.
39 }
40 }
41
42 func (d *dataStoreData) Lock() {
43 d.rwlock.Lock()
44 }
45
46 func (d *dataStoreData) Unlock() {
47 d.rwlock.Unlock()
48 }
49
50 /////////////////////////// indicies(dataStoreData) ////////////////////////////
51
52 func groupMetaKey(key gae.DSKey) []byte {
53 return keyBytes(helper.WithoutContext,
54 helper.NewDSKey("", "", "__entity_group__", "", 1, helper.DSKeyR oot(key)))
55 }
56
57 func groupIDsKey(key gae.DSKey) []byte {
58 return keyBytes(helper.WithoutContext,
59 helper.NewDSKey("", "", "__entity_group_ids__", "", 1, helper.DS KeyRoot(key)))
60 }
61
62 func rootIDsKey(kind string) []byte {
63 return keyBytes(helper.WithoutContext,
64 helper.NewDSKey("", "", "__entity_root_ids__", kind, 0, nil))
65 }
66
67 func curVersion(ents *memCollection, key []byte) int64 {
68 if v := ents.Get(key); v != nil {
69 pm, err := rpm(v)
70 if err != nil {
71 panic(err) // memory corruption
72 }
73 pl, ok := pm["__version__"]
74 if ok && len(pl) > 0 && pl[0].Type() == gae.DSPTInt {
75 return pl[0].Value().(int64)
76 }
77 panic(fmt.Errorf("__version__ property missing or wrong: %v", pm ))
78 }
79 return 0
80 }
81
82 func incrementLocked(ents *memCollection, key []byte) int64 {
83 ret := curVersion(ents, key) + 1
84 buf := &bytes.Buffer{}
85 helper.WriteDSPropertyMap(
86 buf, gae.DSPropertyMap{"__version__": {gae.MkDSPropertyNI(ret)}} , helper.WithContext)
87 ents.Set(key, buf.Bytes())
88 return ret
89 }
90
91 func (d *dataStoreData) entsKeyLocked(key gae.DSKey) (*memCollection, gae.DSKey) {
92 coll := "ents:" + key.Namespace()
93 ents := d.store.GetCollection(coll)
94 if ents == nil {
95 ents = d.store.SetCollection(coll, nil)
96 }
97
98 if helper.DSKeyIncomplete(key) {
99 idKey := []byte(nil)
100 if key.Parent() == nil {
101 idKey = rootIDsKey(key.Kind())
102 } else {
103 idKey = groupIDsKey(key)
104 }
105 id := incrementLocked(ents, idKey)
106 key = helper.NewDSKey(key.AppID(), key.Namespace(), key.Kind(), "", id, key.Parent())
107 }
108
109 return ents, key
110 }
111
112 func (d *dataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoadSave r) (gae.DSKey, error) {
113 keys, errs := d.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSaver {pls})
114 if errs == nil {
115 return keys[0], nil
116 }
117 return nil, gae.SingleError(errs)
118 }
119
120 func (d *dataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSPrope rtyLoadSaver) ([]gae.DSKey, error) {
121 pmaps, err := putMultiPrelim(ns, keys, plss)
122 if err != nil {
123 return nil, err
124 }
125 return d.putMultiInner(keys, pmaps)
126 }
127
128 func putMultiPrelim(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver) ([]gae.DSPropertyMap, error) {
129 err := multiValid(keys, plss, ns, true, false)
130 if err != nil {
131 return nil, err
132 }
133 pmaps := make([]gae.DSPropertyMap, len(keys))
134 lme := gae.LazyMultiError{Size: len(keys)}
135 for i, pls := range plss {
136 pm, err := pls.Save(false)
137 lme.Assign(i, err)
138 pmaps[i] = pm
139 }
140 return pmaps, lme.Get()
141 }
142
143 func (d *dataStoreData) putMultiInner(keys []gae.DSKey, data []gae.DSPropertyMap ) ([]gae.DSKey, error) {
144 retKeys := make([]gae.DSKey, len(keys))
145 lme := gae.LazyMultiError{Size: len(keys)}
146 for i, k := range keys {
147 buf := &bytes.Buffer{}
148 helper.WriteDSPropertyMap(buf, data[i], helper.WithoutContext)
149 dataBytes := buf.Bytes()
150
151 rKey, err := func() (ret gae.DSKey, err error) {
152 d.rwlock.Lock()
153 defer d.rwlock.Unlock()
154
155 ents, ret := d.entsKeyLocked(k)
156 incrementLocked(ents, groupMetaKey(ret))
157
158 old := ents.Get(keyBytes(helper.WithoutContext, ret))
159 oldPM := gae.DSPropertyMap(nil)
160 if old != nil {
161 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil {
162 return
163 }
164 }
165 updateIndicies(d.store, ret, oldPM, data[i])
166 ents.Set(keyBytes(helper.WithoutContext, ret), dataBytes )
167 return
168 }()
169 lme.Assign(i, err)
170 retKeys[i] = rKey
171 }
172 return retKeys, lme.Get()
173 }
174
175 func getMultiInner(ns string, keys []gae.DSKey, plss []gae.DSPropertyLoadSaver, getColl func() (*memCollection, error)) error {
176 if err := multiValid(keys, plss, ns, false, true); err != nil {
177 return err
178 }
179
180 lme := gae.LazyMultiError{Size: len(keys)}
181
182 ents, err := getColl()
183 if err != nil {
184 return err
185 }
186 if ents == nil {
187 for i := range keys {
188 lme.Assign(i, gae.ErrDSNoSuchEntity)
189 }
190 return lme.Get()
191 }
192
193 for i, k := range keys {
194 pdata := ents.Get(keyBytes(helper.WithoutContext, k))
195 if pdata == nil {
196 lme.Assign(i, gae.ErrDSNoSuchEntity)
197 continue
198 }
199
200 got, err := rpmWoCtx(pdata, ns)
201 if err != nil {
202 lme.Assign(i, err)
203 continue
204 }
205
206 lme.Assign(i, plss[i].Load(got))
207 }
208 return lme.Get()
209 }
210
211 func (d *dataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoadSave r) error {
212 return gae.SingleError(d.getMulti(ns, []gae.DSKey{key}, []gae.DSProperty LoadSaver{pls}))
213 }
214
215 func (d *dataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSPrope rtyLoadSaver) error {
216 return getMultiInner(ns, keys, plss, func() (*memCollection, error) {
217 d.rwlock.RLock()
218 s := d.store.Snapshot()
219 d.rwlock.RUnlock()
220
221 return s.GetCollection("ents:" + ns), nil
222 })
223 }
224
225 func (d *dataStoreData) del(ns string, key gae.DSKey) (err error) {
226 return gae.SingleError(d.delMulti(ns, []gae.DSKey{key}))
227 }
228
229 func (d *dataStoreData) delMulti(ns string, keys []gae.DSKey) error {
230 lme := gae.LazyMultiError{Size: len(keys)}
231 toDel := make([][]byte, 0, len(keys))
232 for i, k := range keys {
233 if !helper.DSKeyValid(k, ns, false) {
234 lme.Assign(i, gae.ErrDSInvalidKey)
235 continue
236 }
237 toDel = append(toDel, keyBytes(helper.WithoutContext, k))
238 }
239 err := lme.Get()
240 if err != nil {
241 return err
242 }
243
244 d.rwlock.Lock()
245 defer d.rwlock.Unlock()
246
247 ents := d.store.GetCollection("ents:" + ns)
248 if ents == nil {
249 return nil
250 }
251
252 for i, k := range keys {
253 incrementLocked(ents, groupMetaKey(k))
254 kb := toDel[i]
255 old := ents.Get(kb)
256 oldPM := gae.DSPropertyMap(nil)
257 if old != nil {
258 if oldPM, err = rpmWoCtx(old, ns); err != nil {
259 lme.Assign(i, err)
260 continue
261 }
262 }
263 updateIndicies(d.store, k, oldPM, nil)
264 ents.Delete(kb)
265 }
266 return lme.Get()
267 }
268
269 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
270 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
271
272 txn := obj.(*txnDataStoreData)
273 for rk, muts := range txn.muts {
274 if len(muts) == 0 { // read-only
275 continue
276 }
277 k, err := helper.ReadDSKey(bytes.NewBufferString(rk), helper.Wit hContext, "", "")
278 if err != nil {
279 panic(err)
280 }
281
282 entKey := "ents:" + k.Namespace()
283 mkey := groupMetaKey(k)
284 entsHead := d.store.GetCollection(entKey)
285 entsSnap := txn.snap.GetCollection(entKey)
286 vHead := curVersion(entsHead, mkey)
287 vSnap := curVersion(entsSnap, mkey)
288 if vHead != vSnap {
289 return false
290 }
291 }
292 return true
293 }
294
295 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) {
296 txn := obj.(*txnDataStoreData)
297 for _, muts := range txn.muts {
298 if len(muts) == 0 { // read-only
299 continue
300 }
301 for _, m := range muts {
302 err := error(nil)
303 if m.data == nil {
304 err = d.del(m.key.Namespace(), m.key)
305 } else {
306 _, err = d.put(m.key.Namespace(), m.key, m.data)
307 }
308 if err != nil {
309 panic(err)
310 }
311 }
312 }
313 }
314
315 func (d *dataStoreData) mkTxn(o *gae.DSTransactionOptions) memContextObj {
316 return &txnDataStoreData{
317 // alias to the main datastore's so that testing code can have p rimitive
318 // access to break features inside of transactions.
319 parent: d,
320 isXG: o != nil && o.XG,
321 snap: d.store.Snapshot(),
322 muts: map[string][]txnMutation{},
323 }
324 }
325
326 func (d *dataStoreData) endTxn() {}
327
328 /////////////////////////////// txnDataStoreData ///////////////////////////////
329
330 type txnMutation struct {
331 key gae.DSKey
332 data gae.DSPropertyMap
333 }
334
335 type txnDataStoreData struct {
336 sync.Mutex
337
338 parent *dataStoreData
339
340 // boolean 0 or 1, use atomic.*Int32 to access.
341 closed int32
342 isXG bool
343
344 snap *memStore
345
346 // string is the raw-bytes encoding of the entity root incl. namespace
347 muts map[string][]txnMutation
348 // TODO(riannucci): account for 'transaction size' limit of 10MB by summ ing
349 // length of encoded keys + values.
350 }
351
352 var _ memContextObj = (*txnDataStoreData)(nil)
353
354 const xgEGLimit = 25
355
356 func (*txnDataStoreData) canApplyTxn(memContextObj) bool { return false }
357 func (td *txnDataStoreData) endTxn() {
358 if atomic.LoadInt32(&td.closed) == 1 {
359 panic("cannot end transaction twice")
360 }
361 atomic.StoreInt32(&td.closed, 1)
362 }
363 func (*txnDataStoreData) applyTxn(context.Context, memContextObj) {
364 panic("txnDataStoreData cannot apply transactions")
365 }
366 func (*txnDataStoreData) mkTxn(*gae.DSTransactionOptions) memContextObj {
367 panic("impossible")
368 }
369
370 func (td *txnDataStoreData) run(f func() error) error {
371 // Slightly different from the SDK... datastore and taskqueue each imple ment
372 // this here, where in the SDK only datastore.transaction.Call does.
373 if atomic.LoadInt32(&td.closed) == 1 {
374 return errors.New("datastore: transaction context has expired")
375 }
376 return f()
377 }
378
379 // writeMutation ensures that this transaction can support the given key/value
380 // mutation.
381 //
382 // if getOnly is true, don't record the actual mutation data, just ensure that
383 // the key is in an included entity group (or add an empty entry for tha t
384 // group).
385 //
386 // if !getOnly && data == nil, this counts as a deletion instead of a Put.
387 //
388 // Returns an error if this key causes the transaction to cross too many entity
389 // groups.
390 func (td *txnDataStoreData) writeMutation(getOnly bool, key gae.DSKey, data gae. DSPropertyMap) error {
391 rk := string(keyBytes(helper.WithContext, helper.DSKeyRoot(key)))
392
393 td.Lock()
394 defer td.Unlock()
395
396 if _, ok := td.muts[rk]; !ok {
397 limit := 1
398 if td.isXG {
399 limit = xgEGLimit
400 }
401 if len(td.muts)+1 > limit {
402 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)"
403 if td.isXG {
404 msg = "operating on too many entity groups in a single transaction"
405 }
406 return errors.New(msg)
407 }
408 td.muts[rk] = []txnMutation{}
409 }
410 if !getOnly {
411 td.muts[rk] = append(td.muts[rk], txnMutation{key, data})
412 }
413
414 return nil
415 }
416
417 func (td *txnDataStoreData) put(ns string, key gae.DSKey, pls gae.DSPropertyLoad Saver) (gae.DSKey, error) {
418 keys, errs := td.putMulti(ns, []gae.DSKey{key}, []gae.DSPropertyLoadSave r{pls})
419 if errs == nil {
420 return keys[0], nil
421 }
422 return nil, gae.SingleError(errs)
423 }
424
425 func (td *txnDataStoreData) putMulti(ns string, keys []gae.DSKey, plss []gae.DSP ropertyLoadSaver) ([]gae.DSKey, error) {
426 pmaps, err := putMultiPrelim(ns, keys, plss)
427 if err != nil {
428 return nil, err
429 }
430
431 retKeys := make([]gae.DSKey, len(keys))
432 lme := gae.LazyMultiError{Size: len(keys)}
433 for i, k := range keys {
434 func() {
435 td.parent.Lock()
436 defer td.parent.Unlock()
437 _, k = td.parent.entsKeyLocked(k)
438 }()
439 lme.Assign(i, td.writeMutation(false, k, pmaps[i]))
440 retKeys[i] = k
441 }
442
443 return retKeys, lme.Get()
444 }
445
446 func (td *txnDataStoreData) get(ns string, key gae.DSKey, pls gae.DSPropertyLoad Saver) error {
447 return gae.SingleError(td.getMulti(ns, []gae.DSKey{key}, []gae.DSPropert yLoadSaver{pls}))
448 }
449
450 func (td *txnDataStoreData) getMulti(ns string, keys []gae.DSKey, plss []gae.DSP ropertyLoadSaver) error {
451 return getMultiInner(ns, keys, plss, func() (*memCollection, error) {
452 lme := gae.LazyMultiError{Size: len(keys)}
453 for i, k := range keys {
454 lme.Assign(i, td.writeMutation(true, k, nil))
455 }
456 return td.snap.GetCollection("ents:" + ns), lme.Get()
457 })
458 }
459
460 func (td *txnDataStoreData) del(ns string, key gae.DSKey) error {
461 return gae.SingleError(td.delMulti(ns, []gae.DSKey{key}))
462 }
463
464 func (td *txnDataStoreData) delMulti(ns string, keys []gae.DSKey) error {
465 lme := gae.LazyMultiError{Size: len(keys)}
466 for i, k := range keys {
467 if !helper.DSKeyValid(k, ns, false) {
468 lme.Assign(i, gae.ErrDSInvalidKey)
469 } else {
470 lme.Assign(i, td.writeMutation(false, k, nil))
471 }
472 }
473 return lme.Get()
474 }
475
476 func keyBytes(ctx helper.DSKeyContext, key gae.DSKey) []byte {
477 buf := &bytes.Buffer{}
478 helper.WriteDSKey(buf, ctx, key)
479 return buf.Bytes()
480 }
481
482 func rpmWoCtx(data []byte, ns string) (gae.DSPropertyMap, error) {
483 return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithoutCon text, globalAppID, ns)
484 }
485
486 func rpm(data []byte) (gae.DSPropertyMap, error) {
487 return helper.ReadDSPropertyMap(bytes.NewBuffer(data), helper.WithContex t, "", "")
488 }
489
490 func multiValid(keys []gae.DSKey, plss []gae.DSPropertyLoadSaver, ns string, pot entialKey, allowSpecial bool) error {
491 vfn := func(k gae.DSKey) bool {
492 return !helper.DSKeyIncomplete(k) && helper.DSKeyValid(k, ns, al lowSpecial)
493 }
494 if potentialKey {
495 vfn = func(k gae.DSKey) bool {
496 // adds an id to k if it's incomplete.
497 if helper.DSKeyIncomplete(k) {
498 k = helper.NewDSKey(k.AppID(), k.Namespace(), k. Kind(), "", 1, k.Parent())
499 }
500 return helper.DSKeyValid(k, ns, allowSpecial)
501 }
502 }
503
504 if keys == nil || plss == nil {
505 return errors.New("gae: key or plss slices were nil")
506 }
507 if len(keys) != len(plss) {
508 return errors.New("gae: key and dst slices have different length ")
509 }
510 lme := gae.LazyMultiError{Size: len(keys)}
511 for i, k := range keys {
512 if !vfn(k) {
513 lme.Assign(i, gae.ErrDSInvalidKey)
514 }
515 }
516 return lme.Get()
517 }
OLDNEW
« no previous file with comments | « memory/raw_datastore.go ('k') | memory/raw_datastore_query.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698