 Chromium Code Reviews
 Chromium Code Reviews Issue 1302813003:
  impl/memory: Implement Queries  (Closed) 
  Base URL: https://github.com/luci/gae.git@add_multi_iterator
    
  
    Issue 1302813003:
  impl/memory: Implement Queries  (Closed) 
  Base URL: https://github.com/luci/gae.git@add_multi_iterator
| Index: impl/memory/datastore_index.go | 
| diff --git a/impl/memory/datastore_index.go b/impl/memory/datastore_index.go | 
| index f22a970a8e9ef62593c2b60be332c08bd98086b6..4c37e7136b4aec16e0c167b6a2ce9be42cfaf059 100644 | 
| --- a/impl/memory/datastore_index.go | 
| +++ b/impl/memory/datastore_index.go | 
| @@ -14,8 +14,6 @@ import ( | 
| "github.com/luci/gkvlite" | 
| ) | 
| -var indexCreationDeterministic = false | 
| - | 
| type qIndexSlice []*ds.IndexDefinition | 
| func (s qIndexSlice) Len() int { return len(s) } | 
| @@ -39,15 +37,15 @@ func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition { | 
| ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name}}}) | 
| ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name, Direction: ds.DESCENDING}}}) | 
| } | 
| - if indexCreationDeterministic { | 
| + if serializationDeterministic { | 
| sort.Sort(ret) | 
| } | 
| return ret | 
| } | 
| func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) *memStore { | 
| - sip := partiallySerialize(pm) | 
| - return sip.indexEntries(k, append(defaultIndicies(k.Kind(), pm), complexIdxs...)) | 
| + sip := partiallySerialize(k, pm) | 
| + return sip.indexEntries(k.Namespace(), append(defaultIndicies(k.Kind(), pm), complexIdxs...)) | 
| } | 
| // serializedPvals is all of the serialized DSProperty values in qASC order. | 
| @@ -58,29 +56,33 @@ func (s serializedPvals) Swap(i, j int) { s[i], s[j] = s[j], s[i] } | 
| func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j]) < 0 } | 
| // prop name -> [<serialized DSProperty>, ...] | 
| +// includes the special values '__key__' and '__ancestor__'; the latter contains | 
| +// all of the ancestor entries for this key. | 
| type serializedIndexablePmap map[string]serializedPvals | 
| -func partiallySerialize(pm ds.PropertyMap) (ret serializedIndexablePmap) { | 
| - if len(pm) == 0 { | 
| - return | 
| +func partiallySerialize(k ds.Key, pm ds.PropertyMap) (ret serializedIndexablePmap) { | 
| + ret = make(serializedIndexablePmap, len(pm)+2) | 
| + ret["__key__"] = [][]byte{serialize.ToBytes(ds.MkProperty(k))} | 
| + for k != nil { | 
| + ret["__ancestor__"] = append(ret["__ancestor__"], serialize.ToBytes(ds.MkProperty(k))) | 
| + k = k.Parent() | 
| } | 
| - | 
| - buf := &bytes.Buffer{} | 
| - ret = make(serializedIndexablePmap, len(pm)) | 
| for k, vals := range pm { | 
| + dups := map[string]struct{}{} | 
| newVals := make(serializedPvals, 0, len(vals)) | 
| for _, v := range vals { | 
| if v.IndexSetting() == ds.NoIndex { | 
| continue | 
| } | 
| - buf.Reset() | 
| - serialize.WriteProperty(buf, serialize.WithoutContext, v) | 
| - newVal := make([]byte, buf.Len()) | 
| - copy(newVal, buf.Bytes()) | 
| - newVals = append(newVals, newVal) | 
| + data := serialize.ToBytes(v) | 
| + dataS := string(data) | 
| + if _, ok := dups[dataS]; ok { | 
| + continue | 
| + } | 
| + dups[dataS] = struct{}{} | 
| + newVals = append(newVals, data) | 
| } | 
| if len(newVals) > 0 { | 
| - sort.Sort(newVals) | 
| ret[k] = newVals | 
| } | 
| } | 
| @@ -95,7 +97,7 @@ type indexRowGen struct { | 
| } | 
| // permute calls cb for each index row, in the sorted order of the rows. | 
| -func (s indexRowGen) permute(cb func([]byte)) { | 
| +func (s indexRowGen) permute(collSetFn func(k, v []byte)) { | 
| iVec := make([]int, len(s.propVec)) | 
| iVecLim := make([]int, len(s.propVec)) | 
| @@ -137,18 +139,13 @@ func (s indexRowGen) permute(cb func([]byte)) { | 
| for pvalSliceIdx, pvalIdx := range iVec { | 
| bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx]) | 
| } | 
| - buf := bytes.NewBuffer(make([]byte, 0, bufsiz)) | 
| + buf := serialize.Invertible(bytes.NewBuffer(make([]byte, 0, bufsiz))) | 
| for pvalSliceIdx, pvalIdx := range iVec { | 
| data := s.propVec[pvalSliceIdx][pvalIdx] | 
| - if s.orders[pvalSliceIdx] == ds.ASCENDING { | 
| - buf.Write(data) | 
| - } else { | 
| - for _, b := range data { | 
| - buf.WriteByte(b ^ 0xFF) | 
| - } | 
| - } | 
| + buf.SetInvert(s.orders[pvalSliceIdx] == ds.DESCENDING) | 
| + buf.Write(data) | 
| } | 
| - cb(buf.Bytes()) | 
| + collSetFn(buf.Bytes(), []byte{}) | 
| if !incPos() { | 
| break | 
| } | 
| @@ -162,13 +159,10 @@ type matcher struct { | 
| // matcher.match checks to see if the mapped, serialized property values | 
| // match the index. If they do, it returns an indexRowGen. Do not write or modify | 
| // the data in the indexRowGen. | 
| -func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (indexRowGen, bool) { | 
| +func (m *matcher) match(sortBy []ds.IndexColumn, sip serializedIndexablePmap) (indexRowGen, bool) { | 
| m.buf.propVec = m.buf.propVec[:0] | 
| m.buf.orders = m.buf.orders[:0] | 
| - for _, sb := range idx.SortBy { | 
| - if sb.Property == "__key__" { | 
| - panic("don't know how to build compound index on __key__") | 
| - } | 
| + for _, sb := range sortBy { | 
| if pv, ok := sip[sb.Property]; ok { | 
| m.buf.propVec = append(m.buf.propVec, pv) | 
| m.buf.orders = append(m.buf.orders, sb.Direction) | 
| @@ -179,97 +173,71 @@ func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (i | 
| return m.buf, true | 
| } | 
| -func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*ds.IndexDefinition) *memStore { | 
| +func (sip serializedIndexablePmap) indexEntries(ns string, idxs []*ds.IndexDefinition) *memStore { | 
| ret := newMemStore() | 
| idxColl := ret.SetCollection("idx", nil) | 
| - // getIdxEnts retrieves an index collection or adds it if it's not there. | 
| - getIdxEnts := func(qi *ds.IndexDefinition) *memCollection { | 
| - b := serialize.ToBytes(*qi) | 
| - idxColl.Set(b, []byte{}) | 
| - return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(), b), nil) | 
| - } | 
| - | 
| - keyData := serialize.ToBytes(k) | 
| - | 
| - walkPermutations := func(prefix []byte, irg indexRowGen, ents *memCollection) { | 
| - prev := []byte{} // intentionally make a non-nil slice, gkvlite hates nil. | 
| - irg.permute(func(data []byte) { | 
| - buf := bytes.NewBuffer(make([]byte, 0, len(prefix)+len(data)+len(keyData))) | 
| - buf.Write(prefix) | 
| - buf.Write(data) | 
| - buf.Write(keyData) | 
| - ents.Set(buf.Bytes(), prev) | 
| - prev = data | 
| - }) | 
| - } | 
| mtch := matcher{} | 
| for _, idx := range idxs { | 
| - if irg, ok := mtch.match(idx, sip); ok { | 
| - idxEnts := getIdxEnts(idx) | 
| - if len(irg.propVec) == 0 { | 
| - idxEnts.Set(keyData, []byte{}) // propless index, e.g. kind -> key = nil | 
| - } else if idx.Ancestor { | 
| - for ancKey := k; ancKey != nil; ancKey = ancKey.Parent() { | 
| - walkPermutations(serialize.ToBytes(ancKey), irg, idxEnts) | 
| - } | 
| - } else { | 
| - walkPermutations(nil, irg, idxEnts) | 
| - } | 
| + idx = idx.Normalize() | 
| + if irg, ok := mtch.match(idx.GetFullSortOrder(), sip); ok { | 
| + idxBin := serialize.ToBytes(*idx.PrepForIdxTable()) | 
| + idxColl.Set(idxBin, []byte{}) | 
| 
dnj (Google)
2015/08/28 17:54:21
Use []byte(nil) here, as []byte{} allocates an empty…
 
iannucci
2015/08/28 19:48:55
If gkvlite didn't panic on nil values, I would totally…
 | 
| + coll := ret.SetCollection(fmt.Sprintf("idx:%s:%s", ns, idxBin), nil) | 
| + irg.permute(coll.Set) | 
| } | 
| } | 
| return ret | 
| } | 
| -func getCompIdxs(idxColl *memCollection) []*ds.IndexDefinition { | 
| - // load all current complex query index definitions. | 
| - compIdx := []*ds.IndexDefinition{} | 
| - complexQueryPrefix := ds.IndexComplexQueryPrefix() | 
| - idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item) bool { | 
| - if !bytes.HasPrefix(i.Key, complexQueryPrefix) { | 
| - return false | 
| - } | 
| - qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key)) | 
| - if err != nil { | 
| - panic(err) // memory corruption | 
| - } | 
| - compIdx = append(compIdx, &qi) | 
| - return true | 
| - }) | 
| - return compIdx | 
| -} | 
| - | 
| -func getIdxColl(store *memStore) *memCollection { | 
| +// walkCompIdxs walks the table of compound indexes in the store. If `endsWith` | 
| +// is provided, this will only walk over compound indexes which match | 
| +// Kind, Ancestor, and whose SortBy has `endsWith.SortBy` as a suffix. | 
| +func walkCompIdxs(store *memStore, endsWith *ds.IndexDefinition, cb func(*ds.IndexDefinition) bool) { | 
| idxColl := store.GetCollection("idx") | 
| if idxColl == nil { | 
| - idxColl = store.SetCollection("idx", nil) | 
| + return | 
| + } | 
| + itrDef := iterDefinition{c: idxColl} | 
| + | 
| + if endsWith != nil { | 
| + full := serialize.ToBytes(*endsWith.Flip()) | 
| + // chop off the null terminating byte | 
| + itrDef.prefix = full[:len(full)-1] | 
| + } | 
| + | 
| + for it := itrDef.mkIter(); !it.stopped; { | 
| + it.next(nil, func(i *gkvlite.Item) { | 
| + if i == nil { | 
| + return | 
| + } | 
| + qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key)) | 
| + memoryCorruption(err) | 
| + if !cb(qi.Flip()) { | 
| + it.stop() | 
| 
dnj (Google)
2015/08/28 17:54:21
It'd be better to arrange to do this via defer, as…
 
iannucci
2015/08/28 19:48:55
I think you mean 'additionally do this via a defer…
 | 
| + } | 
| + }) | 
| } | 
| - return idxColl | 
| } | 
| func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) { | 
| - idxColl := getIdxColl(store) | 
| prefix := "idx:" + ns + ":" | 
| gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) { | 
| ks := prefix + string(k) | 
| 
dnj (Google)
2015/08/28 17:54:21
Since you do this a lot, you should probably alloc…
 
iannucci
2015/08/28 19:48:55
done. though this is definitely a micro-optimization…
 | 
| - if idxColl.Get(k) == nil { | 
| - // avoids unnecessary mutation, otherwise the idx collection thrashes on | 
| - // every update. | 
| - idxColl.Set(k, []byte{}) | 
| - } | 
| coll := store.GetCollection(ks) | 
| if coll == nil { | 
| coll = store.SetCollection(ks, nil) | 
| } | 
| + | 
| oldColl := oldIdx.GetCollection(ks) | 
| newColl := newIdx.GetCollection(ks) | 
| switch { | 
| case ov == nil && nv != nil: // all additions | 
| newColl.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool { | 
| - coll.Set(i.Key, i.Val) | 
| + coll.Set(i.Key, []byte{}) | 
| return true | 
| }) | 
| case ov != nil && nv == nil: // all deletions | 
| @@ -282,11 +250,11 @@ func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) { | 
| if nv == nil { | 
| coll.Delete(k) | 
| } else { | 
| - coll.Set(k, nv) | 
| + coll.Set(k, []byte{}) | 
| } | 
| }) | 
| default: | 
| - panic("impossible") | 
| + impossible(fmt.Errorf("both values from gkvCollide were nil?")) | 
| } | 
| // TODO(riannucci): remove entries from idxColl and remove index collections | 
| // when there are no index entries for that index any more. | 
| @@ -294,24 +262,40 @@ func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) { | 
| } | 
| func addIndex(store *memStore, ns string, compIdx []*ds.IndexDefinition) { | 
| - store.GetCollection("ents:"+ns).VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool { | 
| - pm, err := rpmWoCtx(i.Val, ns) | 
| - if err != nil { | 
| - panic(err) // memory corruption | 
| - } | 
| - k, err := serialize.ReadKey(bytes.NewBuffer(i.Key), serialize.WithoutContext, globalAppID, ns) | 
| - if err != nil { | 
| - panic(err) | 
| - } | 
| - sip := partiallySerialize(pm) | 
| - mergeIndexes(ns, store, newMemStore(), sip.indexEntries(k, compIdx)) | 
| - return true | 
| - }) | 
| + normalized := make([]*ds.IndexDefinition, len(compIdx)) | 
| + idxColl := store.SetCollection("idx", nil) | 
| + for i, idx := range compIdx { | 
| + normalized[i] = idx.Normalize() | 
| + idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()), []byte{}) | 
| + } | 
| + | 
| + if allEnts := store.GetCollection("ents:" + ns); allEnts != nil { | 
| + allEnts.VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool { | 
| + pm, err := rpmWoCtx(i.Val, ns) | 
| + memoryCorruption(err) | 
| + | 
| + prop, err := serialize.ReadProperty(bytes.NewBuffer(i.Key), serialize.WithoutContext, globalAppID, ns) | 
| + memoryCorruption(err) | 
| + | 
| + k := prop.Value().(ds.Key) | 
| + | 
| + sip := partiallySerialize(k, pm) | 
| + | 
| + mergeIndexes(ns, store, | 
| + newMemStore(), | 
| + sip.indexEntries(ns, normalized)) | 
| + return true | 
| + }) | 
| + } | 
| } | 
| func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) { | 
| // load all current complex query index definitions. | 
| - compIdx := getCompIdxs(getIdxColl(store)) | 
| + compIdx := []*ds.IndexDefinition{} | 
| + walkCompIdxs(store, nil, func(i *ds.IndexDefinition) bool { | 
| + compIdx = append(compIdx, i) | 
| + return true | 
| + }) | 
| mergeIndexes(key.Namespace(), store, | 
| indexEntriesWithBuiltins(key, oldEnt, compIdx), |