| Index: impl/memory/gkvlite_iter.go
|
| diff --git a/impl/memory/gkvlite_iter.go b/impl/memory/gkvlite_iter.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..aa0e5b3343f0bf9d31e1810717a6cdf1517be669
|
| --- /dev/null
|
| +++ b/impl/memory/gkvlite_iter.go
|
| @@ -0,0 +1,131 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package memory
|
| +
|
| +import (
|
| + "bytes"
|
| + "sync"
|
| +
|
| + "github.com/luci/gkvlite"
|
| +)
|
| +
|
| +// TODO(riannucci): Add a multi-iterator which allows:
|
| +// multiple collections
|
| +// sort smallest collection -> largest collection
|
| +// constant prefixes for each collection
|
| +// start + end partial suffixes
|
| +// * these are appended to the prefix for the collection to narrow the scan
|
| +// range. The collection scans will start at >= prefix+start suffix, and
|
| +// will scan until > prefix+end suffix (excluding this larger row).
|
| +// produces hits (a suffix+value) when all collections contain the same prefix
|
| +// and the same suffix
|
| +// * a hit value will have
|
| +// - a []byte suffix which is the matching suffix of every collection.
|
| +// The caller already knows the prefix for each collection. The caller
|
| +// may need to parse the suffix to separate the sort order(s) from the
|
| +// Key.
|
| +// dedup
|
| +// on each hit, the row values are concatenated and compared to the
|
| +// concatenated prefixes+startsuffix. If it's <=, it's skipped.
|
| +
|
| +type cmd struct {
|
| + targ []byte
|
| + cb func(*gkvlite.Item)
|
| +}
|
| +
|
| +type iterator struct {
|
| + stopper sync.Once
|
| +
|
| + stopped bool
|
| + prev []byte
|
| + ch chan<- *cmd
|
| +}
|
| +
|
| +func newIterable(coll *memCollection, end []byte) *iterator {
|
| + cmdChan := make(chan *cmd)
|
| + ret := &iterator{
|
| + ch: cmdChan,
|
| + }
|
| +
|
| + go func() {
|
| + defer ret.stop()
|
| + c := (*cmd)(nil)
|
| + ensureCmd := func() bool {
|
| + if c == nil {
|
| + c = <-cmdChan
|
| + if c == nil { // stop()
|
| + return false
|
| + }
|
| + }
|
| + return true
|
| + }
|
| + for {
|
| + if !ensureCmd() {
|
| + return
|
| + }
|
| + previous := c.targ
|
| + needCallback := true
|
| + coll.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item) bool {
|
| + if !ensureCmd() {
|
| + return false
|
| + }
|
| + if !bytes.Equal(previous, c.targ) {
|
| + // we need to start a new ascention function
|
| + needCallback = false
|
| + return false
|
| + }
|
| + if end != nil && bytes.Compare(i.Key, end) >= 0 {
|
| + // we hit our cap
|
| + ret.stop()
|
| + return false
|
| + }
|
| + c.cb(i)
|
| + previous = i.Key
|
| + c = nil
|
| + return true
|
| + })
|
| + if c != nil && needCallback {
|
| + c.cb(nil)
|
| + c = nil
|
| + }
|
| + }
|
| + }()
|
| +
|
| + return ret
|
| +}
|
| +
|
| +func (t *iterator) stop() {
|
| + t.stopper.Do(func() {
|
| + t.stopped = true
|
| + close(t.ch)
|
| + })
|
| +}
|
| +
|
| +func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) {
|
| + if t.stopped {
|
| + cb(nil)
|
| + return
|
| + }
|
| +
|
| + if targ == nil {
|
| + targ = t.prev
|
| + if targ == nil {
|
| + targ = []byte{}
|
| + }
|
| + }
|
| +
|
| + waiter := make(chan struct{})
|
| + t.ch <- &cmd{targ, func(i *gkvlite.Item) {
|
| + defer close(waiter)
|
| +
|
| + cb(i)
|
| + if i == nil {
|
| + t.stop()
|
| + } else {
|
| + t.prev = i.Key
|
| + }
|
| + }}
|
| + <-waiter
|
| +}
|
|
|