OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "bytes" | |
9 "fmt" | |
10 | |
11 "github.com/luci/gkvlite" | |
12 ) | |
13 | |
14 // TODO(riannucci): Add a multi-iterator which allows: | |
15 // multiple collections | |
16 // sort smallest collection -> largest collection | |
17 // constant prefixes for each collection | |
18 // start + end partial suffixes | |
19 // * these are appended to the prefix for the collection to narrow the scan | |
20 // range. The collection scans will start at >= prefix+start suffix, and | |
21 // will scan until > prefix+end suffix (excluding this larger row). | |
22 // produces hits (a suffix+value) when all collections contain the same prefix | |
23 // and the same suffix | |
24 // * a hit value will have | |
25 // - a []byte suffix which is the matching suffix of every collection. | |
26 // The caller already knows the prefix for each collection. The caller | |
27 // may need to parse the suffix to separate the sort order(s) from the | |
28 // Key. | |
29 // dedup | |
30 // on each hit, the row values are concatenated and compared to the | |
31 // concatenated prefixes+startsuffix. If it's <=, it's skipped. | |
32 | |
33 type cmd struct { | |
34 targ []byte | |
35 cb func(*gkvlite.Item) | |
36 } | |
37 | |
38 type iterator struct { | |
39 stopped bool | |
40 prev []byte | |
41 ch chan<- *cmd | |
42 } | |
43 | |
44 func newIterable(coll *memCollection, end []byte) *iterator { | |
45 cmdChan := make(chan *cmd) | |
46 ret := &iterator{ | |
47 ch: cmdChan, | |
48 } | |
49 | |
50 go func() { | |
51 defer ret.stop() | |
52 c := (*cmd)(nil) | |
53 ensureCmd := func() bool { | |
54 if c == nil { | |
55 c = <-cmdChan | |
56 if c == nil { // stop() | |
57 return false | |
58 } | |
59 } | |
60 return true | |
61 } | |
62 for { | |
63 if !ensureCmd() { | |
64 return | |
65 } | |
66 previous := c.targ | |
67 needCallback := true | |
68 coll.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item ) bool { | |
69 if !ensureCmd() { | |
70 return false | |
71 } | |
72 if !bytes.Equal(previous, c.targ) { | |
73 // we need to start a new ascention func tion | |
74 needCallback = false | |
75 return false | |
76 } | |
77 if end != nil && bytes.Compare(i.Key, end) >= 0 { | |
78 // we hit our cap | |
79 ret.stop() | |
80 return false | |
81 } | |
82 c.cb(i) | |
83 previous = i.Key | |
84 c = nil | |
85 return true | |
86 }) | |
87 if c != nil && needCallback { | |
88 c.cb(nil) | |
89 c = nil | |
90 } | |
91 } | |
92 }() | |
93 | |
94 return ret | |
95 } | |
96 | |
97 func (t *iterator) stop() { | |
98 t.stopped = true | |
99 defer func() { recover() }() | |
dnj (Google)
2015/08/15 02:00:31
If your goal is to close the channel once, wrap it
iannucci
2015/08/15 02:10:10
Done.
| |
100 close(t.ch) | |
101 } | |
102 | |
103 func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) { | |
104 if t.stopped { | |
105 cb(nil) | |
106 return | |
107 } | |
108 | |
109 if targ == nil { | |
110 targ = t.prev | |
111 if targ == nil { | |
112 targ = []byte{} | |
113 } | |
114 } else if bytes.Compare(t.prev, targ) >= 0 { | |
dnj (Google)
2015/08/15 02:00:31
Is this a necessary check? Could be expensive doin
iannucci
2015/08/15 02:10:10
it's only when you skip
| |
115 panic(fmt.Errorf( | |
116 "iterator was instructed to go backwards!? %q -> %q", | |
117 string(t.prev), string(targ))) | |
118 } | |
119 | |
120 waiter := make(chan struct{}) | |
dnj (Google)
2015/08/15 02:10:39
Idea:
itemC := make(chan *gkvlite.Item)
t.ch <- &
| |
121 t.ch <- &cmd{targ, func(i *gkvlite.Item) { | |
122 cb(i) | |
123 if i == nil { | |
124 t.stop() | |
125 } else { | |
126 t.prev = i.Key | |
127 } | |
128 close(waiter) | |
dnj (Google)
2015/08/15 02:00:31
Defer this close, dawg.
iannucci
2015/08/15 02:10:10
Done.
| |
129 }} | |
130 <-waiter | |
131 } | |
OLD | NEW |