Index: go/src/infra/gae/libs/wrapper/memory/plist.go |
diff --git a/go/src/infra/gae/libs/wrapper/memory/plist.go b/go/src/infra/gae/libs/wrapper/memory/plist.go |
index bfbb4be91d55d879fc0e1a7d26acebe971e958f4..211759eb3dd4f175b1f66c995459cb88f4c3e8ad 100644 |
--- a/go/src/infra/gae/libs/wrapper/memory/plist.go |
+++ b/go/src/infra/gae/libs/wrapper/memory/plist.go |
@@ -8,12 +8,14 @@ import ( |
"bytes" |
"fmt" |
"reflect" |
+ "sort" |
"time" |
- "github.com/luci/luci-go/common/funnybase" |
- |
"appengine" |
"appengine/datastore" |
+ |
+ "github.com/luci/gkvlite" |
+ "github.com/luci/luci-go/common/funnybase" |
) |
type typData struct { |
@@ -52,13 +54,13 @@ func newTypData(noIndex bool, v interface{}) (ret *typData, err error) { |
typ = pvKey |
} |
if typ == pvUNKNOWN { |
- err = fmt.Errorf("propValTypeOf: unknown type of %#v", v) |
+ err = fmt.Errorf("propValTypeOf: unknown type of %T: %#v", v, v) |
} |
return &typData{noIndex, typ, v}, err |
} |
-func (td *typData) WriteBinary(buf *bytes.Buffer) error { |
+func (td *typData) WriteBinary(buf *bytes.Buffer, nso nsOption) error { |
typb := byte(td.typ) |
if td.noIndex { |
typb |= 0x80 |
@@ -80,14 +82,11 @@ func (td *typData) WriteBinary(buf *bytes.Buffer) error { |
writeBytes(buf, td.data.(datastore.ByteString)) |
} |
case pvTime: |
- t := td.data.(time.Time) |
- funnybase.WriteUint(buf, uint64(t.Unix())*1e6+uint64(t.Nanosecond()/1e3)) |
+ writeTime(buf, td.data.(time.Time)) |
case pvGeoPoint: |
- t := td.data.(appengine.GeoPoint) |
- writeFloat64(buf, t.Lat) |
- writeFloat64(buf, t.Lng) |
+ writeGeoPoint(buf, td.data.(appengine.GeoPoint)) |
case pvKey: |
- writeKey(buf, withNS, td.data.(*datastore.Key)) |
+ writeKey(buf, nso, td.data.(*datastore.Key)) |
case pvBlobKey: |
writeString(buf, string(td.data.(appengine.BlobKey))) |
default: |
@@ -96,7 +95,7 @@ func (td *typData) WriteBinary(buf *bytes.Buffer) error { |
return nil |
} |
-func (td *typData) ReadBinary(buf *bytes.Buffer) error { |
+func (td *typData) ReadBinary(buf *bytes.Buffer, nso nsOption, ns string) error { |
typb, err := buf.ReadByte() |
if err != nil { |
return err |
@@ -111,24 +110,14 @@ func (td *typData) ReadBinary(buf *bytes.Buffer) error { |
case pvBoolFalse: |
td.data = false |
case pvInt: |
- v, err := funnybase.Read(buf) |
- if err != nil { |
- return err |
- } |
- td.data = v |
+ td.data, err = funnybase.Read(buf) |
case pvFloat: |
td.data, err = readFloat64(buf) |
- if err != nil { |
- return err |
- } |
case pvStr: |
td.data, err = readString(buf) |
- if err != nil { |
- return err |
- } |
case pvBytes: |
- b, err := readBytes(buf) |
- if err != nil { |
+ b := []byte(nil) |
+ if b, err = readBytes(buf); err != nil { |
return err |
} |
if td.noIndex { |
@@ -137,30 +126,14 @@ func (td *typData) ReadBinary(buf *bytes.Buffer) error { |
td.data = datastore.ByteString(b) |
} |
case pvTime: |
- v, err := funnybase.ReadUint(buf) |
- if err != nil { |
- return err |
- } |
- td.data = time.Unix(int64(v/1e6), int64((v%1e6)*1e3)) |
+ td.data, err = readTime(buf) |
case pvGeoPoint: |
- pt := appengine.GeoPoint{} |
- pt.Lat, err = readFloat64(buf) |
- if err != nil { |
- return err |
- } |
- pt.Lng, err = readFloat64(buf) |
- if err != nil { |
- return err |
- } |
- td.data = pt |
+ td.data, err = readGeoPoint(buf) |
case pvKey: |
- td.data, err = readKey(buf, true) |
- if err != nil { |
- return err |
- } |
+ td.data, err = readKey(buf, nso, ns) |
case pvBlobKey: |
- s, err := readString(buf) |
- if err != nil { |
+ s := "" |
+ if s, err = readString(buf); err != nil { |
return err |
} |
td.data = appengine.BlobKey(s) |
@@ -168,13 +141,12 @@ func (td *typData) ReadBinary(buf *bytes.Buffer) error { |
return fmt.Errorf("read: unknown type! %v", td) |
} |
- return nil |
+ return err |
} |
-type pval struct { |
- name string |
- multi bool |
- vals []*typData |
+type pvals struct { |
+ name string |
+ vals []*typData |
} |
type propertyList []datastore.Property |
@@ -189,50 +161,347 @@ func (pl *propertyList) Save(ch chan<- datastore.Property) error { |
return (*datastore.PropertyList)(pl).Save(ch) |
} |
-func (pl *propertyList) collate() ([]*pval, error) { |
+// collatedProperties is the reduction of a *propertyList such that each entry |
+// in a collatedProperties has a unique name. For example, collating this: |
+// pl := &propertyList{ |
+// datastore.Property{Name: "wat", Val: "hello"}, |
+// datastore.Property{Name: "other", Val: 100}, |
+// datastore.Property{Name: "wat", Val: "goodbye", noIndex: true}, |
+// } |
+// |
+// Would get a collatedProperties which looked like: |
+// c := collatedProperties{ |
+// &pvals{"wat", []*typData{&{false, pvStr, "hello"}, |
+// &{true, pvStr, "goodbye"}}}, |
+// &pvals{"other", []*typData{&{false, pvInt, 100}}} |
+// } |
+type collatedProperties []*pvals |
+ |
+func (c collatedProperties) defaultIndicies(kind string) []*qIndex { |
+ ret := make([]*qIndex, 0, 2*len(c)+1) |
+ ret = append(ret, &qIndex{kind, false, nil}) |
+ for _, pvals := range c { |
+ needsIndex := false |
+ for _, v := range pvals.vals { |
+ if !v.noIndex { |
+ needsIndex = true |
+ break |
+ } |
+ } |
+ if !needsIndex { |
+ continue |
+ } |
+ ret = append(ret, &qIndex{kind, false, []qSortBy{{pvals.name, qASC}}}) |
+ ret = append(ret, &qIndex{kind, false, []qSortBy{{pvals.name, qDEC}}}) |
+ } |
+ return ret |
+} |
+ |
+// serializedPval is a single pvals.vals entry which has been serialized (in |
+// qASC order). |
+type serializedPval []byte |
+ |
+// serializedPvals is all of the pvals.vals entries from a single pvals (in qASC |
+// order). It does not include the pvals.name field. |
+type serializedPvals []serializedPval |
+ |
+func (s serializedPvals) Len() int { return len(s) } |
+func (s serializedPvals) Swap(i, j int) { s[i], s[j] = s[j], s[i] } |
+func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j]) < 0 } |
+ |
+type mappedPlist map[string]serializedPvals |
+ |
+func (c collatedProperties) indexableMap() (mappedPlist, error) { |
+ ret := make(mappedPlist, len(c)) |
+ for _, pv := range c { |
+ data := make(serializedPvals, 0, len(pv.vals)) |
+ for _, v := range pv.vals { |
+ if v.noIndex { |
+ continue |
+ } |
+ buf := &bytes.Buffer{} |
+ if err := v.WriteBinary(buf, noNS); err != nil { |
+ return nil, err |
+ } |
+ data = append(data, buf.Bytes()) |
+ } |
+ if len(data) == 0 { |
+ continue |
+ } |
+ sort.Sort(data) |
+ ret[pv.name] = data |
+ } |
+ return ret, nil |
+} |
+ |
+// indexRowGen contains enough information to generate all of the index rows which |
+// correspond with a propertyList and a qIndex. |
+type indexRowGen struct { |
+ propVec []serializedPvals |
+ orders []qDirection |
+} |
+ |
+// permute calls cb for each index row, in the sorted order of the rows. |
+func (s indexRowGen) permute(cb func([]byte)) { |
+ iVec := make([]int, len(s.propVec)) |
+ iVecLim := make([]int, len(s.propVec)) |
+ |
+ incPos := func() bool { |
+ for i := len(iVec) - 1; i >= 0; i-- { |
+ var done bool |
+ var newVal int |
+ if s.orders[i] == qASC { |
+ newVal = (iVec[i] + 1) % iVecLim[i] |
+ done = newVal != 0 |
+ } else { |
+ newVal = (iVec[i] - 1) |
+ if newVal < 0 { |
+ newVal = iVecLim[i] - 1 |
+ } else { |
+ done = true |
+ } |
+ } |
+ iVec[i] = newVal |
+ if done { |
+ return true |
+ } |
+ } |
+ return false |
+ } |
+ |
+ for i, sps := range s.propVec { |
+ iVecLim[i] = len(sps) |
+ } |
+ |
+ for i := range iVec { |
+ if s.orders[i] == qDEC { |
+ iVec[i] = iVecLim[i] - 1 |
+ } |
+ } |
+ |
+ for { |
+ bufsiz := 0 |
+ for pvalSliceIdx, pvalIdx := range iVec { |
+ bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx]) |
+ } |
+ buf := bytes.NewBuffer(make([]byte, 0, bufsiz)) |
+ for pvalSliceIdx, pvalIdx := range iVec { |
+ data := s.propVec[pvalSliceIdx][pvalIdx] |
+ if s.orders[pvalSliceIdx] == qASC { |
+ buf.Write(data) |
+ } else { |
+ for _, b := range data { |
+ buf.WriteByte(b ^ 0xFF) |
+ } |
+ } |
+ } |
+ cb(buf.Bytes()) |
+ if !incPos() { |
+ break |
+ } |
+ } |
+} |
+ |
+type matcher struct { |
+ buf indexRowGen |
+} |
+ |
+// matcher.match checks to see if the mapped, serialized property values |
+// match the index. If they do, it returns a indexRowGen. Do not write or modify |
+// the data in the indexRowGen. |
+func (m *matcher) match(idx *qIndex, mpvals mappedPlist) (indexRowGen, bool) { |
+ m.buf.propVec = m.buf.propVec[:0] |
+ m.buf.orders = m.buf.orders[:0] |
+ for _, sb := range idx.sortby { |
+ if pv, ok := mpvals[sb.prop]; ok { |
+ m.buf.propVec = append(m.buf.propVec, pv) |
+ m.buf.orders = append(m.buf.orders, sb.dir) |
+ } else { |
+ return indexRowGen{}, false |
+ } |
+ } |
+ return m.buf, true |
+} |
+ |
+func (c collatedProperties) indexEntries(k *datastore.Key, idxs []*qIndex) (*memStore, error) { |
+ m, err := c.indexableMap() |
+ if err != nil { |
+ return nil, err |
+ } |
+ |
+ ret := newMemStore() |
+ idxColl := ret.SetCollection("idx", nil) |
+ // getIdxEnts retrieves an index collection or adds it if it's not there. |
+ getIdxEnts := func(qi *qIndex) *memCollection { |
+ buf := &bytes.Buffer{} |
+ qi.WriteBinary(buf) |
+ b := buf.Bytes() |
+ idxColl.Set(b, []byte{}) |
+ return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(), b), nil) |
+ } |
+ |
+ buf := &bytes.Buffer{} |
+ writeKey(buf, noNS, k) // ns is in idxEnts collection name. |
+ keyData := buf.Bytes() |
+ |
+ walkPermutations := func(prefix []byte, irg indexRowGen, ents *memCollection) { |
+ prev := []byte{} // intentionally make a non-nil slice, gkvlite hates nil. |
+ irg.permute(func(data []byte) { |
+ buf := bytes.NewBuffer(make([]byte, 0, len(prefix)+len(data)+len(keyData))) |
+ buf.Write(prefix) |
+ buf.Write(data) |
+ buf.Write(keyData) |
+ ents.Set(buf.Bytes(), prev) |
+ prev = data |
+ }) |
+ } |
+ |
+ mtch := matcher{} |
+ for _, idx := range idxs { |
+ if irg, ok := mtch.match(idx, m); ok { |
+ idxEnts := getIdxEnts(idx) |
+ if len(irg.propVec) == 0 { |
+ idxEnts.Set(keyData, []byte{}) // propless index, e.g. kind -> key = nil |
+ } else if idx.ancestor { |
+ for ancKey := k; ancKey != nil; ancKey = ancKey.Parent() { |
+ buf := &bytes.Buffer{} |
+ writeKey(buf, noNS, ancKey) |
+ walkPermutations(buf.Bytes(), irg, idxEnts) |
+ } |
+ } else { |
+ walkPermutations(nil, irg, idxEnts) |
+ } |
+ } |
+ } |
+ |
+ return ret, nil |
+} |
+ |
+func (pl *propertyList) indexEntriesWithBuiltins(k *datastore.Key, complexIdxs []*qIndex) (ret *memStore, err error) { |
+ c, err := pl.collate() |
+ if err == nil { |
+ ret, err = c.indexEntries(k, append(c.defaultIndicies(k.Kind()), complexIdxs...)) |
+ } |
+ return |
+} |
+ |
+func (pl *propertyList) collate() (collatedProperties, error) { |
if pl == nil || len(*pl) == 0 { |
return nil, nil |
} |
- cols := []*pval{} |
+ cols := []*pvals{} |
colIdx := map[string]int{} |
for _, p := range *pl { |
- if idx, ok := colIdx[p.Name]; !ok { |
- colIdx[p.Name] = len(cols) |
+ if idx, ok := colIdx[p.Name]; ok { |
+ c := cols[idx] |
td, err := newTypData(p.NoIndex, p.Value) |
if err != nil { |
return nil, err |
} |
- cols = append(cols, &pval{p.Name, p.Multiple, []*typData{td}}) |
+ c.vals = append(c.vals, td) |
} else { |
- c := cols[idx] |
- if c.multi != p.Multiple { |
- return nil, fmt.Errorf( |
- "propertyList.MarshalBinary: field %q has conflicting values of Multiple", p.Name) |
- } |
+ colIdx[p.Name] = len(cols) |
td, err := newTypData(p.NoIndex, p.Value) |
if err != nil { |
return nil, err |
} |
- c.vals = append(c.vals, td) |
+ cols = append(cols, &pvals{p.Name, []*typData{td}}) |
} |
} |
return cols, nil |
} |
-func (pl *propertyList) addCollated(pv *pval) { |
+func (pl *propertyList) addCollated(pv *pvals) { |
for _, v := range pv.vals { |
*pl = append(*pl, datastore.Property{ |
Name: pv.name, |
- Multiple: pv.multi, |
+ Multiple: len(pv.vals) > 1, |
NoIndex: v.noIndex, |
Value: v.data, |
}) |
} |
} |
+func updateIndicies(store *memStore, key *datastore.Key, oldEnt, newEnt *propertyList) error { |
+ var err error |
+ |
+ idxColl := store.GetCollection("idx") |
+ if idxColl == nil { |
+ idxColl = store.SetCollection("idx", nil) |
+ } |
+ |
+ // load all current complex query index definitions. |
+ compIdx := []*qIndex{} |
+ idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item) bool { |
+ if !bytes.HasPrefix(i.Key, complexQueryPrefix) { |
+ return false |
+ } |
+ qi := &qIndex{} |
+ if err = qi.ReadBinary(bytes.NewBuffer(i.Key)); err != nil { |
+ return false |
+ } |
+ compIdx = append(compIdx, qi) |
+ return true |
+ }) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ oldIdx, err := oldEnt.indexEntriesWithBuiltins(key, compIdx) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ newIdx, err := newEnt.indexEntriesWithBuiltins(key, compIdx) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ prefix := "idx:" + key.Namespace() + ":" |
+ |
+ gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) { |
+ ks := prefix + string(k) |
+ idxColl.Set(k, []byte{}) |
+ |
+ coll := store.GetCollection(ks) |
+ if coll == nil { |
+ coll = store.SetCollection(ks, nil) |
+ } |
+ oldColl := oldIdx.GetCollection(ks) |
+ newColl := newIdx.GetCollection(ks) |
+ |
+ switch { |
+ case ov == nil && nv != nil: // all additions |
+ newColl.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool { |
+ coll.Set(i.Key, i.Val) |
+ return true |
+ }) |
+ case ov != nil && nv == nil: // all deletions |
+ oldColl.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool { |
+ coll.Delete(i.Key) |
+ return true |
+ }) |
+ case ov != nil && nv != nil: // merge |
+ gkvCollide(oldColl, newColl, func(k, ov, nv []byte) { |
+ if nv == nil { |
+ coll.Delete(k) |
+ } else { |
+ coll.Set(k, nv) |
+ } |
+ }) |
+ default: |
+ panic("impossible") |
+ } |
+ // TODO(riannucci): remove entries from idxColl and remove index collections |
+ // when there are no index entries for that index any more. |
+ }) |
+ |
+ return nil |
+} |
+ |
func (pl *propertyList) MarshalBinary() ([]byte, error) { |
cols, err := pl.collate() |
if err != nil || len(cols) == 0 { |
@@ -261,7 +530,7 @@ func (pl *propertyList) UnmarshalBinary(data []byte) error { |
return err |
} |
- pv := &pval{name: name} |
+ pv := &pvals{name: name} |
err = pv.ReadBinary(buf) |
if err != nil { |
return err |
@@ -300,6 +569,8 @@ var byteSliceType = reflect.TypeOf([]byte(nil)) |
// These constants are in the order described by |
// https://cloud.google.com/appengine/docs/go/datastore/entities#Go_Value_type_ordering |
// with a slight divergence for the Int/Time split. |
+// NOTE: this enum can only occupy 7 bits, because we use the high bit to encode |
+// indexed/non-indexed. See typData.WriteBinary. |
const ( |
pvNull propValType = iota |
pvInt |
@@ -337,17 +608,16 @@ const ( |
pvUNKNOWN |
) |
-func (p *pval) ReadBinary(buf *bytes.Buffer) error { |
+func (p *pvals) ReadBinary(buf *bytes.Buffer) error { |
n, err := funnybase.ReadUint(buf) |
if err != nil { |
return err |
} |
- p.multi = n > 1 |
p.vals = make([]*typData, n) |
for i := range p.vals { |
p.vals[i] = &typData{} |
- err := p.vals[i].ReadBinary(buf) |
+ err := p.vals[i].ReadBinary(buf, withNS, "") |
if err != nil { |
return err |
} |
@@ -356,10 +626,10 @@ func (p *pval) ReadBinary(buf *bytes.Buffer) error { |
return nil |
} |
-func (p *pval) WriteBinary(buf *bytes.Buffer) error { |
+func (p *pvals) WriteBinary(buf *bytes.Buffer) error { |
funnybase.WriteUint(buf, uint64(len(p.vals))) |
for _, v := range p.vals { |
- if err := v.WriteBinary(buf); err != nil { |
+ if err := v.WriteBinary(buf, withNS); err != nil { |
return err |
} |
} |