Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(262)

Side by Side Diff: impl/memory/datastore_data.go

Issue 1369353003: Make impl/memory have an AppID. (Closed) Base URL: https://github.com/luci/gae.git@fix_consistent
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sync" 10 "sync"
11 "sync/atomic" 11 "sync/atomic"
12 12
13 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/gae/service/datastore/serialize" 14 "github.com/luci/gae/service/datastore/serialize"
15 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
16 "golang.org/x/net/context" 16 "golang.org/x/net/context"
17 ) 17 )
18 18
19 //////////////////////////////// dataStoreData ///////////////////////////////// 19 //////////////////////////////// dataStoreData /////////////////////////////////
20 20
21 type dataStoreData struct { 21 type dataStoreData struct {
22 rwlock sync.RWMutex 22 rwlock sync.RWMutex
23
24 // the 'appid' of this datastore
25 aid string
26
23 // See README.md for head schema. 27 // See README.md for head schema.
24 head *memStore 28 head *memStore
25 // if snap is nil, that means that this is always-consistent, and 29 // if snap is nil, that means that this is always-consistent, and
26 // getQuerySnaps will return (head, head) 30 // getQuerySnaps will return (head, head)
27 snap *memStore 31 snap *memStore
28 // For testing, see SetTransactionRetryCount. 32 // For testing, see SetTransactionRetryCount.
29 txnFakeRetry int 33 txnFakeRetry int
30 // true means that queries with insufficent indexes will pause to add th em 34 // true means that queries with insufficent indexes will pause to add th em
31 // and then continue instead of failing. 35 // and then continue instead of failing.
32 autoIndex bool 36 autoIndex bool
33 // true means that all of the __...__ keys which are normally automatica lly 37 // true means that all of the __...__ keys which are normally automatica lly
34 // maintained will be omitted. This also means that Put with an incomple te 38 // maintained will be omitted. This also means that Put with an incomple te
35 // key will become an error. 39 // key will become an error.
36 disableSpecialEntities bool 40 disableSpecialEntities bool
37 } 41 }
38 42
39 var ( 43 var (
40 _ = memContextObj((*dataStoreData)(nil)) 44 _ = memContextObj((*dataStoreData)(nil))
41 _ = sync.Locker((*dataStoreData)(nil)) 45 _ = sync.Locker((*dataStoreData)(nil))
42 ) 46 )
43 47
44 func newDataStoreData() *dataStoreData { 48 func newDataStoreData(aid string) *dataStoreData {
45 head := newMemStore() 49 head := newMemStore()
46 return &dataStoreData{ 50 return &dataStoreData{
51 aid: aid,
47 head: head, 52 head: head,
48 snap: head.Snapshot(), // empty but better than a nil pointer. 53 snap: head.Snapshot(), // empty but better than a nil pointer.
49 } 54 }
50 } 55 }
51 56
52 func (d *dataStoreData) Lock() { 57 func (d *dataStoreData) Lock() {
53 d.rwlock.Lock() 58 d.rwlock.Lock()
54 } 59 }
55 60
56 func (d *dataStoreData) Unlock() { 61 func (d *dataStoreData) Unlock() {
(...skipping 13 matching lines...) Expand all
70 if always { 75 if always {
71 d.snap = nil 76 d.snap = nil
72 } else { 77 } else {
73 d.snap = d.head.Snapshot() 78 d.snap = d.head.Snapshot()
74 } 79 }
75 } 80 }
76 81
77 func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) { 82 func (d *dataStoreData) addIndexes(ns string, idxs []*ds.IndexDefinition) {
78 d.Lock() 83 d.Lock()
79 defer d.Unlock() 84 defer d.Unlock()
80 » addIndexes(d.head, ns, idxs) 85 » addIndexes(d.head, d.aid, ns, idxs)
81 } 86 }
82 87
83 func (d *dataStoreData) setAutoIndex(enable bool) { 88 func (d *dataStoreData) setAutoIndex(enable bool) {
84 d.Lock() 89 d.Lock()
85 defer d.Unlock() 90 defer d.Unlock()
86 d.autoIndex = enable 91 d.autoIndex = enable
87 } 92 }
88 93
89 func (d *dataStoreData) maybeAutoIndex(err error) bool { 94 func (d *dataStoreData) maybeAutoIndex(err error) bool {
90 mi, ok := err.(*ErrMissingIndex) 95 mi, ok := err.(*ErrMissingIndex)
(...skipping 171 matching lines...) Expand 10 before | Expand all | Expand 10 after
262 if err != nil { 267 if err != nil {
263 return 268 return
264 } 269 }
265 if !d.disableSpecialEntities { 270 if !d.disableSpecialEntities {
266 incrementLocked(ents, groupMetaKey(ret), 1) 271 incrementLocked(ents, groupMetaKey(ret), 1)
267 } 272 }
268 273
269 old := ents.Get(keyBytes(ret)) 274 old := ents.Get(keyBytes(ret))
270 oldPM := ds.PropertyMap(nil) 275 oldPM := ds.PropertyMap(nil)
271 if old != nil { 276 if old != nil {
272 » » » » if oldPM, err = rpmWoCtx(old, ns); err != nil { 277 » » » » if oldPM, err = rpm(old); err != nil {
273 return 278 return
274 } 279 }
275 } 280 }
276 updateIndexes(d.head, ret, oldPM, pmap) 281 updateIndexes(d.head, ret, oldPM, pmap)
277 ents.Set(keyBytes(ret), dataBytes) 282 ents.Set(keyBytes(ret), dataBytes)
278 return 283 return
279 }() 284 }()
280 if cb != nil { 285 if cb != nil {
281 cb(k, err) 286 cb(k, err)
282 } 287 }
(...skipping 11 matching lines...) Expand all
294 } 299 }
295 return nil 300 return nil
296 } 301 }
297 302
298 for _, k := range keys { 303 for _, k := range keys {
299 pdata := ents.Get(keyBytes(k)) 304 pdata := ents.Get(keyBytes(k))
300 if pdata == nil { 305 if pdata == nil {
301 cb(nil, ds.ErrNoSuchEntity) 306 cb(nil, ds.ErrNoSuchEntity)
302 continue 307 continue
303 } 308 }
304 » » cb(rpmWoCtx(pdata, k.Namespace())) 309 » » cb(rpm(pdata))
305 } 310 }
306 return nil 311 return nil
307 } 312 }
308 313
309 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error { 314 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
310 return getMultiInner(keys, cb, func() (*memCollection, error) { 315 return getMultiInner(keys, cb, func() (*memCollection, error) {
311 s := d.takeSnapshot() 316 s := d.takeSnapshot()
312 317
313 return s.GetCollection("ents:" + keys[0].Namespace()), nil 318 return s.GetCollection("ents:" + keys[0].Namespace()), nil
314 }) 319 })
315 } 320 }
316 321
317 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) { 322 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) {
318 ns := keys[0].Namespace() 323 ns := keys[0].Namespace()
319 ents := d.mutableEnts(ns) 324 ents := d.mutableEnts(ns)
320 325
321 if ents != nil { 326 if ents != nil {
322 for _, k := range keys { 327 for _, k := range keys {
323 err := func() error { 328 err := func() error {
324 kb := keyBytes(k) 329 kb := keyBytes(k)
325 330
326 d.Lock() 331 d.Lock()
327 defer d.Unlock() 332 defer d.Unlock()
328 333
329 if !d.disableSpecialEntities { 334 if !d.disableSpecialEntities {
330 incrementLocked(ents, groupMetaKey(k), 1 ) 335 incrementLocked(ents, groupMetaKey(k), 1 )
331 } 336 }
332 if old := ents.Get(kb); old != nil { 337 if old := ents.Get(kb); old != nil {
333 » » » » » oldPM, err := rpmWoCtx(old, ns) 338 » » » » » oldPM, err := rpm(old)
334 if err != nil { 339 if err != nil {
335 return err 340 return err
336 } 341 }
337 updateIndexes(d.head, k, oldPM, nil) 342 updateIndexes(d.head, k, oldPM, nil)
338 ents.Delete(kb) 343 ents.Delete(kb)
339 } 344 }
340 return nil 345 return nil
341 }() 346 }()
342 if cb != nil { 347 if cb != nil {
343 cb(err) 348 cb(err)
(...skipping 196 matching lines...) Expand 10 before | Expand all | Expand 10 after
540 cb(err) 545 cb(err)
541 } 546 }
542 } 547 }
543 return nil 548 return nil
544 } 549 }
545 550
546 func keyBytes(key *ds.Key) []byte { 551 func keyBytes(key *ds.Key) []byte {
547 return serialize.ToBytes(ds.MkProperty(key)) 552 return serialize.ToBytes(ds.MkProperty(key))
548 } 553 }
549 554
550 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) {
551 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
552 serialize.WithoutContext, globalAppID, ns)
553 }
554
555 func rpm(data []byte) (ds.PropertyMap, error) { 555 func rpm(data []byte) (ds.PropertyMap, error) {
556 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 556 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
557 serialize.WithContext, "", "") 557 serialize.WithContext, "", "")
558 } 558 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698