Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(47)

Side by Side Diff: impl/memory/gkvlite_iter.go

Issue 2604943002: impl/memory: Replace gkvlite with "treapstore". (Closed)
Patch Set: Comments. Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/doc.go ('k') | impl/memory/gkvlite_iter_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package memory
6
7 import (
8 "bytes"
9 "sync"
10
11 "github.com/luci/gae/service/datastore/serialize"
12 "github.com/luci/gkvlite"
13 )
14
15 type iterDefinition struct {
16 // The collection to iterate over
17 c memCollection
18
19 // The prefix to always assert for every row. A nil prefix matches every row.
20 prefix []byte
21
22 // prefixLen is the number of prefix bytes that the caller cares about. It
23 // may be <= len(prefix). When doing a multiIterator, this number will b e used
24 // to determine the amount of suffix to transfer accross iterators. This is
25 // used specifically when using builtin indexes to service ancestor quer ies.
26 // The builtin index represents the ancestor key with prefix bytes, but in a
27 // multiIterator context, it wants the entire key to be included in the
28 // suffix.
29 prefixLen int
30
31 // The start cursor. It's appended to prefix to find the first row.
32 start []byte
33
34 // The end cursor. It's appended to prefix to find the last row (which i s not
35 // included in the interation result). If this is nil, then there's no e nd
36 // except the natural end of the collection.
37 end []byte
38 }
39
40 func multiIterate(defs []*iterDefinition, cb func(suffix []byte) error) error {
41 if len(defs) == 0 {
42 return nil
43 }
44
45 ts := make([]*iterator, len(defs))
46 prefixLens := make([]int, len(defs))
47 for i, def := range defs {
48 // bind i so that the defer below doesn't get goofed by the loop variable
49 i := i
50 ts[i] = def.mkIter()
51 prefixLens[i] = def.prefixLen
52 defer ts[i].stop()
53 }
54
55 suffix := []byte(nil)
56 skip := -1
57
58 for {
59 stop := false
60 restart := false
61
62 for idx, it := range ts {
63 if skip >= 0 && skip == idx {
64 continue
65 }
66 def := defs[idx]
67
68 pfxLen := prefixLens[idx]
69 it.next(serialize.Join(def.prefix[:pfxLen], suffix), fun c(itm *gkvlite.Item) {
70 if itm == nil {
71 // we hit the end of an iterator, we're now done with the whole
72 // query.
73 stop = true
74 return
75 }
76
77 sfxRO := itm.Key[pfxLen:]
78
79 if bytes.Compare(sfxRO, suffix) > 0 {
80 // this row has a higher suffix than any thing we've seen before. Set
81 // ourself to be the skip, and resart th is loop from the top.
82 suffix = append(suffix[:0], sfxRO...)
83 skip = idx
84 if idx != 0 {
85 // no point to restarting on the 0th index
86 restart = true
87 }
88 }
89 })
90 if stop || restart {
91 break
92 }
93 }
94 if stop {
95 return nil
96 }
97 if restart {
98 continue
99 }
100
101 if err := cb(suffix); err != nil {
102 return err
103 }
104 suffix = nil
105 skip = -1
106 }
107 }
108
109 type cmd struct {
110 targ []byte
111 cb func(*gkvlite.Item)
112 }
113
114 type iterator struct {
115 stopper sync.Once
116
117 stopped bool
118 ch chan<- *cmd
119 }
120
121 func (def *iterDefinition) mkIter() *iterator {
122 if !def.c.IsReadOnly() {
123 panic("attempting to make an iterator with r/w collection")
124 }
125
126 cmdChan := make(chan *cmd)
127 ret := &iterator{
128 ch: cmdChan,
129 }
130
131 prefix := def.prefix
132 collection := def.c
133
134 // convert the suffixes from the iterDefinition into full rows for the
135 // underlying storage.
136 start := serialize.Join(prefix, def.start)
137
138 end := []byte(nil)
139 if def.end != nil {
140 end = serialize.Join(prefix, def.end)
141 }
142
143 go func() {
144 c := (*cmd)(nil)
145 ensureCmd := func() bool {
146 if c == nil {
147 c = <-cmdChan
148 if c == nil { // stop()
149 return false
150 }
151 }
152 return true
153 }
154 if ensureCmd() {
155 if bytes.Compare(c.targ, start) < 0 {
156 c.targ = start
157 }
158 }
159
160 defer ret.stop()
161 for {
162 if !ensureCmd() {
163 return
164 }
165 terminalCallback := true
166 collection.VisitItemsAscend(c.targ, true, func(i *gkvlit e.Item) bool {
167 if !ensureCmd() {
168 return false
169 }
170 if bytes.Compare(i.Key, c.targ) < 0 {
171 // we need to start a new ascension func tion
172 terminalCallback = false
173 return false
174 }
175 if !bytes.HasPrefix(i.Key, prefix) {
176 // we're no longer in prefix, terminate
177 return false
178 }
179 if end != nil && bytes.Compare(i.Key, end) >= 0 {
180 // we hit our cap, terminate.
181 return false
182 }
183 c.cb(i)
184 c = nil
185 return true
186 })
187 if terminalCallback && ensureCmd() {
188 c.cb(nil)
189 c = nil
190 }
191 }
192 }()
193
194 return ret
195 }
196
197 func (t *iterator) stop() {
198 t.stopper.Do(func() {
199 t.stopped = true
200 close(t.ch)
201 })
202 }
203
204 func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) {
205 if t.stopped {
206 cb(nil)
207 return
208 }
209
210 waiter := make(chan struct{})
211 t.ch <- &cmd{targ, func(i *gkvlite.Item) {
212 defer close(waiter)
213
214 if i == nil {
215 t.stop()
216 }
217 cb(i)
218 }}
219 <-waiter
220 }
OLDNEW
« no previous file with comments | « impl/memory/doc.go ('k') | impl/memory/gkvlite_iter_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698