Index: impl/memory/datastore_index.go
diff --git a/impl/memory/datastore_index.go b/impl/memory/datastore_index.go
index f22a970a8e9ef62593c2b60be332c08bd98086b6..4c37e7136b4aec16e0c167b6a2ce9be42cfaf059 100644
--- a/impl/memory/datastore_index.go
+++ b/impl/memory/datastore_index.go
@@ -14,8 +14,6 @@ import (
"github.com/luci/gkvlite"
)
-var indexCreationDeterministic = false
-
type qIndexSlice []*ds.IndexDefinition
func (s qIndexSlice) Len() int { return len(s) }
@@ -39,15 +37,15 @@ func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name}}})
ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name, Direction: ds.DESCENDING}}})
}
- if indexCreationDeterministic {
+ if serializationDeterministic {
sort.Sort(ret)
}
return ret
}
func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) *memStore {
- sip := partiallySerialize(pm)
- return sip.indexEntries(k, append(defaultIndicies(k.Kind(), pm), complexIdxs...))
+ sip := partiallySerialize(k, pm)
+ return sip.indexEntries(k.Namespace(), append(defaultIndicies(k.Kind(), pm), complexIdxs...))
}
// serializedPvals is all of the serialized DSProperty values in qASC order.
@@ -58,29 +56,33 @@ func (s serializedPvals) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j]) < 0 }
// prop name -> [<serialized DSProperty>, ...]
+// includes special values '__key__' and '__ancestor__' which contains all of
+// the ancestor entries for this key.
type serializedIndexablePmap map[string]serializedPvals
-func partiallySerialize(pm ds.PropertyMap) (ret serializedIndexablePmap) {
- if len(pm) == 0 {
- return
+func partiallySerialize(k ds.Key, pm ds.PropertyMap) (ret serializedIndexablePmap) {
+ ret = make(serializedIndexablePmap, len(pm)+2)
+ ret["__key__"] = [][]byte{serialize.ToBytes(ds.MkProperty(k))}
+ for k != nil {
+ ret["__ancestor__"] = append(ret["__ancestor__"], serialize.ToBytes(ds.MkProperty(k)))
+ k = k.Parent()
}
-
- buf := &bytes.Buffer{}
- ret = make(serializedIndexablePmap, len(pm))
for k, vals := range pm {
+ dups := map[string]struct{}{}
newVals := make(serializedPvals, 0, len(vals))
for _, v := range vals {
if v.IndexSetting() == ds.NoIndex {
continue
}
- buf.Reset()
- serialize.WriteProperty(buf, serialize.WithoutContext, v)
- newVal := make([]byte, buf.Len())
- copy(newVal, buf.Bytes())
- newVals = append(newVals, newVal)
+ data := serialize.ToBytes(v)
+ dataS := string(data)
+ if _, ok := dups[dataS]; ok {
+ continue
+ }
+ dups[dataS] = struct{}{}
+ newVals = append(newVals, data)
}
if len(newVals) > 0 {
- sort.Sort(newVals)
ret[k] = newVals
}
}
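Note: the rewritten loop above drops the sort-then-store approach and instead deduplicates each property's serialized values, using the string form of the bytes as a set key. A minimal standalone sketch of that idiom (dedupBytes and its names are illustrative, not part of this CL):

    package main

    import "fmt"

    // dedupBytes keeps the first occurrence of each distinct byte string.
    func dedupBytes(vals [][]byte) [][]byte {
        seen := map[string]struct{}{}
        out := make([][]byte, 0, len(vals))
        for _, v := range vals {
            s := string(v) // copies the bytes into a hashable set key
            if _, ok := seen[s]; ok {
                continue // skip duplicates, mirroring the `dups` check above
            }
            seen[s] = struct{}{}
            out = append(out, v)
        }
        return out
    }

    func main() {
        in := [][]byte{[]byte("a"), []byte("b"), []byte("a")}
        fmt.Printf("%q\n", dedupBytes(in)) // ["a" "b"]
    }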
@@ -95,7 +97,7 @@ type indexRowGen struct {
}
// permute calls cb for each index row, in the sorted order of the rows.
-func (s indexRowGen) permute(cb func([]byte)) {
+func (s indexRowGen) permute(collSetFn func(k, v []byte)) {
iVec := make([]int, len(s.propVec))
iVecLim := make([]int, len(s.propVec))
@@ -137,18 +139,13 @@ func (s indexRowGen) permute(cb func([]byte)) {
for pvalSliceIdx, pvalIdx := range iVec {
bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx])
}
- buf := bytes.NewBuffer(make([]byte, 0, bufsiz))
+ buf := serialize.Invertible(bytes.NewBuffer(make([]byte, 0, bufsiz)))
for pvalSliceIdx, pvalIdx := range iVec {
data := s.propVec[pvalSliceIdx][pvalIdx]
- if s.orders[pvalSliceIdx] == ds.ASCENDING {
- buf.Write(data)
- } else {
- for _, b := range data {
- buf.WriteByte(b ^ 0xFF)
- }
- }
+ buf.SetInvert(s.orders[pvalSliceIdx] == ds.DESCENDING)
+ buf.Write(data)
}
- cb(buf.Bytes())
+ collSetFn(buf.Bytes(), []byte{})
if !incPos() {
break
}
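Note: the new serialize.Invertible wrapper centralizes the hand-rolled XOR loop it replaces. The underlying trick is that XOR-ing each byte with 0xFF reverses the byte-wise sort order of the encoded values, which lets descending columns live in an ascending-ordered table. A standalone sketch of that property (invert here is illustrative, not this package's API):

    package main

    import (
        "bytes"
        "fmt"
    )

    // invert XORs each byte with 0xFF, reversing lexicographic order.
    // (Real encodings also need a terminator byte so prefixes sort correctly.)
    func invert(b []byte) []byte {
        out := make([]byte, len(b))
        for i, c := range b {
            out[i] = c ^ 0xFF
        }
        return out
    }

    func main() {
        a, b := []byte("apple"), []byte("banana")
        fmt.Println(bytes.Compare(a, b))                 // -1: "apple" first, ascending
        fmt.Println(bytes.Compare(invert(a), invert(b))) // 1: order flipped
    }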
@@ -162,13 +159,10 @@ type matcher struct {
// matcher.match checks to see if the mapped, serialized property values
// match the index. If they do, it returns a indexRowGen. Do not write or modify
// the data in the indexRowGen.
-func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (indexRowGen, bool) {
+func (m *matcher) match(sortBy []ds.IndexColumn, sip serializedIndexablePmap) (indexRowGen, bool) {
m.buf.propVec = m.buf.propVec[:0]
m.buf.orders = m.buf.orders[:0]
- for _, sb := range idx.SortBy {
- if sb.Property == "__key__" {
- panic("don't know how to build compound index on __key__")
- }
+ for _, sb := range sortBy {
if pv, ok := sip[sb.Property]; ok {
m.buf.propVec = append(m.buf.propVec, pv)
m.buf.orders = append(m.buf.orders, sb.Direction)
@@ -179,97 +173,71 @@ func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (i
return m.buf, true
}
-func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*ds.IndexDefinition) *memStore {
+func (sip serializedIndexablePmap) indexEntries(ns string, idxs []*ds.IndexDefinition) *memStore {
ret := newMemStore()
idxColl := ret.SetCollection("idx", nil)
- // getIdxEnts retrieves an index collection or adds it if it's not there.
- getIdxEnts := func(qi *ds.IndexDefinition) *memCollection {
- b := serialize.ToBytes(*qi)
- idxColl.Set(b, []byte{})
- return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(), b), nil)
- }
-
- keyData := serialize.ToBytes(k)
-
- walkPermutations := func(prefix []byte, irg indexRowGen, ents *memCollection) {
- prev := []byte{} // intentionally make a non-nil slice, gkvlite hates nil.
- irg.permute(func(data []byte) {
- buf := bytes.NewBuffer(make([]byte, 0, len(prefix)+len(data)+len(keyData)))
- buf.Write(prefix)
- buf.Write(data)
- buf.Write(keyData)
- ents.Set(buf.Bytes(), prev)
- prev = data
- })
- }
mtch := matcher{}
for _, idx := range idxs {
- if irg, ok := mtch.match(idx, sip); ok {
- idxEnts := getIdxEnts(idx)
- if len(irg.propVec) == 0 {
- idxEnts.Set(keyData, []byte{}) // propless index, e.g. kind -> key = nil
- } else if idx.Ancestor {
- for ancKey := k; ancKey != nil; ancKey = ancKey.Parent() {
- walkPermutations(serialize.ToBytes(ancKey), irg, idxEnts)
- }
- } else {
- walkPermutations(nil, irg, idxEnts)
- }
+ idx = idx.Normalize()
+ if irg, ok := mtch.match(idx.GetFullSortOrder(), sip); ok {
+ idxBin := serialize.ToBytes(*idx.PrepForIdxTable())
+ idxColl.Set(idxBin, []byte{})
|
dnj (Google) 2015/08/28 17:54:21:
Use []byte(nil) here, as []byte{} allocates an empty slice.

iannucci 2015/08/28 19:48:55:
If gkvlite didn't panic on nil values, I would totally do that.
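The distinction behind this exchange: []byte(nil) is a nil slice, while []byte{} is a non-nil zero-length slice. The two behave the same under len and append, but differ under a nil check, which is what matters to a store that panics on nil values. A minimal sketch (the set closure is illustrative, not gkvlite's API):

    package main

    import "fmt"

    func main() {
        var nilVal []byte    // nil slice, same as []byte(nil)
        emptyVal := []byte{} // non-nil header pointing at a zero-length array

        fmt.Println(nilVal == nil, emptyVal == nil) // true false
        fmt.Println(len(nilVal), len(emptyVal))     // 0 0

        // A store that rejects nil values accepts only the second form:
        set := func(v []byte) {
            if v == nil {
                panic("nil value") // gkvlite-style behavior, per the reply above
            }
        }
        set(emptyVal) // ok
        // set(nilVal) // would panic
    }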
+ coll := ret.SetCollection(fmt.Sprintf("idx:%s:%s", ns, idxBin), nil)
+ irg.permute(coll.Set)
}
}
return ret
}
-func getCompIdxs(idxColl *memCollection) []*ds.IndexDefinition {
- // load all current complex query index definitions.
- compIdx := []*ds.IndexDefinition{}
- complexQueryPrefix := ds.IndexComplexQueryPrefix()
- idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item) bool {
- if !bytes.HasPrefix(i.Key, complexQueryPrefix) {
- return false
- }
- qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key))
- if err != nil {
- panic(err) // memory corruption
- }
- compIdx = append(compIdx, &qi)
- return true
- })
- return compIdx
-}
-
-func getIdxColl(store *memStore) *memCollection {
+// walkCompIdxs walks the table of compound indexes in the store. If `endsWith`
+// is provided, this will only walk over compound indexes which match
+// Kind, Ancestor, and whose SortBy has `endsWith.SortBy` as a suffix.
+func walkCompIdxs(store *memStore, endsWith *ds.IndexDefinition, cb func(*ds.IndexDefinition) bool) {
idxColl := store.GetCollection("idx")
if idxColl == nil {
- idxColl = store.SetCollection("idx", nil)
+ return
+ }
+ itrDef := iterDefinition{c: idxColl}
+
+ if endsWith != nil {
+ full := serialize.ToBytes(*endsWith.Flip())
+ // chop off the null terminating byte
+ itrDef.prefix = full[:len(full)-1]
+ }
+
+ for it := itrDef.mkIter(); !it.stopped; {
+ it.next(nil, func(i *gkvlite.Item) {
+ if i == nil {
+ return
+ }
+ qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key))
+ memoryCorruption(err)
+ if !cb(qi.Flip()) {
+ it.stop()
|
dnj (Google) 2015/08/28 17:54:21:
It'd be better to arrange to do this via defer, as...

iannucci 2015/08/28 19:48:55:
I think you mean 'additionally do this via a defer'...
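A sketch of the defer arrangement being discussed, using a hypothetical iterator with the same stopped/stop shape as the one in this CL (iter and walk are illustrative, not the CL's types): the deferred stop runs even if the callback panics, in addition to any explicit early exit.

    package main

    import "fmt"

    // iter is a stand-in for the CL's iterator type.
    type iter struct{ stopped bool }

    func (it *iter) stop() { it.stopped = true }

    // walk visits items until cb returns false or a panic unwinds the stack;
    // either way the deferred stop leaves the iterator in a stopped state.
    func walk(it *iter, cb func(int) bool) {
        defer it.stop()
        for i := 0; !it.stopped; i++ {
            if !cb(i) {
                return // the defer still runs it.stop()
            }
        }
    }

    func main() {
        it := &iter{}
        walk(it, func(i int) bool { return i < 3 })
        fmt.Println(it.stopped) // true
    }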
+ }
+ })
}
- return idxColl
}
func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
- idxColl := getIdxColl(store)
prefix := "idx:" + ns + ":"
gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) {
ks := prefix + string(k)
|
dnj (Google) 2015/08/28 17:54:21:
Since you do this a lot, you should probably allocate...

iannucci 2015/08/28 19:48:55:
done. though this is definitely a micro-optimization...
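The callback above still builds ks with prefix + string(k), a fresh string per collision. A sketch of the one-time-allocation direction dnj's (truncated) comment appears to point toward, reusing a single byte buffer across iterations (all names here are illustrative, not the CL's code):

    package main

    import "fmt"

    func main() {
        ns := "some-namespace"
        keys := [][]byte{[]byte("k1"), []byte("k2")}

        // Allocate the prefix once; each iteration truncates back to it and
        // appends the key, reusing the buffer's capacity when possible.
        buf := []byte("idx:" + ns + ":")
        plen := len(buf)
        for _, k := range keys {
            buf = append(buf[:plen], k...)
            // Note: buf is reused, so copy it (e.g. string(buf)) before retaining.
            fmt.Println(string(buf)) // idx:some-namespace:k1, then :k2
        }
    }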
- if idxColl.Get(k) == nil {
- // avoids unnecessary mutation, otherwise the idx collection thrashes on
- // every update.
- idxColl.Set(k, []byte{})
- }
coll := store.GetCollection(ks)
if coll == nil {
coll = store.SetCollection(ks, nil)
}
+
oldColl := oldIdx.GetCollection(ks)
newColl := newIdx.GetCollection(ks)
switch {
case ov == nil && nv != nil: // all additions
newColl.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool {
- coll.Set(i.Key, i.Val)
+ coll.Set(i.Key, []byte{})
return true
})
case ov != nil && nv == nil: // all deletions
@@ -282,11 +250,11 @@ func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
if nv == nil {
coll.Delete(k)
} else {
- coll.Set(k, nv)
+ coll.Set(k, []byte{})
}
})
default:
- panic("impossible")
+ impossible(fmt.Errorf("both values from gkvCollide were nil?"))
}
// TODO(riannucci): remove entries from idxColl and remove index collections
// when there are no index entries for that index any more.
@@ -294,24 +262,40 @@ func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
}
func addIndex(store *memStore, ns string, compIdx []*ds.IndexDefinition) {
- store.GetCollection("ents:"+ns).VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
- pm, err := rpmWoCtx(i.Val, ns)
- if err != nil {
- panic(err) // memory corruption
- }
- k, err := serialize.ReadKey(bytes.NewBuffer(i.Key), serialize.WithoutContext, globalAppID, ns)
- if err != nil {
- panic(err)
- }
- sip := partiallySerialize(pm)
- mergeIndexes(ns, store, newMemStore(), sip.indexEntries(k, compIdx))
- return true
- })
+ normalized := make([]*ds.IndexDefinition, len(compIdx))
+ idxColl := store.SetCollection("idx", nil)
+ for i, idx := range compIdx {
+ normalized[i] = idx.Normalize()
+ idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()), []byte{})
+ }
+
+ if allEnts := store.GetCollection("ents:" + ns); allEnts != nil {
+ allEnts.VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
+ pm, err := rpmWoCtx(i.Val, ns)
+ memoryCorruption(err)
+
+ prop, err := serialize.ReadProperty(bytes.NewBuffer(i.Key), serialize.WithoutContext, globalAppID, ns)
+ memoryCorruption(err)
+
+ k := prop.Value().(ds.Key)
+
+ sip := partiallySerialize(k, pm)
+
+ mergeIndexes(ns, store,
+ newMemStore(),
+ sip.indexEntries(ns, normalized))
+ return true
+ })
+ }
}
func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) {
// load all current complex query index definitions.
- compIdx := getCompIdxs(getIdxColl(store))
+ compIdx := []*ds.IndexDefinition{}
+ walkCompIdxs(store, nil, func(i *ds.IndexDefinition) bool {
+ compIdx = append(compIdx, i)
+ return true
+ })
mergeIndexes(key.Namespace(), store,
indexEntriesWithBuiltins(key, oldEnt, compIdx),