Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(21)

Side by Side Diff: impl/memory/datastore_data.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: one more test Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sync" 10 "sync"
11 "sync/atomic" 11 "sync/atomic"
12 12
13 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/gae/service/datastore/serialize" 14 "github.com/luci/gae/service/datastore/serialize"
15 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
16 "golang.org/x/net/context" 16 "golang.org/x/net/context"
17 ) 17 )
18 18
19 //////////////////////////////// dataStoreData ///////////////////////////////// 19 //////////////////////////////// dataStoreData /////////////////////////////////
20 20
21 type dataStoreData struct { 21 type dataStoreData struct {
22 rwlock sync.RWMutex 22 rwlock sync.RWMutex
23 // See README.md for head schema. 23 // See README.md for head schema.
24 head *memStore 24 head *memStore
25 // if snap is nil, that means that this is always-consistent, and
26 // getQuerySnaps will return (head, head)
25 snap *memStore 27 snap *memStore
26 // For testing, see SetTransactionRetryCount. 28 // For testing, see SetTransactionRetryCount.
27 txnFakeRetry int 29 txnFakeRetry int
28 // true means that head always == snap
29 consistent bool
iannucci 2015/09/29 03:21:38 This change means that we avoid taking a snapshot
30 // true means that queries with insufficent indexes will pause to add th em 30 // true means that queries with insufficent indexes will pause to add th em
31 // and then continue instead of failing. 31 // and then continue instead of failing.
32 autoIndex bool 32 autoIndex bool
33 // true means that all of the __...__ keys which are normally automatica lly 33 // true means that all of the __...__ keys which are normally automatica lly
34 // maintained will be omitted. This also means that Put with an incomple te 34 // maintained will be omitted. This also means that Put with an incomple te
35 // key will become an error. 35 // key will become an error.
36 disableSpecialEntities bool 36 disableSpecialEntities bool
37 } 37 }
38 38
39 var ( 39 var (
(...skipping 20 matching lines...) Expand all
60 func (d *dataStoreData) setTxnRetry(count int) { 60 func (d *dataStoreData) setTxnRetry(count int) {
61 d.Lock() 61 d.Lock()
62 defer d.Unlock() 62 defer d.Unlock()
63 d.txnFakeRetry = count 63 d.txnFakeRetry = count
64 } 64 }
65 65
66 func (d *dataStoreData) setConsistent(always bool) { 66 func (d *dataStoreData) setConsistent(always bool) {
67 d.Lock() 67 d.Lock()
68 defer d.Unlock() 68 defer d.Unlock()
69 69
70 » d.consistent = always 70 » if always {
71 » if d.consistent { 71 » » d.snap = nil
72 » } else {
72 d.snap = d.head.Snapshot() 73 d.snap = d.head.Snapshot()
73 } 74 }
74 } 75 }
75 76
76 func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) { 77 func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) {
77 d.Lock() 78 d.Lock()
78 defer d.Unlock() 79 defer d.Unlock()
79 addIndexes(d.head, ns, idxs) 80 addIndexes(d.head, ns, idxs)
80 if d.consistent {
81 d.snap = d.head.Snapshot()
82 }
83 } 81 }
84 82
85 func (d *dataStoreData) setAutoIndex(enable bool) { 83 func (d *dataStoreData) setAutoIndex(enable bool) {
86 d.Lock() 84 d.Lock()
87 defer d.Unlock() 85 defer d.Unlock()
88 d.autoIndex = enable 86 d.autoIndex = enable
89 } 87 }
90 88
91 func (d *dataStoreData) maybeAutoIndex(err error) bool { 89 func (d *dataStoreData) maybeAutoIndex(err error) bool {
92 mi, ok := err.(*ErrMissingIndex) 90 mi, ok := err.(*ErrMissingIndex)
(...skipping 21 matching lines...) Expand all
114 112
115 func (d *dataStoreData) getDisableSpecialEntities() bool { 113 func (d *dataStoreData) getDisableSpecialEntities() bool {
116 d.rwlock.RLock() 114 d.rwlock.RLock()
117 defer d.rwlock.RUnlock() 115 defer d.rwlock.RUnlock()
118 return d.disableSpecialEntities 116 return d.disableSpecialEntities
119 } 117 }
120 118
121 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { 119 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
122 d.rwlock.RLock() 120 d.rwlock.RLock()
123 defer d.rwlock.RUnlock() 121 defer d.rwlock.RUnlock()
124 » if d.consistent { 122 » if d.snap == nil {
125 » » // snap is already a consistent snapshot of head 123 » » // we're 'always consistent'
126 » » return d.snap, d.snap 124 » » return d.head, d.head
127 } 125 }
128 126
129 head = d.head.Snapshot() 127 head = d.head.Snapshot()
130 if consistent { 128 if consistent {
131 idx = head 129 idx = head
132 } else { 130 } else {
133 idx = d.snap 131 idx = d.snap
134 } 132 }
135 return 133 return
136 } 134 }
137 135
138 func (d *dataStoreData) takeSnapshot() *memStore { 136 func (d *dataStoreData) takeSnapshot() *memStore {
139 d.rwlock.RLock() 137 d.rwlock.RLock()
140 defer d.rwlock.RUnlock() 138 defer d.rwlock.RUnlock()
141 if d.consistent {
142 return d.snap
143 }
144 return d.head.Snapshot() 139 return d.head.Snapshot()
145 } 140 }
146 141
147 func (d *dataStoreData) setSnapshot(snap *memStore) { 142 func (d *dataStoreData) setSnapshot(snap *memStore) {
148 d.rwlock.Lock() 143 d.rwlock.Lock()
149 defer d.rwlock.Unlock() 144 defer d.rwlock.Unlock()
150 » if d.consistent { 145 » if d.snap == nil {
146 » » // we're 'always consistent'
151 return 147 return
152 } 148 }
153 d.snap = snap 149 d.snap = snap
154 } 150 }
155 151
156 func (d *dataStoreData) catchupIndexes() { 152 func (d *dataStoreData) catchupIndexes() {
157 d.rwlock.Lock() 153 d.rwlock.Lock()
158 defer d.rwlock.Unlock() 154 defer d.rwlock.Unlock()
159 » if d.consistent { 155 » if d.snap == nil {
156 » » // we're 'always consistent'
160 return 157 return
161 } 158 }
162 d.snap = d.head.Snapshot() 159 d.snap = d.head.Snapshot()
163 } 160 }
164 161
165 /////////////////////////// indexes(dataStoreData) //////////////////////////// 162 /////////////////////////// indexes(dataStoreData) ////////////////////////////
166 163
167 func groupMetaKey(key *ds.Key) []byte { 164 func groupMetaKey(key *ds.Key) []byte {
168 return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) ) 165 return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) )
169 } 166 }
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
271 268
272 old := ents.Get(keyBytes(ret)) 269 old := ents.Get(keyBytes(ret))
273 oldPM := ds.PropertyMap(nil) 270 oldPM := ds.PropertyMap(nil)
274 if old != nil { 271 if old != nil {
275 if oldPM, err = rpmWoCtx(old, ns); err != nil { 272 if oldPM, err = rpmWoCtx(old, ns); err != nil {
276 return 273 return
277 } 274 }
278 } 275 }
279 updateIndexes(d.head, ret, oldPM, pmap) 276 updateIndexes(d.head, ret, oldPM, pmap)
280 ents.Set(keyBytes(ret), dataBytes) 277 ents.Set(keyBytes(ret), dataBytes)
281 if d.consistent {
282 d.snap = d.head.Snapshot()
283 }
284 return 278 return
285 }() 279 }()
286 if cb != nil { 280 if cb != nil {
287 cb(k, err) 281 cb(k, err)
288 } 282 }
289 } 283 }
290 } 284 }
291 285
292 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error { 286 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error {
293 ents, err := getColl() 287 ents, err := getColl()
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
335 if !d.disableSpecialEntities { 329 if !d.disableSpecialEntities {
336 incrementLocked(ents, groupMetaKey(k), 1 ) 330 incrementLocked(ents, groupMetaKey(k), 1 )
337 } 331 }
338 if old := ents.Get(kb); old != nil { 332 if old := ents.Get(kb); old != nil {
339 oldPM, err := rpmWoCtx(old, ns) 333 oldPM, err := rpmWoCtx(old, ns)
340 if err != nil { 334 if err != nil {
341 return err 335 return err
342 } 336 }
343 updateIndexes(d.head, k, oldPM, nil) 337 updateIndexes(d.head, k, oldPM, nil)
344 ents.Delete(kb) 338 ents.Delete(kb)
345 if d.consistent {
346 d.snap = d.head.Snapshot()
347 }
348 } 339 }
349 return nil 340 return nil
350 }() 341 }()
351 if cb != nil { 342 if cb != nil {
352 cb(err) 343 cb(err)
353 } 344 }
354 } 345 }
355 } else if cb != nil { 346 } else if cb != nil {
356 for range keys { 347 for range keys {
357 cb(nil) 348 cb(nil)
(...skipping 200 matching lines...) Expand 10 before | Expand all | Expand 10 after
558 549
559 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { 550 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) {
560 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 551 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
561 serialize.WithoutContext, globalAppID, ns) 552 serialize.WithoutContext, globalAppID, ns)
562 } 553 }
563 554
564 func rpm(data []byte) (ds.PropertyMap, error) { 555 func rpm(data []byte) (ds.PropertyMap, error) {
565 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 556 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
566 serialize.WithContext, "", "") 557 serialize.WithContext, "", "")
567 } 558 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698