Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Side by Side Diff: impl/memory/datastore_data.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: make data flow clearer, implement Count Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sync" 10 "sync"
11 "sync/atomic" 11 "sync/atomic"
12 12
13 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/gae/service/datastore/serialize" 14 "github.com/luci/gae/service/datastore/serialize"
15 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
16 "golang.org/x/net/context" 16 "golang.org/x/net/context"
17 ) 17 )
18 18
19 //////////////////////////////// dataStoreData ///////////////////////////////// 19 //////////////////////////////// dataStoreData /////////////////////////////////
20 20
21 type dataStoreData struct { 21 type dataStoreData struct {
22 rwlock sync.RWMutex 22 rwlock sync.RWMutex
23
24 // the 'appid' of this datastore
25 aid string
26
23 // See README.md for head schema. 27 // See README.md for head schema.
24 head *memStore 28 head *memStore
29 // if snap is nil, that means that this is always-consistent, and
30 // getQuerySnaps will return (head, head)
25 snap *memStore 31 snap *memStore
26 // For testing, see SetTransactionRetryCount. 32 // For testing, see SetTransactionRetryCount.
27 txnFakeRetry int 33 txnFakeRetry int
28 // true means that head always == snap
29 consistent bool
30 // true means that queries with insufficent indexes will pause to add th em 34 // true means that queries with insufficent indexes will pause to add th em
31 // and then continue instead of failing. 35 // and then continue instead of failing.
32 autoIndex bool 36 autoIndex bool
33 // true means that all of the __...__ keys which are normally automatica lly 37 // true means that all of the __...__ keys which are normally automatica lly
34 // maintained will be omitted. This also means that Put with an incomple te 38 // maintained will be omitted. This also means that Put with an incomple te
35 // key will become an error. 39 // key will become an error.
36 disableSpecialEntities bool 40 disableSpecialEntities bool
37 } 41 }
38 42
39 var ( 43 var (
40 _ = memContextObj((*dataStoreData)(nil)) 44 _ = memContextObj((*dataStoreData)(nil))
41 _ = sync.Locker((*dataStoreData)(nil)) 45 _ = sync.Locker((*dataStoreData)(nil))
42 ) 46 )
43 47
44 func newDataStoreData() *dataStoreData { 48 func newDataStoreData(aid string) *dataStoreData {
45 head := newMemStore() 49 head := newMemStore()
46 return &dataStoreData{ 50 return &dataStoreData{
51 aid: aid,
47 head: head, 52 head: head,
48 snap: head.Snapshot(), // empty but better than a nil pointer. 53 snap: head.Snapshot(), // empty but better than a nil pointer.
49 } 54 }
50 } 55 }
51 56
52 func (d *dataStoreData) Lock() { 57 func (d *dataStoreData) Lock() {
53 d.rwlock.Lock() 58 d.rwlock.Lock()
54 } 59 }
55 60
56 func (d *dataStoreData) Unlock() { 61 func (d *dataStoreData) Unlock() {
57 d.rwlock.Unlock() 62 d.rwlock.Unlock()
58 } 63 }
59 64
60 func (d *dataStoreData) setTxnRetry(count int) { 65 func (d *dataStoreData) setTxnRetry(count int) {
61 d.Lock() 66 d.Lock()
62 defer d.Unlock() 67 defer d.Unlock()
63 d.txnFakeRetry = count 68 d.txnFakeRetry = count
64 } 69 }
65 70
66 func (d *dataStoreData) setConsistent(always bool) { 71 func (d *dataStoreData) setConsistent(always bool) {
67 d.Lock() 72 d.Lock()
68 defer d.Unlock() 73 defer d.Unlock()
69 74
70 » d.consistent = always 75 » if always {
71 » if d.consistent { 76 » » d.snap = nil
77 » } else {
72 d.snap = d.head.Snapshot() 78 d.snap = d.head.Snapshot()
73 } 79 }
74 } 80 }
75 81
76 func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) { 82 func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) {
77 d.Lock() 83 d.Lock()
78 defer d.Unlock() 84 defer d.Unlock()
79 » addIndexes(d.head, ns, idxs) 85 » addIndexes(d.head, d.aid, ns, idxs)
80 » if d.consistent {
81 » » d.snap = d.head.Snapshot()
82 » }
83 } 86 }
84 87
85 func (d *dataStoreData) setAutoIndex(enable bool) { 88 func (d *dataStoreData) setAutoIndex(enable bool) {
86 d.Lock() 89 d.Lock()
87 defer d.Unlock() 90 defer d.Unlock()
88 d.autoIndex = enable 91 d.autoIndex = enable
89 } 92 }
90 93
91 func (d *dataStoreData) maybeAutoIndex(err error) bool { 94 func (d *dataStoreData) maybeAutoIndex(err error) bool {
92 mi, ok := err.(*ErrMissingIndex) 95 mi, ok := err.(*ErrMissingIndex)
(...skipping 21 matching lines...) Expand all
114 117
115 func (d *dataStoreData) getDisableSpecialEntities() bool { 118 func (d *dataStoreData) getDisableSpecialEntities() bool {
116 d.rwlock.RLock() 119 d.rwlock.RLock()
117 defer d.rwlock.RUnlock() 120 defer d.rwlock.RUnlock()
118 return d.disableSpecialEntities 121 return d.disableSpecialEntities
119 } 122 }
120 123
121 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { 124 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
122 d.rwlock.RLock() 125 d.rwlock.RLock()
123 defer d.rwlock.RUnlock() 126 defer d.rwlock.RUnlock()
124 » if d.consistent { 127 » if d.snap == nil {
125 » » // snap is already a consistent snapshot of head 128 » » // we're 'always consistent'
126 » » return d.snap, d.snap 129 » » return d.head, d.head
127 } 130 }
128 131
129 head = d.head.Snapshot() 132 head = d.head.Snapshot()
130 if consistent { 133 if consistent {
131 idx = head 134 idx = head
132 } else { 135 } else {
133 idx = d.snap 136 idx = d.snap
134 } 137 }
135 return 138 return
136 } 139 }
137 140
138 func (d *dataStoreData) takeSnapshot() *memStore { 141 func (d *dataStoreData) takeSnapshot() *memStore {
139 d.rwlock.RLock() 142 d.rwlock.RLock()
140 defer d.rwlock.RUnlock() 143 defer d.rwlock.RUnlock()
141 if d.consistent {
142 return d.snap
143 }
144 return d.head.Snapshot() 144 return d.head.Snapshot()
145 } 145 }
146 146
147 func (d *dataStoreData) setSnapshot(snap *memStore) { 147 func (d *dataStoreData) setSnapshot(snap *memStore) {
148 d.rwlock.Lock() 148 d.rwlock.Lock()
149 defer d.rwlock.Unlock() 149 defer d.rwlock.Unlock()
150 » if d.consistent { 150 » if d.snap == nil {
151 » » // we're 'always consistent'
151 return 152 return
152 } 153 }
153 d.snap = snap 154 d.snap = snap
154 } 155 }
155 156
156 func (d *dataStoreData) catchupIndexes() { 157 func (d *dataStoreData) catchupIndexes() {
157 d.rwlock.Lock() 158 d.rwlock.Lock()
158 defer d.rwlock.Unlock() 159 defer d.rwlock.Unlock()
159 » if d.consistent { 160 » if d.snap == nil {
161 » » // we're 'always consistent'
160 return 162 return
161 } 163 }
162 d.snap = d.head.Snapshot() 164 d.snap = d.head.Snapshot()
163 } 165 }
164 166
165 /////////////////////////// indexes(dataStoreData) //////////////////////////// 167 /////////////////////////// indexes(dataStoreData) ////////////////////////////
166 168
167 func groupMetaKey(key *ds.Key) []byte { 169 func groupMetaKey(key *ds.Key) []byte {
168 return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) ) 170 return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) )
169 } 171 }
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
265 if err != nil { 267 if err != nil {
266 return 268 return
267 } 269 }
268 if !d.disableSpecialEntities { 270 if !d.disableSpecialEntities {
269 incrementLocked(ents, groupMetaKey(ret), 1) 271 incrementLocked(ents, groupMetaKey(ret), 1)
270 } 272 }
271 273
272 old := ents.Get(keyBytes(ret)) 274 old := ents.Get(keyBytes(ret))
273 oldPM := ds.PropertyMap(nil) 275 oldPM := ds.PropertyMap(nil)
274 if old != nil { 276 if old != nil {
275 » » » » if oldPM, err = rpmWoCtx(old, ns); err != nil { 277 » » » » if oldPM, err = rpm(old); err != nil {
276 return 278 return
277 } 279 }
278 } 280 }
279 updateIndexes(d.head, ret, oldPM, pmap) 281 updateIndexes(d.head, ret, oldPM, pmap)
280 ents.Set(keyBytes(ret), dataBytes) 282 ents.Set(keyBytes(ret), dataBytes)
281 if d.consistent {
282 d.snap = d.head.Snapshot()
283 }
284 return 283 return
285 }() 284 }()
286 if cb != nil { 285 if cb != nil {
287 cb(k, err) 286 cb(k, err)
288 } 287 }
289 } 288 }
290 } 289 }
291 290
292 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error { 291 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error {
293 ents, err := getColl() 292 ents, err := getColl()
294 if err != nil { 293 if err != nil {
295 return err 294 return err
296 } 295 }
297 if ents == nil { 296 if ents == nil {
298 for range keys { 297 for range keys {
299 cb(nil, ds.ErrNoSuchEntity) 298 cb(nil, ds.ErrNoSuchEntity)
300 } 299 }
301 return nil 300 return nil
302 } 301 }
303 302
304 for _, k := range keys { 303 for _, k := range keys {
305 pdata := ents.Get(keyBytes(k)) 304 pdata := ents.Get(keyBytes(k))
306 if pdata == nil { 305 if pdata == nil {
307 cb(nil, ds.ErrNoSuchEntity) 306 cb(nil, ds.ErrNoSuchEntity)
308 continue 307 continue
309 } 308 }
310 » » cb(rpmWoCtx(pdata, k.Namespace())) 309 » » cb(rpm(pdata))
311 } 310 }
312 return nil 311 return nil
313 } 312 }
314 313
315 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { 314 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
316 return getMultiInner(keys, cb, func() (*memCollection, error) { 315 return getMultiInner(keys, cb, func() (*memCollection, error) {
317 s := d.takeSnapshot() 316 s := d.takeSnapshot()
318 317
319 return s.GetCollection("ents:" + keys[0].Namespace()), nil 318 return s.GetCollection("ents:" + keys[0].Namespace()), nil
320 }) 319 })
321 } 320 }
322 321
323 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { 322 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) {
324 ns := keys[0].Namespace() 323 ns := keys[0].Namespace()
325 ents := d.mutableEnts(ns) 324 ents := d.mutableEnts(ns)
326 325
327 if ents != nil { 326 if ents != nil {
328 for _, k := range keys { 327 for _, k := range keys {
329 err := func() error { 328 err := func() error {
330 kb := keyBytes(k) 329 kb := keyBytes(k)
331 330
332 d.Lock() 331 d.Lock()
333 defer d.Unlock() 332 defer d.Unlock()
334 333
335 if !d.disableSpecialEntities { 334 if !d.disableSpecialEntities {
336 incrementLocked(ents, groupMetaKey(k), 1 ) 335 incrementLocked(ents, groupMetaKey(k), 1 )
337 } 336 }
338 if old := ents.Get(kb); old != nil { 337 if old := ents.Get(kb); old != nil {
339 » » » » » oldPM, err := rpmWoCtx(old, ns) 338 » » » » » oldPM, err := rpm(old)
340 if err != nil { 339 if err != nil {
341 return err 340 return err
342 } 341 }
343 updateIndexes(d.head, k, oldPM, nil) 342 updateIndexes(d.head, k, oldPM, nil)
344 ents.Delete(kb) 343 ents.Delete(kb)
345 if d.consistent {
346 d.snap = d.head.Snapshot()
347 }
348 } 344 }
349 return nil 345 return nil
350 }() 346 }()
351 if cb != nil { 347 if cb != nil {
352 cb(err) 348 cb(err)
353 } 349 }
354 } 350 }
355 } else if cb != nil { 351 } else if cb != nil {
356 for range keys { 352 for range keys {
357 cb(nil) 353 cb(nil)
(...skipping 191 matching lines...) Expand 10 before | Expand all | Expand 10 after
549 cb(err) 545 cb(err)
550 } 546 }
551 } 547 }
552 return nil 548 return nil
553 } 549 }
554 550
555 func keyBytes(key *ds.Key) []byte { 551 func keyBytes(key *ds.Key) []byte {
556 return serialize.ToBytes(ds.MkProperty(key)) 552 return serialize.ToBytes(ds.MkProperty(key))
557 } 553 }
558 554
559 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) {
560 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
561 serialize.WithoutContext, globalAppID, ns)
562 }
563
564 func rpm(data []byte) (ds.PropertyMap, error) { 555 func rpm(data []byte) (ds.PropertyMap, error) {
565 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 556 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
566 serialize.WithContext, "", "") 557 serialize.WithContext, "", "")
567 } 558 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698