OLD | NEW |
| (Empty) |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | |
2 // Use of this source code is governed under the Apache License, Version 2.0 | |
3 // that can be found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "bytes" | |
9 "sync" | |
10 | |
11 "github.com/luci/gae/service/datastore/serialize" | |
12 "github.com/luci/gkvlite" | |
13 ) | |
14 | |
15 type iterDefinition struct { | |
16 // The collection to iterate over | |
17 c memCollection | |
18 | |
19 // The prefix to always assert for every row. A nil prefix matches every
row. | |
20 prefix []byte | |
21 | |
22 // prefixLen is the number of prefix bytes that the caller cares about.
It | |
23 // may be <= len(prefix). When doing a multiIterator, this number will b
e used | |
24 // to determine the amount of suffix to transfer accross iterators. This
is | |
25 // used specifically when using builtin indexes to service ancestor quer
ies. | |
26 // The builtin index represents the ancestor key with prefix bytes, but
in a | |
27 // multiIterator context, it wants the entire key to be included in the | |
28 // suffix. | |
29 prefixLen int | |
30 | |
31 // The start cursor. It's appended to prefix to find the first row. | |
32 start []byte | |
33 | |
34 // The end cursor. It's appended to prefix to find the last row (which i
s not | |
35 // included in the interation result). If this is nil, then there's no e
nd | |
36 // except the natural end of the collection. | |
37 end []byte | |
38 } | |
39 | |
40 func multiIterate(defs []*iterDefinition, cb func(suffix []byte) error) error { | |
41 if len(defs) == 0 { | |
42 return nil | |
43 } | |
44 | |
45 ts := make([]*iterator, len(defs)) | |
46 prefixLens := make([]int, len(defs)) | |
47 for i, def := range defs { | |
48 // bind i so that the defer below doesn't get goofed by the loop
variable | |
49 i := i | |
50 ts[i] = def.mkIter() | |
51 prefixLens[i] = def.prefixLen | |
52 defer ts[i].stop() | |
53 } | |
54 | |
55 suffix := []byte(nil) | |
56 skip := -1 | |
57 | |
58 for { | |
59 stop := false | |
60 restart := false | |
61 | |
62 for idx, it := range ts { | |
63 if skip >= 0 && skip == idx { | |
64 continue | |
65 } | |
66 def := defs[idx] | |
67 | |
68 pfxLen := prefixLens[idx] | |
69 it.next(serialize.Join(def.prefix[:pfxLen], suffix), fun
c(itm *gkvlite.Item) { | |
70 if itm == nil { | |
71 // we hit the end of an iterator, we're
now done with the whole | |
72 // query. | |
73 stop = true | |
74 return | |
75 } | |
76 | |
77 sfxRO := itm.Key[pfxLen:] | |
78 | |
79 if bytes.Compare(sfxRO, suffix) > 0 { | |
80 // this row has a higher suffix than any
thing we've seen before. Set | |
81 // ourself to be the skip, and resart th
is loop from the top. | |
82 suffix = append(suffix[:0], sfxRO...) | |
83 skip = idx | |
84 if idx != 0 { | |
85 // no point to restarting on the
0th index | |
86 restart = true | |
87 } | |
88 } | |
89 }) | |
90 if stop || restart { | |
91 break | |
92 } | |
93 } | |
94 if stop { | |
95 return nil | |
96 } | |
97 if restart { | |
98 continue | |
99 } | |
100 | |
101 if err := cb(suffix); err != nil { | |
102 return err | |
103 } | |
104 suffix = nil | |
105 skip = -1 | |
106 } | |
107 } | |
108 | |
109 type cmd struct { | |
110 targ []byte | |
111 cb func(*gkvlite.Item) | |
112 } | |
113 | |
114 type iterator struct { | |
115 stopper sync.Once | |
116 | |
117 stopped bool | |
118 ch chan<- *cmd | |
119 } | |
120 | |
121 func (def *iterDefinition) mkIter() *iterator { | |
122 if !def.c.IsReadOnly() { | |
123 panic("attempting to make an iterator with r/w collection") | |
124 } | |
125 | |
126 cmdChan := make(chan *cmd) | |
127 ret := &iterator{ | |
128 ch: cmdChan, | |
129 } | |
130 | |
131 prefix := def.prefix | |
132 collection := def.c | |
133 | |
134 // convert the suffixes from the iterDefinition into full rows for the | |
135 // underlying storage. | |
136 start := serialize.Join(prefix, def.start) | |
137 | |
138 end := []byte(nil) | |
139 if def.end != nil { | |
140 end = serialize.Join(prefix, def.end) | |
141 } | |
142 | |
143 go func() { | |
144 c := (*cmd)(nil) | |
145 ensureCmd := func() bool { | |
146 if c == nil { | |
147 c = <-cmdChan | |
148 if c == nil { // stop() | |
149 return false | |
150 } | |
151 } | |
152 return true | |
153 } | |
154 if ensureCmd() { | |
155 if bytes.Compare(c.targ, start) < 0 { | |
156 c.targ = start | |
157 } | |
158 } | |
159 | |
160 defer ret.stop() | |
161 for { | |
162 if !ensureCmd() { | |
163 return | |
164 } | |
165 terminalCallback := true | |
166 collection.VisitItemsAscend(c.targ, true, func(i *gkvlit
e.Item) bool { | |
167 if !ensureCmd() { | |
168 return false | |
169 } | |
170 if bytes.Compare(i.Key, c.targ) < 0 { | |
171 // we need to start a new ascension func
tion | |
172 terminalCallback = false | |
173 return false | |
174 } | |
175 if !bytes.HasPrefix(i.Key, prefix) { | |
176 // we're no longer in prefix, terminate | |
177 return false | |
178 } | |
179 if end != nil && bytes.Compare(i.Key, end) >= 0
{ | |
180 // we hit our cap, terminate. | |
181 return false | |
182 } | |
183 c.cb(i) | |
184 c = nil | |
185 return true | |
186 }) | |
187 if terminalCallback && ensureCmd() { | |
188 c.cb(nil) | |
189 c = nil | |
190 } | |
191 } | |
192 }() | |
193 | |
194 return ret | |
195 } | |
196 | |
197 func (t *iterator) stop() { | |
198 t.stopper.Do(func() { | |
199 t.stopped = true | |
200 close(t.ch) | |
201 }) | |
202 } | |
203 | |
204 func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) { | |
205 if t.stopped { | |
206 cb(nil) | |
207 return | |
208 } | |
209 | |
210 waiter := make(chan struct{}) | |
211 t.ch <- &cmd{targ, func(i *gkvlite.Item) { | |
212 defer close(waiter) | |
213 | |
214 if i == nil { | |
215 t.stop() | |
216 } | |
217 cb(i) | |
218 }} | |
219 <-waiter | |
220 } | |
OLD | NEW |