| Index: impl/memory/datastore_index.go
 | 
| diff --git a/impl/memory/datastore_index.go b/impl/memory/datastore_index.go
 | 
| index f22a970a8e9ef62593c2b60be332c08bd98086b6..f4db119e6bd8af872caa037749419d62181a2e38 100644
 | 
| --- a/impl/memory/datastore_index.go
 | 
| +++ b/impl/memory/datastore_index.go
 | 
| @@ -14,15 +14,13 @@ import (
 | 
|  	"github.com/luci/gkvlite"
 | 
|  )
 | 
|  
 | 
| -var indexCreationDeterministic = false
 | 
| -
 | 
|  type qIndexSlice []*ds.IndexDefinition
 | 
|  
 | 
|  func (s qIndexSlice) Len() int           { return len(s) }
 | 
|  func (s qIndexSlice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
 | 
|  func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) }
 | 
|  
 | 
| -func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
 | 
| +func defaultIndexes(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
 | 
|  	ret := make(qIndexSlice, 0, 2*len(pmap)+1)
 | 
|  	ret = append(ret, &ds.IndexDefinition{Kind: kind})
 | 
|  	for name, pvals := range pmap {
 | 
| @@ -39,15 +37,15 @@ func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
 | 
|  		ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name}}})
 | 
|  		ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.IndexColumn{{Property: name, Direction: ds.DESCENDING}}})
 | 
|  	}
 | 
| -	if indexCreationDeterministic {
 | 
| +	if serializationDeterministic {
 | 
|  		sort.Sort(ret)
 | 
|  	}
 | 
|  	return ret
 | 
|  }
 | 
|  
 | 
|  func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*ds.IndexDefinition) *memStore {
 | 
| -	sip := partiallySerialize(pm)
 | 
| -	return sip.indexEntries(k, append(defaultIndicies(k.Kind(), pm), complexIdxs...))
 | 
| +	sip := partiallySerialize(k, pm)
 | 
| +	return sip.indexEntries(k.Namespace(), append(defaultIndexes(k.Kind(), pm), complexIdxs...))
 | 
|  }
 | 
|  
 | 
|  // serializedPvals is all of the serialized DSProperty values in qASC order.
 | 
| @@ -58,29 +56,32 @@ func (s serializedPvals) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
 | 
|  func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j]) < 0 }
 | 
|  
 | 
|  // prop name -> [<serialized DSProperty>, ...]
 | 
| +// includes special values '__key__' and '__ancestor__' which contains all of
 | 
| +// the ancestor entries for this key.
 | 
|  type serializedIndexablePmap map[string]serializedPvals
 | 
|  
 | 
| -func partiallySerialize(pm ds.PropertyMap) (ret serializedIndexablePmap) {
 | 
| -	if len(pm) == 0 {
 | 
| -		return
 | 
| +func partiallySerialize(k ds.Key, pm ds.PropertyMap) (ret serializedIndexablePmap) {
 | 
| +	ret = make(serializedIndexablePmap, len(pm)+2)
 | 
| +	ret["__key__"] = [][]byte{serialize.ToBytes(ds.MkProperty(k))}
 | 
| +	for k != nil {
 | 
| +		ret["__ancestor__"] = append(ret["__ancestor__"], serialize.ToBytes(ds.MkProperty(k)))
 | 
| +		k = k.Parent()
 | 
|  	}
 | 
| -
 | 
| -	buf := &bytes.Buffer{}
 | 
| -	ret = make(serializedIndexablePmap, len(pm))
 | 
|  	for k, vals := range pm {
 | 
| +		dups := stringSet{}
 | 
|  		newVals := make(serializedPvals, 0, len(vals))
 | 
|  		for _, v := range vals {
 | 
|  			if v.IndexSetting() == ds.NoIndex {
 | 
|  				continue
 | 
|  			}
 | 
| -			buf.Reset()
 | 
| -			serialize.WriteProperty(buf, serialize.WithoutContext, v)
 | 
| -			newVal := make([]byte, buf.Len())
 | 
| -			copy(newVal, buf.Bytes())
 | 
| -			newVals = append(newVals, newVal)
 | 
| +			data := serialize.ToBytes(v)
 | 
| +			dataS := string(data)
 | 
| +			if !dups.add(dataS) {
 | 
| +				continue
 | 
| +			}
 | 
| +			newVals = append(newVals, data)
 | 
|  		}
 | 
|  		if len(newVals) > 0 {
 | 
| -			sort.Sort(newVals)
 | 
|  			ret[k] = newVals
 | 
|  		}
 | 
|  	}
 | 
| @@ -95,7 +96,7 @@ type indexRowGen struct {
 | 
|  }
 | 
|  
 | 
|  // permute calls cb for each index row, in the sorted order of the rows.
 | 
| -func (s indexRowGen) permute(cb func([]byte)) {
 | 
| +func (s indexRowGen) permute(collSetFn func(k, v []byte)) {
 | 
|  	iVec := make([]int, len(s.propVec))
 | 
|  	iVecLim := make([]int, len(s.propVec))
 | 
|  
 | 
| @@ -137,18 +138,13 @@ func (s indexRowGen) permute(cb func([]byte)) {
 | 
|  		for pvalSliceIdx, pvalIdx := range iVec {
 | 
|  			bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx])
 | 
|  		}
 | 
| -		buf := bytes.NewBuffer(make([]byte, 0, bufsiz))
 | 
| +		buf := serialize.Invertible(bytes.NewBuffer(make([]byte, 0, bufsiz)))
 | 
|  		for pvalSliceIdx, pvalIdx := range iVec {
 | 
|  			data := s.propVec[pvalSliceIdx][pvalIdx]
 | 
| -			if s.orders[pvalSliceIdx] == ds.ASCENDING {
 | 
| -				buf.Write(data)
 | 
| -			} else {
 | 
| -				for _, b := range data {
 | 
| -					buf.WriteByte(b ^ 0xFF)
 | 
| -				}
 | 
| -			}
 | 
| +			buf.SetInvert(s.orders[pvalSliceIdx] == ds.DESCENDING)
 | 
| +			buf.Write(data)
 | 
|  		}
 | 
| -		cb(buf.Bytes())
 | 
| +		collSetFn(buf.Bytes(), []byte{})
 | 
|  		if !incPos() {
 | 
|  			break
 | 
|  		}
 | 
| @@ -162,13 +158,10 @@ type matcher struct {
 | 
|  // matcher.match checks to see if the mapped, serialized property values
 | 
|  // match the index. If they do, it returns a indexRowGen. Do not write or modify
 | 
|  // the data in the indexRowGen.
 | 
| -func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (indexRowGen, bool) {
 | 
| +func (m *matcher) match(sortBy []ds.IndexColumn, sip serializedIndexablePmap) (indexRowGen, bool) {
 | 
|  	m.buf.propVec = m.buf.propVec[:0]
 | 
|  	m.buf.orders = m.buf.orders[:0]
 | 
| -	for _, sb := range idx.SortBy {
 | 
| -		if sb.Property == "__key__" {
 | 
| -			panic("don't know how to build compound index on __key__")
 | 
| -		}
 | 
| +	for _, sb := range sortBy {
 | 
|  		if pv, ok := sip[sb.Property]; ok {
 | 
|  			m.buf.propVec = append(m.buf.propVec, pv)
 | 
|  			m.buf.orders = append(m.buf.orders, sb.Direction)
 | 
| @@ -179,97 +172,75 @@ func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (i
 | 
|  	return m.buf, true
 | 
|  }
 | 
|  
 | 
| -func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*ds.IndexDefinition) *memStore {
 | 
| +func (sip serializedIndexablePmap) indexEntries(ns string, idxs []*ds.IndexDefinition) *memStore {
 | 
|  	ret := newMemStore()
 | 
|  	idxColl := ret.SetCollection("idx", nil)
 | 
| -	// getIdxEnts retrieves an index collection or adds it if it's not there.
 | 
| -	getIdxEnts := func(qi *ds.IndexDefinition) *memCollection {
 | 
| -		b := serialize.ToBytes(*qi)
 | 
| -		idxColl.Set(b, []byte{})
 | 
| -		return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(), b), nil)
 | 
| -	}
 | 
| -
 | 
| -	keyData := serialize.ToBytes(k)
 | 
| -
 | 
| -	walkPermutations := func(prefix []byte, irg indexRowGen, ents *memCollection) {
 | 
| -		prev := []byte{} // intentionally make a non-nil slice, gkvlite hates nil.
 | 
| -		irg.permute(func(data []byte) {
 | 
| -			buf := bytes.NewBuffer(make([]byte, 0, len(prefix)+len(data)+len(keyData)))
 | 
| -			buf.Write(prefix)
 | 
| -			buf.Write(data)
 | 
| -			buf.Write(keyData)
 | 
| -			ents.Set(buf.Bytes(), prev)
 | 
| -			prev = data
 | 
| -		})
 | 
| -	}
 | 
|  
 | 
|  	mtch := matcher{}
 | 
|  	for _, idx := range idxs {
 | 
| -		if irg, ok := mtch.match(idx, sip); ok {
 | 
| -			idxEnts := getIdxEnts(idx)
 | 
| -			if len(irg.propVec) == 0 {
 | 
| -				idxEnts.Set(keyData, []byte{}) // propless index, e.g. kind -> key = nil
 | 
| -			} else if idx.Ancestor {
 | 
| -				for ancKey := k; ancKey != nil; ancKey = ancKey.Parent() {
 | 
| -					walkPermutations(serialize.ToBytes(ancKey), irg, idxEnts)
 | 
| -				}
 | 
| -			} else {
 | 
| -				walkPermutations(nil, irg, idxEnts)
 | 
| -			}
 | 
| +		idx = idx.Normalize()
 | 
| +		if irg, ok := mtch.match(idx.GetFullSortOrder(), sip); ok {
 | 
| +			idxBin := serialize.ToBytes(*idx.PrepForIdxTable())
 | 
| +			idxColl.Set(idxBin, []byte{})
 | 
| +			coll := ret.SetCollection(fmt.Sprintf("idx:%s:%s", ns, idxBin), nil)
 | 
| +			irg.permute(coll.Set)
 | 
|  		}
 | 
|  	}
 | 
|  
 | 
|  	return ret
 | 
|  }
 | 
|  
 | 
| -func getCompIdxs(idxColl *memCollection) []*ds.IndexDefinition {
 | 
| -	// load all current complex query index definitions.
 | 
| -	compIdx := []*ds.IndexDefinition{}
 | 
| -	complexQueryPrefix := ds.IndexComplexQueryPrefix()
 | 
| -	idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item) bool {
 | 
| -		if !bytes.HasPrefix(i.Key, complexQueryPrefix) {
 | 
| -			return false
 | 
| -		}
 | 
| -		qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key))
 | 
| -		if err != nil {
 | 
| -			panic(err) // memory corruption
 | 
| -		}
 | 
| -		compIdx = append(compIdx, &qi)
 | 
| -		return true
 | 
| -	})
 | 
| -	return compIdx
 | 
| -}
 | 
| -
 | 
| -func getIdxColl(store *memStore) *memCollection {
 | 
| +// walkCompIdxs walks the table of compound indexes in the store. If `endsWith`
 | 
| +// is provided, this will only walk over compound indexes which match
 | 
| +// Kind, Ancestor, and whose SortBy has `endsWith.SortBy` as a suffix.
 | 
| +func walkCompIdxs(store *memStore, endsWith *ds.IndexDefinition, cb func(*ds.IndexDefinition) bool) {
 | 
|  	idxColl := store.GetCollection("idx")
 | 
|  	if idxColl == nil {
 | 
| -		idxColl = store.SetCollection("idx", nil)
 | 
| +		return
 | 
| +	}
 | 
| +	itrDef := iterDefinition{c: idxColl}
 | 
| +
 | 
| +	if endsWith != nil {
 | 
| +		full := serialize.ToBytes(*endsWith.Flip())
 | 
| +		// chop off the null terminating byte
 | 
| +		itrDef.prefix = full[:len(full)-1]
 | 
| +	}
 | 
| +
 | 
| +	it := itrDef.mkIter()
 | 
| +	defer it.stop()
 | 
| +	for !it.stopped {
 | 
| +		it.next(nil, func(i *gkvlite.Item) {
 | 
| +			if i == nil {
 | 
| +				return
 | 
| +			}
 | 
| +			qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(i.Key))
 | 
| +			memoryCorruption(err)
 | 
| +			if !cb(qi.Flip()) {
 | 
| +				it.stop()
 | 
| +			}
 | 
| +		})
 | 
|  	}
 | 
| -	return idxColl
 | 
|  }
 | 
|  
 | 
|  func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
 | 
| -	idxColl := getIdxColl(store)
 | 
| -	prefix := "idx:" + ns + ":"
 | 
| +	prefixBuf := []byte("idx:" + ns + ":")
 | 
| +	origPrefixBufLen := len(prefixBuf)
 | 
|  	gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) {
 | 
| -		ks := prefix + string(k)
 | 
| -		if idxColl.Get(k) == nil {
 | 
| -			// avoids unnecessary mutation, otherwise the idx collection thrashes on
 | 
| -			// every update.
 | 
| -			idxColl.Set(k, []byte{})
 | 
| -		}
 | 
| +		prefixBuf = append(prefixBuf[:origPrefixBufLen], k...)
 | 
| +		ks := string(prefixBuf)
 | 
|  
 | 
|  		coll := store.GetCollection(ks)
 | 
|  		if coll == nil {
 | 
|  			coll = store.SetCollection(ks, nil)
 | 
|  		}
 | 
| +
 | 
|  		oldColl := oldIdx.GetCollection(ks)
 | 
|  		newColl := newIdx.GetCollection(ks)
 | 
|  
 | 
|  		switch {
 | 
|  		case ov == nil && nv != nil: // all additions
 | 
|  			newColl.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool {
 | 
| -				coll.Set(i.Key, i.Val)
 | 
| +				coll.Set(i.Key, []byte{})
 | 
|  				return true
 | 
|  			})
 | 
|  		case ov != nil && nv == nil: // all deletions
 | 
| @@ -282,11 +253,11 @@ func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
 | 
|  				if nv == nil {
 | 
|  					coll.Delete(k)
 | 
|  				} else {
 | 
| -					coll.Set(k, nv)
 | 
| +					coll.Set(k, []byte{})
 | 
|  				}
 | 
|  			})
 | 
|  		default:
 | 
| -			panic("impossible")
 | 
| +			impossible(fmt.Errorf("both values from gkvCollide were nil?"))
 | 
|  		}
 | 
|  		// TODO(riannucci): remove entries from idxColl and remove index collections
 | 
|  		// when there are no index entries for that index any more.
 | 
| @@ -294,24 +265,40 @@ func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
 | 
|  }
 | 
|  
 | 
|  func addIndex(store *memStore, ns string, compIdx []*ds.IndexDefinition) {
 | 
| -	store.GetCollection("ents:"+ns).VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
 | 
| -		pm, err := rpmWoCtx(i.Val, ns)
 | 
| -		if err != nil {
 | 
| -			panic(err) // memory corruption
 | 
| -		}
 | 
| -		k, err := serialize.ReadKey(bytes.NewBuffer(i.Key), serialize.WithoutContext, globalAppID, ns)
 | 
| -		if err != nil {
 | 
| -			panic(err)
 | 
| -		}
 | 
| -		sip := partiallySerialize(pm)
 | 
| -		mergeIndexes(ns, store, newMemStore(), sip.indexEntries(k, compIdx))
 | 
| -		return true
 | 
| -	})
 | 
| +	normalized := make([]*ds.IndexDefinition, len(compIdx))
 | 
| +	idxColl := store.SetCollection("idx", nil)
 | 
| +	for i, idx := range compIdx {
 | 
| +		normalized[i] = idx.Normalize()
 | 
| +		idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()), []byte{})
 | 
| +	}
 | 
| +
 | 
| +	if allEnts := store.GetCollection("ents:" + ns); allEnts != nil {
 | 
| +		allEnts.VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool {
 | 
| +			pm, err := rpmWoCtx(i.Val, ns)
 | 
| +			memoryCorruption(err)
 | 
| +
 | 
| +			prop, err := serialize.ReadProperty(bytes.NewBuffer(i.Key), serialize.WithoutContext, globalAppID, ns)
 | 
| +			memoryCorruption(err)
 | 
| +
 | 
| +			k := prop.Value().(ds.Key)
 | 
| +
 | 
| +			sip := partiallySerialize(k, pm)
 | 
| +
 | 
| +			mergeIndexes(ns, store,
 | 
| +				newMemStore(),
 | 
| +				sip.indexEntries(ns, normalized))
 | 
| +			return true
 | 
| +		})
 | 
| +	}
 | 
|  }
 | 
|  
 | 
| -func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) {
 | 
| +func updateIndexes(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) {
 | 
|  	// load all current complex query index definitions.
 | 
| -	compIdx := getCompIdxs(getIdxColl(store))
 | 
| +	compIdx := []*ds.IndexDefinition{}
 | 
| +	walkCompIdxs(store, nil, func(i *ds.IndexDefinition) bool {
 | 
| +		compIdx = append(compIdx, i)
 | 
| +		return true
 | 
| +	})
 | 
|  
 | 
|  	mergeIndexes(key.Namespace(), store,
 | 
|  		indexEntriesWithBuiltins(key, oldEnt, compIdx),
 | 
| 
 |