Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(608)

Side by Side Diff: impl/memory/datastore_data.go

Issue 1366793002: Add Consistent(bool) to Testing interface (Closed) Base URL: https://github.com/luci/gae.git@move_serialization_helpers
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sync" 10 "sync"
11 "sync/atomic" 11 "sync/atomic"
12 12
13 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/gae/service/datastore/serialize" 14 "github.com/luci/gae/service/datastore/serialize"
15 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
16 "golang.org/x/net/context" 16 "golang.org/x/net/context"
17 ) 17 )
18 18
19 //////////////////////////////// dataStoreData ///////////////////////////////// 19 //////////////////////////////// dataStoreData /////////////////////////////////
20 20
21 type dataStoreData struct { 21 type dataStoreData struct {
22 rwlock sync.RWMutex 22 rwlock sync.RWMutex
23 // See README.md for head schema. 23 // See README.md for head schema.
24 head *memStore 24 head *memStore
25 snap *memStore 25 snap *memStore
26 // For testing, see SetTransactionRetryCount. 26 // For testing, see SetTransactionRetryCount.
27 txnFakeRetry int 27 txnFakeRetry int
28 // true means that head always == snap
29 consistent bool
28 } 30 }
29 31
30 var ( 32 var (
31 _ = memContextObj((*dataStoreData)(nil)) 33 _ = memContextObj((*dataStoreData)(nil))
32 _ = sync.Locker((*dataStoreData)(nil)) 34 _ = sync.Locker((*dataStoreData)(nil))
33 ) 35 )
34 36
35 func newDataStoreData() *dataStoreData { 37 func newDataStoreData() *dataStoreData {
36 head := newMemStore() 38 head := newMemStore()
37 return &dataStoreData{ 39 return &dataStoreData{
38 head: head, 40 head: head,
39 snap: head.Snapshot(), // empty but better than a nil pointer. 41 snap: head.Snapshot(), // empty but better than a nil pointer.
40 } 42 }
41 } 43 }
42 44
43 func (d *dataStoreData) Lock() { 45 func (d *dataStoreData) Lock() {
44 d.rwlock.Lock() 46 d.rwlock.Lock()
45 } 47 }
46 48
47 func (d *dataStoreData) Unlock() { 49 func (d *dataStoreData) Unlock() {
48 d.rwlock.Unlock() 50 d.rwlock.Unlock()
49 } 51 }
50 52
53 func (d *dataStoreData) setTxnRetry(count int) {
54 d.Lock()
Vadim Sh. 2015/09/24 18:40:11 oops.. didn't know it's under lock
iannucci 2015/09/24 18:59:30 is ok :)
55 defer d.Unlock()
56 d.txnFakeRetry = count
57 }
58
59 func (d *dataStoreData) setConsistent(always bool) {
60 d.Lock()
61 defer d.Unlock()
62
63 d.consistent = always
64 if d.consistent {
65 d.snap = d.head.Snapshot()
66 }
67 }
68
51 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) { 69 func (d *dataStoreData) getQuerySnaps(consistent bool) (idx, head *memStore) {
52 d.rwlock.RLock() 70 d.rwlock.RLock()
53 defer d.rwlock.RUnlock() 71 defer d.rwlock.RUnlock()
72 if d.consistent {
73 // snap is already a consistent snapshot of head
74 return d.snap, d.snap
75 }
76
54 head = d.head.Snapshot() 77 head = d.head.Snapshot()
55 if consistent { 78 if consistent {
56 idx = head 79 idx = head
57 } else { 80 } else {
58 idx = d.snap 81 idx = d.snap
59 } 82 }
60 return 83 return
61 } 84 }
62 85
63 func (d *dataStoreData) takeSnapshot() *memStore { 86 func (d *dataStoreData) takeSnapshot() *memStore {
64 d.rwlock.RLock() 87 d.rwlock.RLock()
65 defer d.rwlock.RUnlock() 88 defer d.rwlock.RUnlock()
89 if d.consistent {
90 return d.snap
91 }
66 return d.head.Snapshot() 92 return d.head.Snapshot()
67 } 93 }
68 94
69 func (d *dataStoreData) setSnapshot(snap *memStore) { 95 func (d *dataStoreData) setSnapshot(snap *memStore) {
70 d.rwlock.Lock() 96 d.rwlock.Lock()
71 defer d.rwlock.Unlock() 97 defer d.rwlock.Unlock()
98 if d.consistent {
99 return
100 }
72 d.snap = snap 101 d.snap = snap
73 } 102 }
74 103
75 func (d *dataStoreData) catchupIndexes() { 104 func (d *dataStoreData) catchupIndexes() {
76 d.rwlock.Lock() 105 d.rwlock.Lock()
77 defer d.rwlock.Unlock() 106 defer d.rwlock.Unlock()
107 if d.consistent {
108 return
109 }
78 d.snap = d.head.Snapshot() 110 d.snap = d.head.Snapshot()
79 } 111 }
80 112
81 /////////////////////////// indexes(dataStoreData) //////////////////////////// 113 /////////////////////////// indexes(dataStoreData) ////////////////////////////
82 114
83 func groupMetaKey(key *ds.Key) []byte { 115 func groupMetaKey(key *ds.Key) []byte {
84 return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) ) 116 return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) )
85 } 117 }
86 118
87 func groupIDsKey(key *ds.Key) []byte { 119 func groupIDsKey(key *ds.Key) []byte {
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
152 184
153 func (d *dataStoreData) fixKeyLocked(ents *memCollection, key *ds.Key) *ds.Key { 185 func (d *dataStoreData) fixKeyLocked(ents *memCollection, key *ds.Key) *ds.Key {
154 if key.Incomplete() { 186 if key.Incomplete() {
155 id := d.allocateIDsLocked(ents, key, 1) 187 id := d.allocateIDsLocked(ents, key, 1)
156 key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) 188 key = ds.NewKey(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent())
157 } 189 }
158 return key 190 return key
159 } 191 }
160 192
161 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu tMultiCB) { 193 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu tMultiCB) {
162 » ents := d.mutableEnts(keys[0].Namespace()) 194 » ns := keys[0].Namespace()
195 » ents := d.mutableEnts(ns)
163 196
164 for i, k := range keys { 197 for i, k := range keys {
165 pmap, _ := vals[i].Save(false) 198 pmap, _ := vals[i].Save(false)
166 dataBytes := serialize.ToBytes(pmap) 199 dataBytes := serialize.ToBytes(pmap)
167 200
168 k, err := func() (ret *ds.Key, err error) { 201 k, err := func() (ret *ds.Key, err error) {
169 d.Lock() 202 d.Lock()
170 defer d.Unlock() 203 defer d.Unlock()
171 204
172 ret = d.fixKeyLocked(ents, k) 205 ret = d.fixKeyLocked(ents, k)
173 incrementLocked(ents, groupMetaKey(ret), 1) 206 incrementLocked(ents, groupMetaKey(ret), 1)
174 207
175 old := ents.Get(keyBytes(ret)) 208 old := ents.Get(keyBytes(ret))
176 oldPM := ds.PropertyMap(nil) 209 oldPM := ds.PropertyMap(nil)
177 if old != nil { 210 if old != nil {
178 » » » » if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { 211 » » » » if oldPM, err = rpmWoCtx(old, ns); err != nil {
179 return 212 return
180 } 213 }
181 } 214 }
182 updateIndexes(d.head, ret, oldPM, pmap) 215 updateIndexes(d.head, ret, oldPM, pmap)
183 ents.Set(keyBytes(ret), dataBytes) 216 ents.Set(keyBytes(ret), dataBytes)
217 if d.consistent {
218 d.snap = d.head.Snapshot()
219 }
184 return 220 return
185 }() 221 }()
186 if cb != nil { 222 if cb != nil {
187 cb(k, err) 223 cb(k, err)
188 } 224 }
189 } 225 }
190 } 226 }
191 227
192 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error { 228 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error {
193 ents, err := getColl() 229 ents, err := getColl()
(...skipping 20 matching lines...) Expand all
214 250
215 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { 251 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
216 return getMultiInner(keys, cb, func() (*memCollection, error) { 252 return getMultiInner(keys, cb, func() (*memCollection, error) {
217 s := d.takeSnapshot() 253 s := d.takeSnapshot()
218 254
219 return s.GetCollection("ents:" + keys[0].Namespace()), nil 255 return s.GetCollection("ents:" + keys[0].Namespace()), nil
220 }) 256 })
221 } 257 }
222 258
223 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { 259 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) {
224 toDel := make([][]byte, 0, len(keys))
225 for _, k := range keys {
226 toDel = append(toDel, keyBytes(k))
227 }
228 ns := keys[0].Namespace() 260 ns := keys[0].Namespace()
261 ents := d.mutableEnts(ns)
229 262
230 » d.rwlock.Lock() 263 » if ents != nil {
231 » defer d.rwlock.Unlock() 264 » » for _, k := range keys {
265 » » » err := func() error {
266 » » » » kb := keyBytes(k)
232 267
233 » ents := d.head.GetCollection("ents:" + ns) 268 » » » » d.Lock()
269 » » » » defer d.Unlock()
234 270
235 » for i, k := range keys { 271 » » » » incrementLocked(ents, groupMetaKey(k), 1)
236 » » if ents != nil { 272 » » » » if old := ents.Get(kb); old != nil {
237 » » » incrementLocked(ents, groupMetaKey(k), 1) 273 » » » » » oldPM, err := rpmWoCtx(old, ns)
238 » » » kb := toDel[i] 274 » » » » » if err != nil {
239 » » » if old := ents.Get(kb); old != nil { 275 » » » » » » return err
240 » » » » oldPM, err := rpmWoCtx(old, ns)
241 » » » » if err != nil {
242 » » » » » if cb != nil {
243 » » » » » » cb(err)
244 } 276 }
245 » » » » » continue 277 » » » » » updateIndexes(d.head, k, oldPM, nil)
278 » » » » » ents.Delete(kb)
279 » » » » » if d.consistent {
280 » » » » » » d.snap = d.head.Snapshot()
281 » » » » » }
246 } 282 }
247 » » » » updateIndexes(d.head, k, oldPM, nil) 283 » » » » return nil
248 » » » » ents.Delete(kb) 284 » » » }()
285 » » » if cb != nil {
Vadim Sh. 2015/09/24 18:40:11 I think you need to call cb() len(keys) number of
iannucci 2015/09/24 18:59:30 oh, good catch.
286 » » » » cb(err)
249 } 287 }
250 } 288 }
251 if cb != nil {
252 cb(nil)
253 }
254 } 289 }
255 } 290 }
256 291
257 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool { 292 func (d *dataStoreData) canApplyTxn(obj memContextObj) bool {
258 // TODO(riannucci): implement with Flush/FlushRevert for persistance. 293 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
259 294
260 txn := obj.(*txnDataStoreData) 295 txn := obj.(*txnDataStoreData)
261 for rk, muts := range txn.muts { 296 for rk, muts := range txn.muts {
262 if len(muts) == 0 { // read-only 297 if len(muts) == 0 { // read-only
263 continue 298 continue
(...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after
450 485
451 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { 486 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) {
452 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 487 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
453 serialize.WithoutContext, globalAppID, ns) 488 serialize.WithoutContext, globalAppID, ns)
454 } 489 }
455 490
456 func rpm(data []byte) (ds.PropertyMap, error) { 491 func rpm(data []byte) (ds.PropertyMap, error) {
457 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 492 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
458 serialize.WithContext, "", "") 493 serialize.WithContext, "", "")
459 } 494 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698