| Index: impl/memory/gkvlite_iter.go
|
| diff --git a/impl/memory/gkvlite_iter.go b/impl/memory/gkvlite_iter.go
|
| index aa0e5b3343f0bf9d31e1810717a6cdf1517be669..417815da3574a1b4f1e1cbcf350a92e6e39cffc3 100644
|
| --- a/impl/memory/gkvlite_iter.go
|
| +++ b/impl/memory/gkvlite_iter.go
|
| @@ -11,24 +11,98 @@ import (
|
| "github.com/luci/gkvlite"
|
| )
|
|
|
| -// TODO(riannucci): Add a multi-iterator which allows:
|
| -// multiple collections
|
| -// sort smallest collection -> largest collection
|
| -// constant prefixes for each collection
|
| -// start + end partial suffixes
|
| -// * these are appended to the prefix for the collection to narrow the scan
|
| -// range. The collection scans will start at >= prefix+start suffix, and
|
| -// will scan until > prefix+end suffix (excluding this larger row).
|
| -// produces hits (a suffix+value) when all collections contain the same prefix
|
| -// and the same suffix
|
| -// * a hit value will have
|
| -// - a []byte suffix which is the matching suffix of every collection.
|
| -// The caller already knows the prefix for each collection. The caller
|
| -// may need to parse the suffix to separate the sort order(s) from the
|
| -// Key.
|
| -// dedup
|
| -// on each hit, the row values are concatenated and compared to the
|
| -// concatenated prefixes+startsuffix. If it's <=, it's skipped.
|
| +type iterDefinition struct {
|
| + // The collection to iterate over
|
| + c *memCollection
|
| +
|
| + // The prefix to always assert for every row. A nil prefix matches every row.
|
| + prefix []byte
|
| +
|
| + // prefixLen is the number of prefix bytes that the caller cares about. It
|
| + // may be <= len(prefix). When doing a multiIterator, this number will be used
|
| + // to determine the amount of suffix to transfer accross iterators. This is
|
| + // used specifically when using builtin indexes to service ancestor queries.
|
| + // The builtin index represents the ancestor key with prefix bytes, but in a
|
| + // multiIterator context, it wants the entire key to be included in the
|
| + // suffix.
|
| + prefixLen int
|
| +
|
| + // The start cursor. It's appended to prefix to find the first row.
|
| + start []byte
|
| +
|
| + // The end cursor. It's appended to prefix to find the last row (which is not
|
| + // included in the interation result). If this is nil, then there's no end
|
| + // except the natural end of the collection.
|
| + end []byte
|
| +}
|
| +
|
| +func multiIterate(defs []*iterDefinition, cb func(suffix []byte) bool) {
|
| + if len(defs) == 0 {
|
| + return
|
| + }
|
| +
|
| + ts := make([]*iterator, len(defs))
|
| + prefixLens := make([]int, len(defs))
|
| + for i, def := range defs {
|
| + // bind i so that the defer below doesn't get goofed by the loop variable
|
| + i := i
|
| + ts[i] = def.mkIter()
|
| + prefixLens[i] = def.prefixLen
|
| + defer ts[i].stop()
|
| + }
|
| +
|
| + suffix := []byte(nil)
|
| + skip := -1
|
| +
|
| + for {
|
| + stop := false
|
| + restart := false
|
| +
|
| + for idx, it := range ts {
|
| + if skip >= 0 && skip == idx {
|
| + continue
|
| + }
|
| + def := defs[idx]
|
| +
|
| + it.next(bjoin(def.prefix, suffix), func(itm *gkvlite.Item) {
|
| + if itm == nil {
|
| + // we hit the end of an iterator, we're now done with the whole
|
| + // query.
|
| + stop = true
|
| + return
|
| + }
|
| +
|
| + sfxRO := itm.Key[prefixLens[idx]:]
|
| +
|
| + if bytes.Compare(sfxRO, suffix) > 0 {
|
| + // this row has a higher suffix than anything we've seen before. Set
|
| + // ourself to be the skip, and resart this loop from the top.
|
| + suffix = append(suffix[:0], sfxRO...)
|
| + skip = idx
|
| + if idx != 0 {
|
| + // no point to restarting on the 0th index
|
| + restart = true
|
| + }
|
| + }
|
| + })
|
| + if stop || restart {
|
| + break
|
| + }
|
| + }
|
| + if stop {
|
| + return
|
| + }
|
| + if restart {
|
| + continue
|
| + }
|
| +
|
| + if !cb(suffix) {
|
| + return
|
| + }
|
| + suffix = nil
|
| + skip = -1
|
| + }
|
| +}
|
|
|
| type cmd struct {
|
| targ []byte
|
| @@ -39,18 +113,28 @@ type iterator struct {
|
| stopper sync.Once
|
|
|
| stopped bool
|
| - prev []byte
|
| ch chan<- *cmd
|
| }
|
|
|
| -func newIterable(coll *memCollection, end []byte) *iterator {
|
| +func (def *iterDefinition) mkIter() *iterator {
|
| cmdChan := make(chan *cmd)
|
| ret := &iterator{
|
| ch: cmdChan,
|
| }
|
|
|
| + prefix := def.prefix
|
| + collection := def.c
|
| +
|
| + // convert the suffixes from the iterDefinition into full rows for the
|
| + // underlying storage.
|
| + start := bjoin(prefix, def.start)
|
| +
|
| + end := []byte(nil)
|
| + if def.end != nil {
|
| + end = bjoin(prefix, def.end)
|
| + }
|
| +
|
| go func() {
|
| - defer ret.stop()
|
| c := (*cmd)(nil)
|
| ensureCmd := func() bool {
|
| if c == nil {
|
| @@ -61,32 +145,40 @@ func newIterable(coll *memCollection, end []byte) *iterator {
|
| }
|
| return true
|
| }
|
| + if ensureCmd() {
|
| + if bytes.Compare(c.targ, start) < 0 {
|
| + c.targ = start
|
| + }
|
| + }
|
| +
|
| + defer ret.stop()
|
| for {
|
| if !ensureCmd() {
|
| return
|
| }
|
| - previous := c.targ
|
| - needCallback := true
|
| - coll.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item) bool {
|
| + terminalCallback := true
|
| + collection.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item) bool {
|
| if !ensureCmd() {
|
| return false
|
| }
|
| - if !bytes.Equal(previous, c.targ) {
|
| - // we need to start a new ascention function
|
| - needCallback = false
|
| + if bytes.Compare(i.Key, c.targ) < 0 {
|
| + // we need to start a new ascension function
|
| + terminalCallback = false
|
| + return false
|
| + }
|
| + if !bytes.HasPrefix(i.Key, prefix) {
|
| + // we're no longer in prefix, terminate
|
| return false
|
| }
|
| if end != nil && bytes.Compare(i.Key, end) >= 0 {
|
| - // we hit our cap
|
| - ret.stop()
|
| + // we hit our cap, terminate.
|
| return false
|
| }
|
| c.cb(i)
|
| - previous = i.Key
|
| c = nil
|
| return true
|
| })
|
| - if c != nil && needCallback {
|
| + if terminalCallback && ensureCmd() {
|
| c.cb(nil)
|
| c = nil
|
| }
|
| @@ -109,23 +201,14 @@ func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) {
|
| return
|
| }
|
|
|
| - if targ == nil {
|
| - targ = t.prev
|
| - if targ == nil {
|
| - targ = []byte{}
|
| - }
|
| - }
|
| -
|
| waiter := make(chan struct{})
|
| t.ch <- &cmd{targ, func(i *gkvlite.Item) {
|
| defer close(waiter)
|
|
|
| - cb(i)
|
| if i == nil {
|
| t.stop()
|
| - } else {
|
| - t.prev = i.Key
|
| }
|
| + cb(i)
|
| }}
|
| <-waiter
|
| }
|
|
|