Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Unified Diff: impl/memory/datastore_index.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: stringSet everywhere Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « impl/memory/datastore_data.go ('k') | impl/memory/datastore_index_selection.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: impl/memory/datastore_index.go
diff --git a/impl/memory/datastore_index.go b/impl/memory/datastore_index.go
index f22a970a8e9ef62593c2b60be332c08bd98086b6..f4db119e6bd8af872caa037749419d62181a2e38 100644
--- a/impl/memory/datastore_index.go
+++ b/impl/memory/datastore_index.go
@@ -14,15 +14,13 @@ import (
"github.com/luci/gkvlite"
)
-var indexCreationDeterministic = false
-
type qIndexSlice []*ds.IndexDefinition
func (s qIndexSlice) Len() int { return len(s) }
func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) }
-func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
+func defaultIndexes(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
ret := make(qIndexSlice, 0, 2*len(pmap)+1)
ret = append(ret, &ds.IndexDefinition{Kind: kind})
for name, pvals := range pmap {
@@ -39,15 +37,15 @@ func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name}}})
ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name, Direction: ds.DESCENDING}}})
}
- if indexCreationDeterministic {
+ if serializationDeterministic {
sort.Sort(ret)
}
return ret
}
func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) *memStore {
- sip := partiallySerialize(pm)
- return sip.indexEntries(k, append(defaultIndicies(k.Kind(), pm), complexIdxs...))
+ sip := partiallySerialize(k, pm)
+ return sip.indexEntries(k.Namespace(), append(defaultIndexes(k.Kind(), pm), complexIdxs...))
}
// serializedPvals is all of the serialized DSProperty values in qASC order.
@@ -58,29 +56,32 @@ func (s serializedPvals) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j]) < 0 }
// prop name -> [<serialized DSProperty>, ...]
+// includes special values '__key__' and '__ancestor__' which contains all of
+// the ancestor entries for this key.
type serializedIndexablePmap map[string]serializedPvals
-func partiallySerialize(pm ds.PropertyMap) (ret serializedIndexablePmap) {
- if len(pm) == 0 {
- return
+func partiallySerialize(k ds.Key, pm ds.PropertyMap) (ret serializedIndexablePmap) {
+ ret = make(serializedIndexablePmap, len(pm)+2)
+ ret["__key__"] = [][]byte{serialize.ToBytes(ds.MkProperty(k))}
+ for k != nil {
+ ret["__ancestor__"] = append(ret["__ancestor__"], serialize.ToBytes(ds.MkProperty(k)))
+ k = k.Parent()
}
-
- buf := &bytes.Buffer{}
- ret = make(serializedIndexablePmap, len(pm))
for k, vals := range pm {
+ dups := stringSet{}
newVals := make(serializedPvals, 0, len(vals))
for _, v := range vals {
if v.IndexSetting() == ds.NoIndex {
continue
}
- buf.Reset()
- serialize.WriteProperty(buf, serialize.WithoutContext, v)
- newVal := make([]byte, buf.Len())
- copy(newVal, buf.Bytes())
- newVals = append(newVals, newVal)
+ data := serialize.ToBytes(v)
+ dataS := string(data)
+ if !dups.add(dataS) {
+ continue
+ }
+ newVals = append(newVals, data)
}
if len(newVals) > 0 {
- sort.Sort(newVals)
ret[k] = newVals
}
}
@@ -95,7 +96,7 @@ type indexRowGen struct {
}
// permute calls cb for each index row, in the sorted order of the rows.
-func (s indexRowGen) permute(cb func([]byte)) {
+func (s indexRowGen) permute(collSetFn func(k, v []byte)) {
iVec := make([]int, len(s.propVec))
iVecLim := make([]int, len(s.propVec))
@@ -137,18 +138,13 @@ func (s indexRowGen) permute(cb func([]byte)) {
for pvalSliceIdx, pvalIdx := range iVec {
bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx])
}
- buf := bytes.NewBuffer(make([]byte, 0, bufsiz))
+ buf := serialize.Invertible(bytes.NewBuffer(make([]byte, 0, bufsiz)))
for pvalSliceIdx, pvalIdx := range iVec {
data := s.propVec[pvalSliceIdx][pvalIdx]
- if s.orders[pvalSliceIdx] == ds.ASCENDING {
- buf.Write(data)
- } else {
- for _, b := range data {
- buf.WriteByte(b ^ 0xFF)
- }
- }
+ buf.SetInvert(s.orders[pvalSliceIdx] == ds.DESCENDING)
+ buf.Write(data)
}
- cb(buf.Bytes())
+ collSetFn(buf.Bytes(), []byte{})
if !incPos() {
break
}
@@ -162,13 +158,10 @@ type matcher struct {
// matcher.match checks to see if the mapped, serialized property values
// match the index. If they do, it returns a indexRowGen. Do not write or modify
// the data in the indexRowGen.
-func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (indexRowGen, bool) {
+func (m *matcher) match(sortBy []ds.IndexColumn, sip serializedIndexablePmap) (indexRowGen, bool) {
m.buf.propVec = m.buf.propVec[:0]
m.buf.orders = m.buf.orders[:0]
- for _, sb := range idx.SortBy {
- if sb.Property == "__key__" {
- panic("don't know how to build compound index on __key__")
- }
+ for _, sb := range sortBy {
if pv, ok := sip[sb.Property]; ok {
m.buf.propVec = append(m.buf.propVec, pv)
m.buf.orders = append(m.buf.orders, sb.Direction)
@@ -179,97 +172,75 @@ func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (i
return m.buf, true
}
-func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*ds.IndexDefinition) *memStore {
+func (sip serializedIndexablePmap) indexEntries(ns string, idxs []*ds.IndexDefinition) *memStore {
ret := newMemStore()
idxColl := ret.SetCollection("idx", nil)
- // getIdxEnts retrieves an index collection or adds it if it's not there.
- getIdxEnts := func(qi *ds.IndexDefinition) *memCollection {
- b := serialize.ToBytes(*qi)
- idxColl.Set(b, []byte{})
- return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(), b), nil)
- }
-
- keyData := serialize.ToBytes(k)
-
- walkPermutations := func(prefix []byte, irg indexRowGen, ents *memCollection) {
- prev := []byte{} // intentionally make a non-nil slice, gkvlite hates nil.
- irg.permute(func(data []byte) {
- buf := bytes.NewBuffer(make([]byte, 0, len(prefix)+len(data)+len(keyData)))
- buf.Write(prefix)
- buf.Write(data)
- buf.Write(keyData)
- ents.Set(buf.Bytes(), prev)
- prev = data
- })
- }
mtch := matcher{}
for _, idx := range idxs {
- if irg, ok := mtch.match(idx, sip); ok {
- idxEnts := getIdxEnts(idx)
- if len(irg.propVec) == 0 {
- idxEnts.Set(keyData, []byte{}) // propless index, e.g. kind -> key = nil
- } else if idx.Ancestor {
- for ancKey := k; ancKey != nil; ancKey = ancKey.Parent() {
- walkPermutations(serialize.ToBytes(ancKey), irg, idxEnts)
- }
- } else {
- walkPermutations(nil, irg, idxEnts)
- }
+ idx = idx.Normalize()
+ if irg, ok := mtch.match(idx.GetFullSortOrder(), sip); ok {
+ idxBin := serialize.ToBytes(*idx.PrepForIdxTable())
+ idxColl.Set(idxBin, []byte{})
+ coll := ret.SetCollection(fmt.Sprintf("idx:%s:%s", ns, idxBin), nil)
+ irg.permute(coll.Set)
}
}
return ret
}
-func getCompIdxs(idxColl *memCollection) []*ds.IndexDefinition {
- // load all current complex query index definitions.
- compIdx := []*ds.IndexDefinition{}
- complexQueryPrefix := ds.IndexComplexQueryPrefix()
- idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item) bool {
- if !bytes.HasPrefix(i.Key, complexQueryPrefix) {
- return false
- }
- qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key))
- if err != nil {
- panic(err) // memory corruption
- }
- compIdx = append(compIdx, &qi)
- return true
- })
- return compIdx
-}
-
-func getIdxColl(store *memStore) *memCollection {
+// walkCompIdxs walks the table of compound indexes in the store. If `endsWith`
+// is provided, this will only walk over compound indexes which match
+// Kind, Ancestor, and whose SortBy has `endsWith.SortBy` as a suffix.
+func walkCompIdxs(store *memStore, endsWith *ds.IndexDefinition, cb func(*ds.IndexDefinition) bool) {
idxColl := store.GetCollection("idx")
if idxColl == nil {
- idxColl = store.SetCollection("idx", nil)
+ return
+ }
+ itrDef := iterDefinition{c: idxColl}
+
+ if endsWith != nil {
+ full := serialize.ToBytes(*endsWith.Flip())
+ // chop off the null terminating byte
+ itrDef.prefix = full[:len(full)-1]
+ }
+
+ it := itrDef.mkIter()
+ defer it.stop()
+ for !it.stopped {
+ it.next(nil, func(i *gkvlite.Item) {
+ if i == nil {
+ return
+ }
+ qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key))
+ memoryCorruption(err)
+ if !cb(qi.Flip()) {
+ it.stop()
+ }
+ })
}
- return idxColl
}
func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
- idxColl := getIdxColl(store)
- prefix := "idx:" + ns + ":"
+ prefixBuf := []byte("idx:" + ns + ":")
+ origPrefixBufLen := len(prefixBuf)
gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) {
- ks := prefix + string(k)
- if idxColl.Get(k) == nil {
- // avoids unnecessary mutation, otherwise the idx collection thrashes on
- // every update.
- idxColl.Set(k, []byte{})
- }
+ prefixBuf = append(prefixBuf[:origPrefixBufLen], k...)
+ ks := string(prefixBuf)
coll := store.GetCollection(ks)
if coll == nil {
coll = store.SetCollection(ks, nil)
}
+
oldColl := oldIdx.GetCollection(ks)
newColl := newIdx.GetCollection(ks)
switch {
case ov == nil && nv != nil: // all additions
newColl.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool {
- coll.Set(i.Key, i.Val)
+ coll.Set(i.Key, []byte{})
return true
})
case ov != nil && nv == nil: // all deletions
@@ -282,11 +253,11 @@ func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
if nv == nil {
coll.Delete(k)
} else {
- coll.Set(k, nv)
+ coll.Set(k, []byte{})
}
})
default:
- panic("impossible")
+ impossible(fmt.Errorf("both values from gkvCollide were nil?"))
}
// TODO(riannucci): remove entries from idxColl and remove index collections
// when there are no index entries for that index any more.
@@ -294,24 +265,40 @@ func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
}
func addIndex(store *memStore, ns string, compIdx []*ds.IndexDefinition) {
- store.GetCollection("ents:"+ns).VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
- pm, err := rpmWoCtx(i.Val, ns)
- if err != nil {
- panic(err) // memory corruption
- }
- k, err := serialize.ReadKey(bytes.NewBuffer(i.Key), serialize.WithoutContext, globalAppID, ns)
- if err != nil {
- panic(err)
- }
- sip := partiallySerialize(pm)
- mergeIndexes(ns, store, newMemStore(), sip.indexEntries(k, compIdx))
- return true
- })
+ normalized := make([]*ds.IndexDefinition, len(compIdx))
+ idxColl := store.SetCollection("idx", nil)
+ for i, idx := range compIdx {
+ normalized[i] = idx.Normalize()
+ idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()), []byte{})
+ }
+
+ if allEnts := store.GetCollection("ents:" + ns); allEnts != nil {
+ allEnts.VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
+ pm, err := rpmWoCtx(i.Val, ns)
+ memoryCorruption(err)
+
+ prop, err := serialize.ReadProperty(bytes.NewBuffer(i.Key), serialize.WithoutContext, globalAppID, ns)
+ memoryCorruption(err)
+
+ k := prop.Value().(ds.Key)
+
+ sip := partiallySerialize(k, pm)
+
+ mergeIndexes(ns, store,
+ newMemStore(),
+ sip.indexEntries(ns, normalized))
+ return true
+ })
+ }
}
-func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) {
+func updateIndexes(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) {
// load all current complex query index definitions.
- compIdx := getCompIdxs(getIdxColl(store))
+ compIdx := []*ds.IndexDefinition{}
+ walkCompIdxs(store, nil, func(i *ds.IndexDefinition) bool {
+ compIdx = append(compIdx, i)
+ return true
+ })
mergeIndexes(key.Namespace(), store,
indexEntriesWithBuiltins(key, oldEnt, compIdx),
« no previous file with comments | « impl/memory/datastore_data.go ('k') | impl/memory/datastore_index_selection.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698