Index: impl/memory/gkvlite_iter.go |
diff --git a/impl/memory/gkvlite_iter.go b/impl/memory/gkvlite_iter.go |
deleted file mode 100644 |
index fbfdc30629a8b4d6abda5575ceda445ec7cf20a9..0000000000000000000000000000000000000000 |
--- a/impl/memory/gkvlite_iter.go |
+++ /dev/null |
@@ -1,220 +0,0 @@ |
-// Copyright 2015 The LUCI Authors. All rights reserved. |
-// Use of this source code is governed under the Apache License, Version 2.0 |
-// that can be found in the LICENSE file. |
- |
-package memory |
- |
-import ( |
- "bytes" |
- "sync" |
- |
- "github.com/luci/gae/service/datastore/serialize" |
- "github.com/luci/gkvlite" |
-) |
- |
-type iterDefinition struct { |
- // The collection to iterate over |
- c memCollection |
- |
- // The prefix to always assert for every row. A nil prefix matches every row. |
- prefix []byte |
- |
- // prefixLen is the number of prefix bytes that the caller cares about. It |
- // may be <= len(prefix). When doing a multiIterator, this number will be used |
- // to determine the amount of suffix to transfer accross iterators. This is |
- // used specifically when using builtin indexes to service ancestor queries. |
- // The builtin index represents the ancestor key with prefix bytes, but in a |
- // multiIterator context, it wants the entire key to be included in the |
- // suffix. |
- prefixLen int |
- |
- // The start cursor. It's appended to prefix to find the first row. |
- start []byte |
- |
- // The end cursor. It's appended to prefix to find the last row (which is not |
- // included in the interation result). If this is nil, then there's no end |
- // except the natural end of the collection. |
- end []byte |
-} |
- |
-func multiIterate(defs []*iterDefinition, cb func(suffix []byte) error) error { |
- if len(defs) == 0 { |
- return nil |
- } |
- |
- ts := make([]*iterator, len(defs)) |
- prefixLens := make([]int, len(defs)) |
- for i, def := range defs { |
- // bind i so that the defer below doesn't get goofed by the loop variable |
- i := i |
- ts[i] = def.mkIter() |
- prefixLens[i] = def.prefixLen |
- defer ts[i].stop() |
- } |
- |
- suffix := []byte(nil) |
- skip := -1 |
- |
- for { |
- stop := false |
- restart := false |
- |
- for idx, it := range ts { |
- if skip >= 0 && skip == idx { |
- continue |
- } |
- def := defs[idx] |
- |
- pfxLen := prefixLens[idx] |
- it.next(serialize.Join(def.prefix[:pfxLen], suffix), func(itm *gkvlite.Item) { |
- if itm == nil { |
- // we hit the end of an iterator, we're now done with the whole |
- // query. |
- stop = true |
- return |
- } |
- |
- sfxRO := itm.Key[pfxLen:] |
- |
- if bytes.Compare(sfxRO, suffix) > 0 { |
- // this row has a higher suffix than anything we've seen before. Set |
- // ourself to be the skip, and resart this loop from the top. |
- suffix = append(suffix[:0], sfxRO...) |
- skip = idx |
- if idx != 0 { |
- // no point to restarting on the 0th index |
- restart = true |
- } |
- } |
- }) |
- if stop || restart { |
- break |
- } |
- } |
- if stop { |
- return nil |
- } |
- if restart { |
- continue |
- } |
- |
- if err := cb(suffix); err != nil { |
- return err |
- } |
- suffix = nil |
- skip = -1 |
- } |
-} |
- |
-type cmd struct { |
- targ []byte |
- cb func(*gkvlite.Item) |
-} |
- |
-type iterator struct { |
- stopper sync.Once |
- |
- stopped bool |
- ch chan<- *cmd |
-} |
- |
-func (def *iterDefinition) mkIter() *iterator { |
- if !def.c.IsReadOnly() { |
- panic("attempting to make an iterator with r/w collection") |
- } |
- |
- cmdChan := make(chan *cmd) |
- ret := &iterator{ |
- ch: cmdChan, |
- } |
- |
- prefix := def.prefix |
- collection := def.c |
- |
- // convert the suffixes from the iterDefinition into full rows for the |
- // underlying storage. |
- start := serialize.Join(prefix, def.start) |
- |
- end := []byte(nil) |
- if def.end != nil { |
- end = serialize.Join(prefix, def.end) |
- } |
- |
- go func() { |
- c := (*cmd)(nil) |
- ensureCmd := func() bool { |
- if c == nil { |
- c = <-cmdChan |
- if c == nil { // stop() |
- return false |
- } |
- } |
- return true |
- } |
- if ensureCmd() { |
- if bytes.Compare(c.targ, start) < 0 { |
- c.targ = start |
- } |
- } |
- |
- defer ret.stop() |
- for { |
- if !ensureCmd() { |
- return |
- } |
- terminalCallback := true |
- collection.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item) bool { |
- if !ensureCmd() { |
- return false |
- } |
- if bytes.Compare(i.Key, c.targ) < 0 { |
- // we need to start a new ascension function |
- terminalCallback = false |
- return false |
- } |
- if !bytes.HasPrefix(i.Key, prefix) { |
- // we're no longer in prefix, terminate |
- return false |
- } |
- if end != nil && bytes.Compare(i.Key, end) >= 0 { |
- // we hit our cap, terminate. |
- return false |
- } |
- c.cb(i) |
- c = nil |
- return true |
- }) |
- if terminalCallback && ensureCmd() { |
- c.cb(nil) |
- c = nil |
- } |
- } |
- }() |
- |
- return ret |
-} |
- |
-func (t *iterator) stop() { |
- t.stopper.Do(func() { |
- t.stopped = true |
- close(t.ch) |
- }) |
-} |
- |
-func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) { |
- if t.stopped { |
- cb(nil) |
- return |
- } |
- |
- waiter := make(chan struct{}) |
- t.ch <- &cmd{targ, func(i *gkvlite.Item) { |
- defer close(waiter) |
- |
- if i == nil { |
- t.stop() |
- } |
- cb(i) |
- }} |
- <-waiter |
-} |