| Index: impl/memory/datastore_index.go
| diff --git a/impl/memory/datastore_index.go b/impl/memory/datastore_index.go
| index 879258316e9cb7ed309d8d96086aed267a609a82..46c2ec5173c92f6b283ba55edebd77ba8ef02c92 100644
| --- a/impl/memory/datastore_index.go
| +++ b/impl/memory/datastore_index.go
| @@ -15,15 +15,15 @@ import (
|
|  var indexCreationDeterministic = false
|
| -type qIndexSlice []*qIndex
| +type qIndexSlice []*ds.IndexDefinition
|
|  func (s qIndexSlice) Len() int { return len(s) }
|  func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|  func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) }
|
| -func defaultIndicies(kind string, pmap ds.PropertyMap) []*qIndex {
| +func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
|  	ret := make(qIndexSlice, 0, 2*len(pmap)+1)
| -	ret = append(ret, &qIndex{kind, false, nil})
| +	ret = append(ret, &ds.IndexDefinition{Kind: kind})
|  	for name, pvals := range pmap {
|  		needsIndex := false
|  		for _, v := range pvals {
| @@ -35,8 +35,8 @@ func defaultIndicies(kind string, pmap ds.PropertyMap) []*qIndex {
|  		if !needsIndex {
|  			continue
|  		}
| -		ret = append(ret, &qIndex{kind, false, []qSortBy{{name, qASC}}})
| -		ret = append(ret, &qIndex{kind, false, []qSortBy{{name, qDEC}}})
| +		ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name}}})
| +		ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name, Direction: ds.DESCENDING}}})
|  	}
|  	if indexCreationDeterministic {
|  		sort.Sort(ret)
| @@ -44,7 +44,7 @@ func defaultIndicies(kind string, pmap ds.PropertyMap) []*qIndex {
|  	return ret
|  }
|
| -func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*qIndex) *memStore {
| +func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) *memStore {
|  	sip := partiallySerialize(pm)
|  	return sip.indexEntries(k, append(defaultIndicies(k.Kind(), pm), complexIdxs...))
|  }
| @@ -87,10 +87,10 @@ func partiallySerialize(pm ds.PropertyMap) (ret serializedIndexablePmap) {
|  }
|
|  // indexRowGen contains enough information to generate all of the index rows which
| -// correspond with a propertyList and a qIndex.
| +// correspond with a propertyList and a ds.IndexDefinition.
|  type indexRowGen struct {
|  	propVec []serializedPvals
| -	orders []qDirection
| +	orders []ds.IndexDirection
|  }
|
|  // permute calls cb for each index row, in the sorted order of the rows.
| @@ -102,7 +102,7 @@ func (s indexRowGen) permute(cb func([]byte)) {
|  		for i := len(iVec) - 1; i >= 0; i-- {
|  			var done bool
|  			var newVal int
| -			if s.orders[i] == qASC {
| +			if s.orders[i] == ds.ASCENDING {
|  				newVal = (iVec[i] + 1) % iVecLim[i]
|  				done = newVal != 0
|  			} else {
| @@ -126,7 +126,7 @@ func (s indexRowGen) permute(cb func([]byte)) {
|  	}
|
|  	for i := range iVec {
| -		if s.orders[i] == qDEC {
| +		if s.orders[i] == ds.DESCENDING {
|  			iVec[i] = iVecLim[i] - 1
|  		}
|  	}
| @@ -139,7 +139,7 @@ func (s indexRowGen) permute(cb func([]byte)) {
|  		buf := bytes.NewBuffer(make([]byte, 0, bufsiz))
|  		for pvalSliceIdx, pvalIdx := range iVec {
|  			data := s.propVec[pvalSliceIdx][pvalIdx]
| -			if s.orders[pvalSliceIdx] == qASC {
| +			if s.orders[pvalSliceIdx] == ds.ASCENDING {
|  				buf.Write(data)
|  			} else {
|  				for _, b := range data {
| @@ -161,13 +161,13 @@ type matcher struct {
|  // matcher.match checks to see if the mapped, serialized property values
|  // match the index. If they do, it returns a indexRowGen. Do not write or modify
|  // the data in the indexRowGen.
| -func (m *matcher) match(idx *qIndex, sip serializedIndexablePmap) (indexRowGen, bool) {
| +func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (indexRowGen, bool) {
|  	m.buf.propVec = m.buf.propVec[:0]
|  	m.buf.orders = m.buf.orders[:0]
| -	for _, sb := range idx.sortby {
| -		if pv, ok := sip[sb.prop]; ok {
| +	for _, sb := range idx.SortBy {
| +		if pv, ok := sip[sb.Property]; ok {
|  			m.buf.propVec = append(m.buf.propVec, pv)
| -			m.buf.orders = append(m.buf.orders, sb.dir)
| +			m.buf.orders = append(m.buf.orders, sb.Direction)
|  		} else {
|  			return indexRowGen{}, false
|  		}
| @@ -175,13 +175,13 @@ func (m *matcher) match(idx *qIndex, sip serializedIndexablePmap) (indexRowGen,
|  	return m.buf, true
|  }
|
| -func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*qIndex) *memStore {
| +func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*ds.IndexDefinition) *memStore {
|  	ret := newMemStore()
|  	idxColl := ret.SetCollection("idx", nil)
|  	// getIdxEnts retrieves an index collection or adds it if it's not there.
| -	getIdxEnts := func(qi *qIndex) *memCollection {
| +	getIdxEnts := func(qi *ds.IndexDefinition) *memCollection {
|  		buf := &bytes.Buffer{}
| -		qi.WriteBinary(buf)
| +		qi.Write(buf)
|  		b := buf.Bytes()
|  		idxColl.Set(b, []byte{})
|  		return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(), b), nil)
| @@ -209,7 +209,7 @@ func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*qIndex) *memSt
|  		idxEnts := getIdxEnts(idx)
|  		if len(irg.propVec) == 0 {
|  			idxEnts.Set(keyData, []byte{}) // propless index, e.g. kind -> key = nil
| -		} else if idx.ancestor {
| +		} else if idx.Ancestor {
|  			for ancKey := k; ancKey != nil; ancKey = ancKey.Parent() {
|  				buf := &bytes.Buffer{}
|  				ds.WriteKey(buf, ds.WithoutContext, ancKey)
| @@ -224,35 +224,42 @@ func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*qIndex) *memSt
|  	return ret
|  }
|
| -func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) {
| -	idxColl := store.GetCollection("idx")
| -	if idxColl == nil {
| -		idxColl = store.SetCollection("idx", nil)
| -	}
| -
| +func getCompIdxs(idxColl *memCollection) []*ds.IndexDefinition {
|  	// load all current complex query index definitions.
| -	compIdx := []*qIndex{}
| +	compIdx := []*ds.IndexDefinition{}
| +	complexQueryPrefix := ds.IndexComplexQueryPrefix()
|  	idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item) bool {
|  		if !bytes.HasPrefix(i.Key, complexQueryPrefix) {
|  			return false
|  		}
| -		qi := &qIndex{}
| -		if err := qi.ReadBinary(bytes.NewBuffer(i.Key)); err != nil {
| +		qi := &ds.IndexDefinition{}
| +		if err := qi.Read(bytes.NewBuffer(i.Key)); err != nil {
|  			panic(err) // memory corruption
|  		}
|  		compIdx = append(compIdx, qi)
|  		return true
|  	})
| +	return compIdx
| +}
|
| -	oldIdx := indexEntriesWithBuiltins(key, oldEnt, compIdx)
| -
| -	newIdx := indexEntriesWithBuiltins(key, newEnt, compIdx)
| -
| -	prefix := "idx:" + key.Namespace() + ":"
| +func getIdxColl(store *memStore) *memCollection {
| +	idxColl := store.GetCollection("idx")
| +	if idxColl == nil {
| +		idxColl = store.SetCollection("idx", nil)
| +	}
| +	return idxColl
| +}
|
| +func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
| +	idxColl := getIdxColl(store)
| +	prefix := "idx:" + ns + ":"
|  	gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) {
|  		ks := prefix + string(k)
| -		idxColl.Set(k, []byte{})
| +		if idxColl.Get(k) == nil {
| +			// avoids unnecessary mutation, otherwise the idx collection thrashes on
| +			// every update.
| +			idxColl.Set(k, []byte{})
| +		}
|
|  		coll := store.GetCollection(ks)
|  		if coll == nil {
| @@ -287,3 +294,28 @@ func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap)
|  			// when there are no index entries for that index any more.
|  	})
|  }
| +
| +func addIndex(store *memStore, ns string, compIdx []*ds.IndexDefinition) {
| +	store.GetCollection("ents:"+ns).VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
| +		pm, err := rpmWoCtx(i.Val, ns)
| +		if err != nil {
| +			panic(err) // memory corruption
| +		}
| +		k, err := ds.ReadKey(bytes.NewBuffer(i.Key), ds.WithoutContext, globalAppID, ns)
| +		if err != nil {
| +			panic(err)
| +		}
| +		sip := partiallySerialize(pm)
| +		mergeIndexes(ns, store, newMemStore(), sip.indexEntries(k, compIdx))
| +		return true
| +	})
| +}
| +
| +func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) {
| +	// load all current complex query index definitions.
| +	compIdx := getCompIdxs(getIdxColl(store))
| +
| +	mergeIndexes(key.Namespace(), store,
| +		indexEntriesWithBuiltins(key, oldEnt, compIdx),
| +		indexEntriesWithBuiltins(key, newEnt, compIdx))
| +}
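
For readers mapping the old literals onto the new API, here is a rough sketch (not part of this CL) of the qIndex to ds.IndexDefinition translation the diff performs. The import path for the ds package is an assumption and may not match your checkout; the "Kind" and "Prop" values are purely illustrative.

// Sketch only: shows how the old qIndex literals correspond to the new
// ds.IndexDefinition values. The import path below is an assumption.
package main

import (
	"fmt"

	ds "github.com/luci/gae/service/datastore" // assumed location of the "ds" package
)

func main() {
	// Old: &qIndex{kind, false, nil} (kind-only index; the second field was
	// the ancestor flag, now the Ancestor field).
	kindOnly := &ds.IndexDefinition{Kind: "Kind"}

	// Old: &qIndex{kind, false, []qSortBy{{name, qASC}}}. The CL omits
	// Direction for this case, relying on ASCENDING being the zero value.
	asc := &ds.IndexDefinition{
		Kind:   "Kind",
		SortBy: []ds.IndexColumn{{Property: "Prop"}},
	}

	// Old: &qIndex{kind, false, []qSortBy{{name, qDEC}}}.
	desc := &ds.IndexDefinition{
		Kind:   "Kind",
		SortBy: []ds.IndexColumn{{Property: "Prop", Direction: ds.DESCENDING}},
	}

	fmt.Println(kindOnly, asc, desc)
}

Because ASCENDING is the zero value of ds.IndexDirection, defaultIndicies can build the ascending variant without naming a Direction, which keeps the literals in the diff short.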