Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(367)

Side by Side Diff: impl/memory/datastore_data.go

Issue 1355783002: Refactor keys and queries in datastore service and implementation. (Closed) Base URL: https://github.com/luci/gae.git@master
Patch Set: appease errcheck Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/datastore.go ('k') | impl/memory/datastore_index.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sync" 10 "sync"
11 "sync/atomic" 11 "sync/atomic"
12 12
13 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/gae/service/datastore/dskey"
15 "github.com/luci/gae/service/datastore/serialize" 14 "github.com/luci/gae/service/datastore/serialize"
16 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
17 "golang.org/x/net/context" 16 "golang.org/x/net/context"
18 ) 17 )
19 18
20 //////////////////////////////// dataStoreData ///////////////////////////////// 19 //////////////////////////////// dataStoreData /////////////////////////////////
21 20
22 type dataStoreData struct { 21 type dataStoreData struct {
23 rwlock sync.RWMutex 22 rwlock sync.RWMutex
24 // See README.md for head schema. 23 // See README.md for head schema.
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
74 } 73 }
75 74
76 func (d *dataStoreData) catchupIndexes() { 75 func (d *dataStoreData) catchupIndexes() {
77 d.rwlock.Lock() 76 d.rwlock.Lock()
78 defer d.rwlock.Unlock() 77 defer d.rwlock.Unlock()
79 d.snap = d.head.Snapshot() 78 d.snap = d.head.Snapshot()
80 } 79 }
81 80
82 /////////////////////////// indexes(dataStoreData) //////////////////////////// 81 /////////////////////////// indexes(dataStoreData) ////////////////////////////
83 82
84 func groupMetaKey(key ds.Key) []byte { 83 func groupMetaKey(key *ds.Key) []byte {
85 » return keyBytes(dskey.New("", "", "__entity_group__", "", 1, dskey.Root( key))) 84 » return keyBytes(ds.NewKey("", "", "__entity_group__", "", 1, key.Root()) )
86 } 85 }
87 86
88 func groupIDsKey(key ds.Key) []byte { 87 func groupIDsKey(key *ds.Key) []byte {
89 » return keyBytes(dskey.New("", "", "__entity_group_ids__", "", 1, dskey.R oot(key))) 88 » return keyBytes(ds.NewKey("", "", "__entity_group_ids__", "", 1, key.Roo t()))
90 } 89 }
91 90
92 func rootIDsKey(kind string) []byte { 91 func rootIDsKey(kind string) []byte {
93 » return keyBytes(dskey.New("", "", "__entity_root_ids__", kind, 0, nil)) 92 » return keyBytes(ds.NewKey("", "", "__entity_root_ids__", kind, 0, nil))
94 } 93 }
95 94
96 func curVersion(ents *memCollection, key []byte) int64 { 95 func curVersion(ents *memCollection, key []byte) int64 {
97 if ents != nil { 96 if ents != nil {
98 if v := ents.Get(key); v != nil { 97 if v := ents.Get(key); v != nil {
99 pm, err := rpm(v) 98 pm, err := rpm(v)
100 memoryCorruption(err) 99 memoryCorruption(err)
101 100
102 pl, ok := pm["__version__"] 101 pl, ok := pm["__version__"]
103 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt { 102 if ok && len(pl) > 0 && pl[0].Type() == ds.PTInt {
104 return pl[0].Value().(int64) 103 return pl[0].Value().(int64)
105 } 104 }
106 105
107 memoryCorruption(fmt.Errorf("__version__ property missin g or wrong: %v", pm)) 106 memoryCorruption(fmt.Errorf("__version__ property missin g or wrong: %v", pm))
108 } 107 }
109 } 108 }
110 return 0 109 return 0
111 } 110 }
112 111
113 func incrementLocked(ents *memCollection, key []byte) int64 { 112 func incrementLocked(ents *memCollection, key []byte) int64 {
114 ret := curVersion(ents, key) + 1 113 ret := curVersion(ents, key) + 1
115 ents.Set(key, serialize.ToBytes(ds.PropertyMap{ 114 ents.Set(key, serialize.ToBytes(ds.PropertyMap{
116 "__version__": {ds.MkPropertyNI(ret)}, 115 "__version__": {ds.MkPropertyNI(ret)},
117 })) 116 }))
118 return ret 117 return ret
119 } 118 }
120 119
121 func (d *dataStoreData) entsKeyLocked(key ds.Key) (*memCollection, ds.Key) { 120 func (d *dataStoreData) entsKeyLocked(key *ds.Key) (*memCollection, *ds.Key) {
122 coll := "ents:" + key.Namespace() 121 coll := "ents:" + key.Namespace()
123 ents := d.head.GetCollection(coll) 122 ents := d.head.GetCollection(coll)
124 if ents == nil { 123 if ents == nil {
125 ents = d.head.SetCollection(coll, nil) 124 ents = d.head.SetCollection(coll, nil)
126 } 125 }
127 126
128 » if dskey.Incomplete(key) { 127 » if key.Incomplete() {
129 idKey := []byte(nil) 128 idKey := []byte(nil)
130 if key.Parent() == nil { 129 if key.Parent() == nil {
131 » » » idKey = rootIDsKey(key.Kind()) 130 » » » idKey = rootIDsKey(key.Last().Kind)
132 } else { 131 } else {
133 idKey = groupIDsKey(key) 132 idKey = groupIDsKey(key)
134 } 133 }
135 id := incrementLocked(ents, idKey) 134 id := incrementLocked(ents, idKey)
136 » » key = dskey.New(key.AppID(), key.Namespace(), key.Kind(), "", id , key.Parent()) 135 » » key = ds.NewKey(key.AppID(), key.Namespace(), key.Last().Kind, " ", id, key.Parent())
137 } 136 }
138 137
139 return ents, key 138 return ents, key
140 } 139 }
141 140
142 func (d *dataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.Put MultiCB) { 141 func (d *dataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.Pu tMultiCB) {
143 for i, k := range keys { 142 for i, k := range keys {
144 pmap, _ := vals[i].Save(false) 143 pmap, _ := vals[i].Save(false)
145 dataBytes := serialize.ToBytes(pmap) 144 dataBytes := serialize.ToBytes(pmap)
146 145
147 » » k, err := func() (ret ds.Key, err error) { 146 » » k, err := func() (ret *ds.Key, err error) {
148 d.rwlock.Lock() 147 d.rwlock.Lock()
149 defer d.rwlock.Unlock() 148 defer d.rwlock.Unlock()
150 149
151 ents, ret := d.entsKeyLocked(k) 150 ents, ret := d.entsKeyLocked(k)
152 incrementLocked(ents, groupMetaKey(ret)) 151 incrementLocked(ents, groupMetaKey(ret))
153 152
154 old := ents.Get(keyBytes(ret)) 153 old := ents.Get(keyBytes(ret))
155 oldPM := ds.PropertyMap(nil) 154 oldPM := ds.PropertyMap(nil)
156 if old != nil { 155 if old != nil {
157 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil { 156 if oldPM, err = rpmWoCtx(old, ret.Namespace()); err != nil {
158 return 157 return
159 } 158 }
160 } 159 }
161 updateIndexes(d.head, ret, oldPM, pmap) 160 updateIndexes(d.head, ret, oldPM, pmap)
162 ents.Set(keyBytes(ret), dataBytes) 161 ents.Set(keyBytes(ret), dataBytes)
163 return 162 return
164 }() 163 }()
165 if cb != nil { 164 if cb != nil {
166 cb(k, err) 165 cb(k, err)
167 } 166 }
168 } 167 }
169 } 168 }
170 169
171 func getMultiInner(keys []ds.Key, cb ds.GetMultiCB, getColl func() (*memCollecti on, error)) error { 170 func getMultiInner(keys []*ds.Key, cb ds.GetMultiCB, getColl func() (*memCollect ion, error)) error {
172 ents, err := getColl() 171 ents, err := getColl()
173 if err != nil { 172 if err != nil {
174 return err 173 return err
175 } 174 }
176 if ents == nil { 175 if ents == nil {
177 for range keys { 176 for range keys {
178 cb(nil, ds.ErrNoSuchEntity) 177 cb(nil, ds.ErrNoSuchEntity)
179 } 178 }
180 return nil 179 return nil
181 } 180 }
182 181
183 for _, k := range keys { 182 for _, k := range keys {
184 pdata := ents.Get(keyBytes(k)) 183 pdata := ents.Get(keyBytes(k))
185 if pdata == nil { 184 if pdata == nil {
186 cb(nil, ds.ErrNoSuchEntity) 185 cb(nil, ds.ErrNoSuchEntity)
187 continue 186 continue
188 } 187 }
189 cb(rpmWoCtx(pdata, k.Namespace())) 188 cb(rpmWoCtx(pdata, k.Namespace()))
190 } 189 }
191 return nil 190 return nil
192 } 191 }
193 192
194 func (d *dataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { 193 func (d *dataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
195 » getMultiInner(keys, cb, func() (*memCollection, error) { 194 » return getMultiInner(keys, cb, func() (*memCollection, error) {
196 s := d.takeSnapshot() 195 s := d.takeSnapshot()
197 196
198 return s.GetCollection("ents:" + keys[0].Namespace()), nil 197 return s.GetCollection("ents:" + keys[0].Namespace()), nil
199 }) 198 })
200 return nil
201 } 199 }
202 200
203 func (d *dataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) { 201 func (d *dataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) {
204 toDel := make([][]byte, 0, len(keys)) 202 toDel := make([][]byte, 0, len(keys))
205 for _, k := range keys { 203 for _, k := range keys {
206 toDel = append(toDel, keyBytes(k)) 204 toDel = append(toDel, keyBytes(k))
207 } 205 }
208 ns := keys[0].Namespace() 206 ns := keys[0].Namespace()
209 207
210 d.rwlock.Lock() 208 d.rwlock.Lock()
211 defer d.rwlock.Unlock() 209 defer d.rwlock.Unlock()
212 210
213 ents := d.head.GetCollection("ents:" + ns) 211 ents := d.head.GetCollection("ents:" + ns)
(...skipping 24 matching lines...) Expand all
238 // TODO(riannucci): implement with Flush/FlushRevert for persistance. 236 // TODO(riannucci): implement with Flush/FlushRevert for persistance.
239 237
240 txn := obj.(*txnDataStoreData) 238 txn := obj.(*txnDataStoreData)
241 for rk, muts := range txn.muts { 239 for rk, muts := range txn.muts {
242 if len(muts) == 0 { // read-only 240 if len(muts) == 0 { // read-only
243 continue 241 continue
244 } 242 }
245 prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), s erialize.WithContext, "", "") 243 prop, err := serialize.ReadProperty(bytes.NewBufferString(rk), s erialize.WithContext, "", "")
246 memoryCorruption(err) 244 memoryCorruption(err)
247 245
248 » » k := prop.Value().(ds.Key) 246 » » k := prop.Value().(*ds.Key)
249 247
250 entKey := "ents:" + k.Namespace() 248 entKey := "ents:" + k.Namespace()
251 mkey := groupMetaKey(k) 249 mkey := groupMetaKey(k)
252 entsHead := d.head.GetCollection(entKey) 250 entsHead := d.head.GetCollection(entKey)
253 entsSnap := txn.snap.GetCollection(entKey) 251 entsSnap := txn.snap.GetCollection(entKey)
254 vHead := curVersion(entsHead, mkey) 252 vHead := curVersion(entsHead, mkey)
255 vSnap := curVersion(entsSnap, mkey) 253 vSnap := curVersion(entsSnap, mkey)
256 if vHead != vSnap { 254 if vHead != vSnap {
257 return false 255 return false
258 } 256 }
259 } 257 }
260 return true 258 return true
261 } 259 }
262 260
263 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) { 261 func (d *dataStoreData) applyTxn(c context.Context, obj memContextObj) {
264 txn := obj.(*txnDataStoreData) 262 txn := obj.(*txnDataStoreData)
265 for _, muts := range txn.muts { 263 for _, muts := range txn.muts {
266 if len(muts) == 0 { // read-only 264 if len(muts) == 0 { // read-only
267 continue 265 continue
268 } 266 }
269 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul ti 267 // TODO(riannucci): refactor to do just 1 putMulti, and 1 delMul ti
270 for _, m := range muts { 268 for _, m := range muts {
271 err := error(nil) 269 err := error(nil)
272 k := m.key 270 k := m.key
273 if m.data == nil { 271 if m.data == nil {
274 » » » » d.delMulti([]ds.Key{k}, 272 » » » » d.delMulti([]*ds.Key{k},
275 func(e error) { err = e }) 273 func(e error) { err = e })
276 } else { 274 } else {
277 » » » » d.putMulti([]ds.Key{m.key}, []ds.PropertyMap{m.d ata}, 275 » » » » d.putMulti([]*ds.Key{m.key}, []ds.PropertyMap{m. data},
278 » » » » » func(_ ds.Key, e error) { err = e }) 276 » » » » » func(_ *ds.Key, e error) { err = e })
279 } 277 }
280 impossible(err) 278 impossible(err)
281 } 279 }
282 } 280 }
283 } 281 }
284 282
285 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj { 283 func (d *dataStoreData) mkTxn(o *ds.TransactionOptions) memContextObj {
286 return &txnDataStoreData{ 284 return &txnDataStoreData{
287 // alias to the main datastore's so that testing code can have p rimitive 285 // alias to the main datastore's so that testing code can have p rimitive
288 // access to break features inside of transactions. 286 // access to break features inside of transactions.
289 parent: d, 287 parent: d,
290 isXG: o != nil && o.XG, 288 isXG: o != nil && o.XG,
291 snap: d.head.Snapshot(), 289 snap: d.head.Snapshot(),
292 muts: map[string][]txnMutation{}, 290 muts: map[string][]txnMutation{},
293 } 291 }
294 } 292 }
295 293
296 func (d *dataStoreData) endTxn() {} 294 func (d *dataStoreData) endTxn() {}
297 295
298 /////////////////////////////// txnDataStoreData /////////////////////////////// 296 /////////////////////////////// txnDataStoreData ///////////////////////////////
299 297
300 type txnMutation struct { 298 type txnMutation struct {
301 » key ds.Key 299 » key *ds.Key
302 data ds.PropertyMap 300 data ds.PropertyMap
303 } 301 }
304 302
305 type txnDataStoreData struct { 303 type txnDataStoreData struct {
306 sync.Mutex 304 sync.Mutex
307 305
308 parent *dataStoreData 306 parent *dataStoreData
309 307
310 // boolean 0 or 1, use atomic.*Int32 to access. 308 // boolean 0 or 1, use atomic.*Int32 to access.
311 closed int32 309 closed int32
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
351 // mutation. 349 // mutation.
352 // 350 //
353 // if getOnly is true, don't record the actual mutation data, just ensure that 351 // if getOnly is true, don't record the actual mutation data, just ensure that
354 // the key is in an included entity group (or add an empty entry for tha t 352 // the key is in an included entity group (or add an empty entry for tha t
355 // group). 353 // group).
356 // 354 //
357 // if !getOnly && data == nil, this counts as a deletion instead of a Put. 355 // if !getOnly && data == nil, this counts as a deletion instead of a Put.
358 // 356 //
359 // Returns an error if this key causes the transaction to cross too many entity 357 // Returns an error if this key causes the transaction to cross too many entity
360 // groups. 358 // groups.
361 func (td *txnDataStoreData) writeMutation(getOnly bool, key ds.Key, data ds.Prop ertyMap) error { 359 func (td *txnDataStoreData) writeMutation(getOnly bool, key *ds.Key, data ds.Pro pertyMap) error {
362 » rk := string(keyBytes(dskey.Root(key))) 360 » rk := string(keyBytes(key.Root()))
363 361
364 td.Lock() 362 td.Lock()
365 defer td.Unlock() 363 defer td.Unlock()
366 364
367 if _, ok := td.muts[rk]; !ok { 365 if _, ok := td.muts[rk]; !ok {
368 limit := 1 366 limit := 1
369 if td.isXG { 367 if td.isXG {
370 limit = xgEGLimit 368 limit = xgEGLimit
371 } 369 }
372 if len(td.muts)+1 > limit { 370 if len(td.muts)+1 > limit {
373 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)" 371 msg := "cross-group transaction need to be explicitly sp ecified (xg=True)"
374 if td.isXG { 372 if td.isXG {
375 msg = "operating on too many entity groups in a single transaction" 373 msg = "operating on too many entity groups in a single transaction"
376 } 374 }
377 return errors.New(msg) 375 return errors.New(msg)
378 } 376 }
379 td.muts[rk] = []txnMutation{} 377 td.muts[rk] = []txnMutation{}
380 } 378 }
381 if !getOnly { 379 if !getOnly {
382 td.muts[rk] = append(td.muts[rk], txnMutation{key, data}) 380 td.muts[rk] = append(td.muts[rk], txnMutation{key, data})
383 } 381 }
384 382
385 return nil 383 return nil
386 } 384 }
387 385
388 func (td *txnDataStoreData) putMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds .PutMultiCB) { 386 func (td *txnDataStoreData) putMulti(keys []*ds.Key, vals []ds.PropertyMap, cb d s.PutMultiCB) {
389 for i, k := range keys { 387 for i, k := range keys {
390 func() { 388 func() {
391 td.parent.Lock() 389 td.parent.Lock()
392 defer td.parent.Unlock() 390 defer td.parent.Unlock()
393 _, k = td.parent.entsKeyLocked(k) 391 _, k = td.parent.entsKeyLocked(k)
394 }() 392 }()
395 err := td.writeMutation(false, k, vals[i]) 393 err := td.writeMutation(false, k, vals[i])
396 if cb != nil { 394 if cb != nil {
397 cb(k, err) 395 cb(k, err)
398 } 396 }
399 } 397 }
400 } 398 }
401 399
402 func (td *txnDataStoreData) getMulti(keys []ds.Key, cb ds.GetMultiCB) error { 400 func (td *txnDataStoreData) getMulti(keys []*ds.Key, cb ds.GetMultiCB) error {
403 return getMultiInner(keys, cb, func() (*memCollection, error) { 401 return getMultiInner(keys, cb, func() (*memCollection, error) {
404 err := error(nil) 402 err := error(nil)
405 for _, key := range keys { 403 for _, key := range keys {
406 err = td.writeMutation(true, key, nil) 404 err = td.writeMutation(true, key, nil)
407 if err != nil { 405 if err != nil {
408 return nil, err 406 return nil, err
409 } 407 }
410 } 408 }
411 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil 409 return td.snap.GetCollection("ents:" + keys[0].Namespace()), nil
412 }) 410 })
413 } 411 }
414 412
415 func (td *txnDataStoreData) delMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { 413 func (td *txnDataStoreData) delMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
416 for _, k := range keys { 414 for _, k := range keys {
417 err := td.writeMutation(false, k, nil) 415 err := td.writeMutation(false, k, nil)
418 if cb != nil { 416 if cb != nil {
419 cb(err) 417 cb(err)
420 } 418 }
421 } 419 }
422 return nil 420 return nil
423 } 421 }
424 422
425 func keyBytes(key ds.Key) []byte { 423 func keyBytes(key *ds.Key) []byte {
426 return serialize.ToBytes(ds.MkProperty(key)) 424 return serialize.ToBytes(ds.MkProperty(key))
427 } 425 }
428 426
429 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) { 427 func rpmWoCtx(data []byte, ns string) (ds.PropertyMap, error) {
430 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 428 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
431 serialize.WithoutContext, globalAppID, ns) 429 serialize.WithoutContext, globalAppID, ns)
432 } 430 }
433 431
434 func rpm(data []byte) (ds.PropertyMap, error) { 432 func rpm(data []byte) (ds.PropertyMap, error) {
435 return serialize.ReadPropertyMap(bytes.NewBuffer(data), 433 return serialize.ReadPropertyMap(bytes.NewBuffer(data),
436 serialize.WithContext, "", "") 434 serialize.WithContext, "", "")
437 } 435 }
438
439 type keyitem interface {
440 Key() ds.Key
441 }
OLDNEW
« no previous file with comments | « impl/memory/datastore.go ('k') | impl/memory/datastore_index.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698