Index: impl/memory/gkvlite_iter.go |
diff --git a/impl/memory/gkvlite_iter.go b/impl/memory/gkvlite_iter.go |
index aa0e5b3343f0bf9d31e1810717a6cdf1517be669..417815da3574a1b4f1e1cbcf350a92e6e39cffc3 100644 |
--- a/impl/memory/gkvlite_iter.go |
+++ b/impl/memory/gkvlite_iter.go |
@@ -11,24 +11,98 @@ import ( |
"github.com/luci/gkvlite" |
) |
-// TODO(riannucci): Add a multi-iterator which allows: |
-// multiple collections |
-// sort smallest collection -> largest collection |
-// constant prefixes for each collection |
-// start + end partial suffixes |
-// * these are appended to the prefix for the collection to narrow the scan |
-// range. The collection scans will start at >= prefix+start suffix, and |
-// will scan until > prefix+end suffix (excluding this larger row). |
-// produces hits (a suffix+value) when all collections contain the same prefix |
-// and the same suffix |
-// * a hit value will have |
-// - a []byte suffix which is the matching suffix of every collection. |
-// The caller already knows the prefix for each collection. The caller |
-// may need to parse the suffix to separate the sort order(s) from the |
-// Key. |
-// dedup |
-// on each hit, the row values are concatenated and compared to the |
-// concatenated prefixes+startsuffix. If it's <=, it's skipped. |
+type iterDefinition struct { |
+ // The collection to iterate over |
+ c *memCollection |
+ |
+ // The prefix to always assert for every row. A nil prefix matches every row. |
+ prefix []byte |
+ |
+ // prefixLen is the number of prefix bytes that the caller cares about. It |
+ // may be <= len(prefix). When doing a multiIterator, this number will be used |
+ // to determine the amount of suffix to transfer accross iterators. This is |
+ // used specifically when using builtin indexes to service ancestor queries. |
+ // The builtin index represents the ancestor key with prefix bytes, but in a |
+ // multiIterator context, it wants the entire key to be included in the |
+ // suffix. |
+ prefixLen int |
+ |
+ // The start cursor. It's appended to prefix to find the first row. |
+ start []byte |
+ |
+ // The end cursor. It's appended to prefix to find the last row (which is not |
+ // included in the interation result). If this is nil, then there's no end |
+ // except the natural end of the collection. |
+ end []byte |
+} |
+ |
+func multiIterate(defs []*iterDefinition, cb func(suffix []byte) bool) { |
+ if len(defs) == 0 { |
+ return |
+ } |
+ |
+ ts := make([]*iterator, len(defs)) |
+ prefixLens := make([]int, len(defs)) |
+ for i, def := range defs { |
+ // bind i so that the defer below doesn't get goofed by the loop variable |
+ i := i |
+ ts[i] = def.mkIter() |
+ prefixLens[i] = def.prefixLen |
+ defer ts[i].stop() |
+ } |
+ |
+ suffix := []byte(nil) |
+ skip := -1 |
+ |
+ for { |
+ stop := false |
+ restart := false |
+ |
+ for idx, it := range ts { |
+ if skip >= 0 && skip == idx { |
+ continue |
+ } |
+ def := defs[idx] |
+ |
+ it.next(bjoin(def.prefix, suffix), func(itm *gkvlite.Item) { |
+ if itm == nil { |
+ // we hit the end of an iterator, we're now done with the whole |
+ // query. |
+ stop = true |
+ return |
+ } |
+ |
+ sfxRO := itm.Key[prefixLens[idx]:] |
+ |
+ if bytes.Compare(sfxRO, suffix) > 0 { |
+ // this row has a higher suffix than anything we've seen before. Set |
+ // ourself to be the skip, and resart this loop from the top. |
+ suffix = append(suffix[:0], sfxRO...) |
+ skip = idx |
+ if idx != 0 { |
+ // no point to restarting on the 0th index |
+ restart = true |
+ } |
+ } |
+ }) |
+ if stop || restart { |
+ break |
+ } |
+ } |
+ if stop { |
+ return |
+ } |
+ if restart { |
+ continue |
+ } |
+ |
+ if !cb(suffix) { |
+ return |
+ } |
+ suffix = nil |
+ skip = -1 |
+ } |
+} |
type cmd struct { |
targ []byte |
@@ -39,18 +113,28 @@ type iterator struct { |
stopper sync.Once |
stopped bool |
- prev []byte |
ch chan<- *cmd |
} |
-func newIterable(coll *memCollection, end []byte) *iterator { |
+func (def *iterDefinition) mkIter() *iterator { |
cmdChan := make(chan *cmd) |
ret := &iterator{ |
ch: cmdChan, |
} |
+ prefix := def.prefix |
+ collection := def.c |
+ |
+ // convert the suffixes from the iterDefinition into full rows for the |
+ // underlying storage. |
+ start := bjoin(prefix, def.start) |
+ |
+ end := []byte(nil) |
+ if def.end != nil { |
+ end = bjoin(prefix, def.end) |
+ } |
+ |
go func() { |
- defer ret.stop() |
c := (*cmd)(nil) |
ensureCmd := func() bool { |
if c == nil { |
@@ -61,32 +145,40 @@ func newIterable(coll *memCollection, end []byte) *iterator { |
} |
return true |
} |
+ if ensureCmd() { |
+ if bytes.Compare(c.targ, start) < 0 { |
+ c.targ = start |
+ } |
+ } |
+ |
+ defer ret.stop() |
for { |
if !ensureCmd() { |
return |
} |
- previous := c.targ |
- needCallback := true |
- coll.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item) bool { |
+ terminalCallback := true |
+ collection.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item) bool { |
if !ensureCmd() { |
return false |
} |
- if !bytes.Equal(previous, c.targ) { |
- // we need to start a new ascention function |
- needCallback = false |
+ if bytes.Compare(i.Key, c.targ) < 0 { |
+ // we need to start a new ascension function |
+ terminalCallback = false |
+ return false |
+ } |
+ if !bytes.HasPrefix(i.Key, prefix) { |
+ // we're no longer in prefix, terminate |
return false |
} |
if end != nil && bytes.Compare(i.Key, end) >= 0 { |
- // we hit our cap |
- ret.stop() |
+ // we hit our cap, terminate. |
return false |
} |
c.cb(i) |
- previous = i.Key |
c = nil |
return true |
}) |
- if c != nil && needCallback { |
+ if terminalCallback && ensureCmd() { |
c.cb(nil) |
c = nil |
} |
@@ -109,23 +201,14 @@ func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) { |
return |
} |
- if targ == nil { |
- targ = t.prev |
- if targ == nil { |
- targ = []byte{} |
- } |
- } |
- |
waiter := make(chan struct{}) |
t.ch <- &cmd{targ, func(i *gkvlite.Item) { |
defer close(waiter) |
- cb(i) |
if i == nil { |
t.stop() |
- } else { |
- t.prev = i.Key |
} |
+ cb(i) |
}} |
<-waiter |
} |