OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "sync" | 9 "sync" |
10 | 10 |
11 "github.com/luci/gkvlite" | 11 "github.com/luci/gkvlite" |
12 ) | 12 ) |
13 | 13 |
14 // TODO(riannucci): Add a multi-iterator which allows: | 14 type iterDefinition struct { |
15 // multiple collections | 15 » // The collection to iterate over |
16 // sort smallest collection -> largest collection | 16 » c *memCollection |
17 // constant prefixes for each collection | 17 |
18 // start + end partial suffixes | 18 » // The prefix to always assert for every row. A nil prefix matches every
row. |
19 // * these are appended to the prefix for the collection to narrow the scan | 19 » prefix []byte |
20 // range. The collection scans will start at >= prefix+start suffix, and | 20 |
21 // will scan until > prefix+end suffix (excluding this larger row). | 21 » // prefixLen is the number of prefix bytes that the caller cares about.
It |
22 // produces hits (a suffix+value) when all collections contain the same prefix | 22 » // may be <= len(prefix). When doing a multiIterator, this number will b
e used |
23 // and the same suffix | 23 » // to determine the amount of suffix to transfer accross iterators. This
is |
24 // * a hit value will have | 24 » // used specifically when using builtin indexes to service ancestor quer
ies. |
25 // - a []byte suffix which is the matching suffix of every collection. | 25 » // The builtin index represents the ancestor key with prefix bytes, but
in a |
26 // The caller already knows the prefix for each collection. The caller | 26 » // multiIterator context, it wants the entire key to be included in the |
27 // may need to parse the suffix to separate the sort order(s) from the | 27 » // suffix. |
28 // Key. | 28 » prefixLen int |
29 // dedup | 29 |
30 // on each hit, the row values are concatenated and compared to the | 30 » // The start cursor. It's appended to prefix to find the first row. |
31 //» » concatenated prefixes+startsuffix. If it's <=, it's skipped. | 31 » start []byte |
| 32 |
| 33 » // The end cursor. It's appended to prefix to find the last row (which i
s not |
| 34 » // included in the interation result). If this is nil, then there's no e
nd |
| 35 » // except the natural end of the collection. |
| 36 » end []byte |
| 37 } |
| 38 |
| 39 func multiIterate(defs []*iterDefinition, cb func(suffix []byte) bool) { |
| 40 » if len(defs) == 0 { |
| 41 » » return |
| 42 » } |
| 43 |
| 44 » ts := make([]*iterator, len(defs)) |
| 45 » prefixLens := make([]int, len(defs)) |
| 46 » for i, def := range defs { |
| 47 » » // bind i so that the defer below doesn't get goofed by the loop
variable |
| 48 » » i := i |
| 49 » » ts[i] = def.mkIter() |
| 50 » » prefixLens[i] = def.prefixLen |
| 51 » » defer ts[i].stop() |
| 52 » } |
| 53 |
| 54 » suffix := []byte(nil) |
| 55 » skip := -1 |
| 56 |
| 57 » for { |
| 58 » » stop := false |
| 59 » » restart := false |
| 60 |
| 61 » » for idx, it := range ts { |
| 62 » » » if skip >= 0 && skip == idx { |
| 63 » » » » continue |
| 64 » » » } |
| 65 » » » def := defs[idx] |
| 66 |
| 67 » » » it.next(bjoin(def.prefix, suffix), func(itm *gkvlite.Ite
m) { |
| 68 » » » » if itm == nil { |
| 69 » » » » » // we hit the end of an iterator, we're
now done with the whole |
| 70 » » » » » // query. |
| 71 » » » » » stop = true |
| 72 » » » » » return |
| 73 » » » » } |
| 74 |
| 75 » » » » sfxRO := itm.Key[prefixLens[idx]:] |
| 76 |
| 77 » » » » if bytes.Compare(sfxRO, suffix) > 0 { |
| 78 » » » » » // this row has a higher suffix than any
thing we've seen before. Set |
| 79 » » » » » // ourself to be the skip, and resart th
is loop from the top. |
| 80 » » » » » suffix = append(suffix[:0], sfxRO...) |
| 81 » » » » » skip = idx |
| 82 » » » » » if idx != 0 { |
| 83 » » » » » » // no point to restarting on the
0th index |
| 84 » » » » » » restart = true |
| 85 » » » » » } |
| 86 » » » » } |
| 87 » » » }) |
| 88 » » » if stop || restart { |
| 89 » » » » break |
| 90 » » » } |
| 91 » » } |
| 92 » » if stop { |
| 93 » » » return |
| 94 » » } |
| 95 » » if restart { |
| 96 » » » continue |
| 97 » » } |
| 98 |
| 99 » » if !cb(suffix) { |
| 100 » » » return |
| 101 » » } |
| 102 » » suffix = nil |
| 103 » » skip = -1 |
| 104 » } |
| 105 } |
32 | 106 |
33 type cmd struct { | 107 type cmd struct { |
34 targ []byte | 108 targ []byte |
35 cb func(*gkvlite.Item) | 109 cb func(*gkvlite.Item) |
36 } | 110 } |
37 | 111 |
38 type iterator struct { | 112 type iterator struct { |
39 stopper sync.Once | 113 stopper sync.Once |
40 | 114 |
41 stopped bool | 115 stopped bool |
42 prev []byte | |
43 ch chan<- *cmd | 116 ch chan<- *cmd |
44 } | 117 } |
45 | 118 |
46 func newIterable(coll *memCollection, end []byte) *iterator { | 119 func (def *iterDefinition) mkIter() *iterator { |
47 cmdChan := make(chan *cmd) | 120 cmdChan := make(chan *cmd) |
48 ret := &iterator{ | 121 ret := &iterator{ |
49 ch: cmdChan, | 122 ch: cmdChan, |
50 } | 123 } |
51 | 124 |
| 125 prefix := def.prefix |
| 126 collection := def.c |
| 127 |
| 128 // convert the suffixes from the iterDefinition into full rows for the |
| 129 // underlying storage. |
| 130 start := bjoin(prefix, def.start) |
| 131 |
| 132 end := []byte(nil) |
| 133 if def.end != nil { |
| 134 end = bjoin(prefix, def.end) |
| 135 } |
| 136 |
52 go func() { | 137 go func() { |
53 defer ret.stop() | |
54 c := (*cmd)(nil) | 138 c := (*cmd)(nil) |
55 ensureCmd := func() bool { | 139 ensureCmd := func() bool { |
56 if c == nil { | 140 if c == nil { |
57 c = <-cmdChan | 141 c = <-cmdChan |
58 if c == nil { // stop() | 142 if c == nil { // stop() |
59 return false | 143 return false |
60 } | 144 } |
61 } | 145 } |
62 return true | 146 return true |
63 } | 147 } |
| 148 if ensureCmd() { |
| 149 if bytes.Compare(c.targ, start) < 0 { |
| 150 c.targ = start |
| 151 } |
| 152 } |
| 153 |
| 154 defer ret.stop() |
64 for { | 155 for { |
65 if !ensureCmd() { | 156 if !ensureCmd() { |
66 return | 157 return |
67 } | 158 } |
68 » » » previous := c.targ | 159 » » » terminalCallback := true |
69 » » » needCallback := true | 160 » » » collection.VisitItemsAscend(c.targ, true, func(i *gkvlit
e.Item) bool { |
70 » » » coll.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item
) bool { | |
71 if !ensureCmd() { | 161 if !ensureCmd() { |
72 return false | 162 return false |
73 } | 163 } |
74 » » » » if !bytes.Equal(previous, c.targ) { | 164 » » » » if bytes.Compare(i.Key, c.targ) < 0 { |
75 » » » » » // we need to start a new ascention func
tion | 165 » » » » » // we need to start a new ascension func
tion |
76 » » » » » needCallback = false | 166 » » » » » terminalCallback = false |
| 167 » » » » » return false |
| 168 » » » » } |
| 169 » » » » if !bytes.HasPrefix(i.Key, prefix) { |
| 170 » » » » » // we're no longer in prefix, terminate |
77 return false | 171 return false |
78 } | 172 } |
79 if end != nil && bytes.Compare(i.Key, end) >= 0
{ | 173 if end != nil && bytes.Compare(i.Key, end) >= 0
{ |
80 » » » » » // we hit our cap | 174 » » » » » // we hit our cap, terminate. |
81 » » » » » ret.stop() | |
82 return false | 175 return false |
83 } | 176 } |
84 c.cb(i) | 177 c.cb(i) |
85 previous = i.Key | |
86 c = nil | 178 c = nil |
87 return true | 179 return true |
88 }) | 180 }) |
89 » » » if c != nil && needCallback { | 181 » » » if terminalCallback && ensureCmd() { |
90 c.cb(nil) | 182 c.cb(nil) |
91 c = nil | 183 c = nil |
92 } | 184 } |
93 } | 185 } |
94 }() | 186 }() |
95 | 187 |
96 return ret | 188 return ret |
97 } | 189 } |
98 | 190 |
99 func (t *iterator) stop() { | 191 func (t *iterator) stop() { |
100 t.stopper.Do(func() { | 192 t.stopper.Do(func() { |
101 t.stopped = true | 193 t.stopped = true |
102 close(t.ch) | 194 close(t.ch) |
103 }) | 195 }) |
104 } | 196 } |
105 | 197 |
106 func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) { | 198 func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) { |
107 if t.stopped { | 199 if t.stopped { |
108 cb(nil) | 200 cb(nil) |
109 return | 201 return |
110 } | 202 } |
111 | 203 |
112 if targ == nil { | |
113 targ = t.prev | |
114 if targ == nil { | |
115 targ = []byte{} | |
116 } | |
117 } | |
118 | |
119 waiter := make(chan struct{}) | 204 waiter := make(chan struct{}) |
120 t.ch <- &cmd{targ, func(i *gkvlite.Item) { | 205 t.ch <- &cmd{targ, func(i *gkvlite.Item) { |
121 defer close(waiter) | 206 defer close(waiter) |
122 | 207 |
123 cb(i) | |
124 if i == nil { | 208 if i == nil { |
125 t.stop() | 209 t.stop() |
126 } else { | |
127 t.prev = i.Key | |
128 } | 210 } |
| 211 cb(i) |
129 }} | 212 }} |
130 <-waiter | 213 <-waiter |
131 } | 214 } |
OLD | NEW |