Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(640)

Unified Diff: impl/memory/gkvlite_iter.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: stringSet everywhere Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « impl/memory/error_markers.go ('k') | impl/memory/gkvlite_iter_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: impl/memory/gkvlite_iter.go
diff --git a/impl/memory/gkvlite_iter.go b/impl/memory/gkvlite_iter.go
index aa0e5b3343f0bf9d31e1810717a6cdf1517be669..417815da3574a1b4f1e1cbcf350a92e6e39cffc3 100644
--- a/impl/memory/gkvlite_iter.go
+++ b/impl/memory/gkvlite_iter.go
@@ -11,24 +11,98 @@ import (
"github.com/luci/gkvlite"
)
-// TODO(riannucci): Add a multi-iterator which allows:
-// multiple collections
-// sort smallest collection -> largest collection
-// constant prefixes for each collection
-// start + end partial suffixes
-// * these are appended to the prefix for the collection to narrow the scan
-// range. The collection scans will start at >= prefix+start suffix, and
-// will scan until > prefix+end suffix (excluding this larger row).
-// produces hits (a suffix+value) when all collections contain the same prefix
-// and the same suffix
-// * a hit value will have
-// - a []byte suffix which is the matching suffix of every collection.
-// The caller already knows the prefix for each collection. The caller
-// may need to parse the suffix to separate the sort order(s) from the
-// Key.
-// dedup
-// on each hit, the row values are concatenated and compared to the
-// concatenated prefixes+startsuffix. If it's <=, it's skipped.
+type iterDefinition struct {
+ // The collection to iterate over
+ c *memCollection
+
+ // The prefix to always assert for every row. A nil prefix matches every row.
+ prefix []byte
+
+ // prefixLen is the number of prefix bytes that the caller cares about. It
+ // may be <= len(prefix). When doing a multiIterator, this number will be used
+ // to determine the amount of suffix to transfer accross iterators. This is
+ // used specifically when using builtin indexes to service ancestor queries.
+ // The builtin index represents the ancestor key with prefix bytes, but in a
+ // multiIterator context, it wants the entire key to be included in the
+ // suffix.
+ prefixLen int
+
+ // The start cursor. It's appended to prefix to find the first row.
+ start []byte
+
+ // The end cursor. It's appended to prefix to find the last row (which is not
+ // included in the interation result). If this is nil, then there's no end
+ // except the natural end of the collection.
+ end []byte
+}
+
+func multiIterate(defs []*iterDefinition, cb func(suffix []byte) bool) {
+ if len(defs) == 0 {
+ return
+ }
+
+ ts := make([]*iterator, len(defs))
+ prefixLens := make([]int, len(defs))
+ for i, def := range defs {
+ // bind i so that the defer below doesn't get goofed by the loop variable
+ i := i
+ ts[i] = def.mkIter()
+ prefixLens[i] = def.prefixLen
+ defer ts[i].stop()
+ }
+
+ suffix := []byte(nil)
+ skip := -1
+
+ for {
+ stop := false
+ restart := false
+
+ for idx, it := range ts {
+ if skip >= 0 && skip == idx {
+ continue
+ }
+ def := defs[idx]
+
+ it.next(bjoin(def.prefix, suffix), func(itm *gkvlite.Item) {
+ if itm == nil {
+ // we hit the end of an iterator, we're now done with the whole
+ // query.
+ stop = true
+ return
+ }
+
+ sfxRO := itm.Key[prefixLens[idx]:]
+
+ if bytes.Compare(sfxRO, suffix) > 0 {
+ // this row has a higher suffix than anything we've seen before. Set
+ // ourself to be the skip, and resart this loop from the top.
+ suffix = append(suffix[:0], sfxRO...)
+ skip = idx
+ if idx != 0 {
+ // no point to restarting on the 0th index
+ restart = true
+ }
+ }
+ })
+ if stop || restart {
+ break
+ }
+ }
+ if stop {
+ return
+ }
+ if restart {
+ continue
+ }
+
+ if !cb(suffix) {
+ return
+ }
+ suffix = nil
+ skip = -1
+ }
+}
type cmd struct {
targ []byte
@@ -39,18 +113,28 @@ type iterator struct {
stopper sync.Once
stopped bool
- prev []byte
ch chan<- *cmd
}
-func newIterable(coll *memCollection, end []byte) *iterator {
+func (def *iterDefinition) mkIter() *iterator {
cmdChan := make(chan *cmd)
ret := &iterator{
ch: cmdChan,
}
+ prefix := def.prefix
+ collection := def.c
+
+ // convert the suffixes from the iterDefinition into full rows for the
+ // underlying storage.
+ start := bjoin(prefix, def.start)
+
+ end := []byte(nil)
+ if def.end != nil {
+ end = bjoin(prefix, def.end)
+ }
+
go func() {
- defer ret.stop()
c := (*cmd)(nil)
ensureCmd := func() bool {
if c == nil {
@@ -61,32 +145,40 @@ func newIterable(coll *memCollection, end []byte) *iterator {
}
return true
}
+ if ensureCmd() {
+ if bytes.Compare(c.targ, start) < 0 {
+ c.targ = start
+ }
+ }
+
+ defer ret.stop()
for {
if !ensureCmd() {
return
}
- previous := c.targ
- needCallback := true
- coll.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item) bool {
+ terminalCallback := true
+ collection.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item) bool {
if !ensureCmd() {
return false
}
- if !bytes.Equal(previous, c.targ) {
- // we need to start a new ascention function
- needCallback = false
+ if bytes.Compare(i.Key, c.targ) < 0 {
+ // we need to start a new ascension function
+ terminalCallback = false
+ return false
+ }
+ if !bytes.HasPrefix(i.Key, prefix) {
+ // we're no longer in prefix, terminate
return false
}
if end != nil && bytes.Compare(i.Key, end) >= 0 {
- // we hit our cap
- ret.stop()
+ // we hit our cap, terminate.
return false
}
c.cb(i)
- previous = i.Key
c = nil
return true
})
- if c != nil && needCallback {
+ if terminalCallback && ensureCmd() {
c.cb(nil)
c = nil
}
@@ -109,23 +201,14 @@ func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) {
return
}
- if targ == nil {
- targ = t.prev
- if targ == nil {
- targ = []byte{}
- }
- }
-
waiter := make(chan struct{})
t.ch <- &cmd{targ, func(i *gkvlite.Item) {
defer close(waiter)
- cb(i)
if i == nil {
t.stop()
- } else {
- t.prev = i.Key
}
+ cb(i)
}}
<-waiter
}
« no previous file with comments | « impl/memory/error_markers.go ('k') | impl/memory/gkvlite_iter_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698