Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(82)

Side by Side Diff: impl/memory/datastore_index.go

Issue 2604943002: impl/memory: Replace gkvlite with "treapstore". (Closed)
Patch Set: Comments. Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/README.md ('k') | impl/memory/datastore_index_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sort" 10 "sort"
11 11
12 ds "github.com/luci/gae/service/datastore" 12 ds "github.com/luci/gae/service/datastore"
13 "github.com/luci/gae/service/datastore/serialize" 13 "github.com/luci/gae/service/datastore/serialize"
14 "github.com/luci/gkvlite"
15 ) 14 )
16 15
17 type qIndexSlice []*ds.IndexDefinition 16 type qIndexSlice []*ds.IndexDefinition
18 17
19 func (s qIndexSlice) Len() int { return len(s) } 18 func (s qIndexSlice) Len() int { return len(s) }
20 func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } 19 func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
21 func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) } 20 func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) }
22 21
23 func defaultIndexes(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition { 22 func defaultIndexes(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
24 ret := make(qIndexSlice, 0, 2*len(pmap)+1) 23 ret := make(qIndexSlice, 0, 2*len(pmap)+1)
(...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after
176 } 175 }
177 itrDef := iterDefinition{c: idxColl} 176 itrDef := iterDefinition{c: idxColl}
178 177
179 if endsWith != nil { 178 if endsWith != nil {
180 full := serialize.ToBytes(*endsWith.Flip()) 179 full := serialize.ToBytes(*endsWith.Flip())
181 // chop off the null terminating byte 180 // chop off the null terminating byte
182 itrDef.prefix = full[:len(full)-1] 181 itrDef.prefix = full[:len(full)-1]
183 } 182 }
184 183
185 it := itrDef.mkIter() 184 it := itrDef.mkIter()
186 » defer it.stop() 185 » for ent := it.next(); ent != nil; ent = it.next() {
187 » for !it.stopped { 186 » » qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(ent.key ))
188 » » it.next(nil, func(i *gkvlite.Item) { 187 » » memoryCorruption(err)
189 » » » if i == nil { 188 » » if !cb(qi.Flip()) {
190 » » » » return 189 » » » break
191 » » » } 190 » » }
192 » » » qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer (i.Key))
193 » » » memoryCorruption(err)
194 » » » if !cb(qi.Flip()) {
195 » » » » it.stop()
196 » » » }
197 » » })
198 } 191 }
199 } 192 }
200 193
201 func mergeIndexes(ns string, store, oldIdx, newIdx memStore) { 194 func mergeIndexes(ns string, store, oldIdx, newIdx memStore) {
202 prefixBuf := []byte("idx:" + ns + ":") 195 prefixBuf := []byte("idx:" + ns + ":")
203 origPrefixBufLen := len(prefixBuf) 196 origPrefixBufLen := len(prefixBuf)
204 197
205 oldIdx = oldIdx.Snapshot() 198 oldIdx = oldIdx.Snapshot()
206 newIdx = newIdx.Snapshot() 199 newIdx = newIdx.Snapshot()
207 200
208 » gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), fun c(k, ov, nv []byte) { 201 » memStoreCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx") , func(k, ov, nv []byte) {
209 prefixBuf = append(prefixBuf[:origPrefixBufLen], k...) 202 prefixBuf = append(prefixBuf[:origPrefixBufLen], k...)
210 ks := string(prefixBuf) 203 ks := string(prefixBuf)
211 204
212 coll := store.GetOrCreateCollection(ks) 205 coll := store.GetOrCreateCollection(ks)
213 206
214 oldColl := oldIdx.GetCollection(ks) 207 oldColl := oldIdx.GetCollection(ks)
215 newColl := newIdx.GetCollection(ks) 208 newColl := newIdx.GetCollection(ks)
216 209
217 switch { 210 switch {
218 case ov == nil && nv != nil: // all additions 211 case ov == nil && nv != nil: // all additions
219 » » » newColl.VisitItemsAscend(nil, false, func(i *gkvlite.Ite m) bool { 212 » » » newColl.ForEachItem(func(k, _ []byte) bool {
220 » » » » coll.Set(i.Key, []byte{}) 213 » » » » coll.Set(k, []byte{})
221 return true 214 return true
222 }) 215 })
223 case ov != nil && nv == nil: // all deletions 216 case ov != nil && nv == nil: // all deletions
224 » » » oldColl.VisitItemsAscend(nil, false, func(i *gkvlite.Ite m) bool { 217 » » » oldColl.ForEachItem(func(k, _ []byte) bool {
225 » » » » coll.Delete(i.Key) 218 » » » » coll.Delete(k)
226 return true 219 return true
227 }) 220 })
228 case ov != nil && nv != nil: // merge 221 case ov != nil && nv != nil: // merge
229 » » » gkvCollide(oldColl, newColl, func(k, ov, nv []byte) { 222 » » » memStoreCollide(oldColl, newColl, func(k, ov, nv []byte) {
230 if nv == nil { 223 if nv == nil {
231 coll.Delete(k) 224 coll.Delete(k)
232 } else { 225 } else {
233 coll.Set(k, []byte{}) 226 coll.Set(k, []byte{})
234 } 227 }
235 }) 228 })
236 default: 229 default:
237 » » » impossible(fmt.Errorf("both values from gkvCollide were nil?")) 230 » » » impossible(fmt.Errorf("both values from memStoreCollide were nil?"))
238 } 231 }
239 // TODO(riannucci): remove entries from idxColl and remove index collections 232 // TODO(riannucci): remove entries from idxColl and remove index collections
240 // when there are no index entries for that index any more. 233 // when there are no index entries for that index any more.
241 }) 234 })
242 } 235 }
243 236
244 func addIndexes(store memStore, aid string, compIdx []*ds.IndexDefinition) { 237 func addIndexes(store memStore, aid string, compIdx []*ds.IndexDefinition) {
245 normalized := make([]*ds.IndexDefinition, len(compIdx)) 238 normalized := make([]*ds.IndexDefinition, len(compIdx))
246 idxColl := store.GetOrCreateCollection("idx") 239 idxColl := store.GetOrCreateCollection("idx")
247 for i, idx := range compIdx { 240 for i, idx := range compIdx {
248 normalized[i] = idx.Normalize() 241 normalized[i] = idx.Normalize()
249 idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()), []byte{}) 242 idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()), []byte{})
250 } 243 }
251 244
252 for _, ns := range namespaces(store) { 245 for _, ns := range namespaces(store) {
246 kctx := ds.MkKeyContext(aid, ns)
253 if allEnts := store.Snapshot().GetCollection("ents:" + ns); allE nts != nil { 247 if allEnts := store.Snapshot().GetCollection("ents:" + ns); allE nts != nil {
254 » » » allEnts.VisitItemsAscend(nil, true, func(i *gkvlite.Item ) bool { 248 » » » allEnts.ForEachItem(func(ik, iv []byte) bool {
255 » » » » pm, err := rpm(i.Val) 249 » » » » pm, err := rpm(iv)
256 memoryCorruption(err) 250 memoryCorruption(err)
257 251
258 » » » » prop, err := serialize.ReadProperty(bytes.NewBuf fer(i.Key), serialize.WithoutContext, ds.MkKeyContext(aid, ns)) 252 » » » » prop, err := serialize.ReadProperty(bytes.NewBuf fer(ik), serialize.WithoutContext, kctx)
259 memoryCorruption(err) 253 memoryCorruption(err)
260 254
261 k := prop.Value().(*ds.Key) 255 k := prop.Value().(*ds.Key)
262 256
263 sip := serialize.PropertyMapPartially(k, pm) 257 sip := serialize.PropertyMapPartially(k, pm)
264 258
265 mergeIndexes(ns, store, 259 mergeIndexes(ns, store,
266 newMemStore(), 260 newMemStore(),
267 indexEntries(k, sip, normalized)) 261 indexEntries(k, sip, normalized))
268 return true 262 return true
(...skipping 12 matching lines...) Expand all
281 var compIdx []*ds.IndexDefinition 275 var compIdx []*ds.IndexDefinition
282 walkCompIdxs(store.Snapshot(), nil, func(i *ds.IndexDefinition) bool { 276 walkCompIdxs(store.Snapshot(), nil, func(i *ds.IndexDefinition) bool {
283 compIdx = append(compIdx, i) 277 compIdx = append(compIdx, i)
284 return true 278 return true
285 }) 279 })
286 280
287 mergeIndexes(key.Namespace(), store, 281 mergeIndexes(key.Namespace(), store,
288 indexEntriesWithBuiltins(key, oldEnt, compIdx), 282 indexEntriesWithBuiltins(key, oldEnt, compIdx),
289 indexEntriesWithBuiltins(key, newEnt, compIdx)) 283 indexEntriesWithBuiltins(key, newEnt, compIdx))
290 } 284 }
OLDNEW
« no previous file with comments | « impl/memory/README.md ('k') | impl/memory/datastore_index_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698