Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(86)

Side by Side Diff: impl/memory/gkvlite_iter.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: stringSet everywhere Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/error_markers.go ('k') | impl/memory/gkvlite_iter_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "sync" 9 "sync"
10 10
11 "github.com/luci/gkvlite" 11 "github.com/luci/gkvlite"
12 ) 12 )
13 13
14 // TODO(riannucci): Add a multi-iterator which allows: 14 type iterDefinition struct {
15 // multiple collections 15 » // The collection to iterate over
16 // sort smallest collection -> largest collection 16 » c *memCollection
17 // constant prefixes for each collection 17
18 // start + end partial suffixes 18 » // The prefix to always assert for every row. A nil prefix matches every row.
19 // * these are appended to the prefix for the collection to narrow the scan 19 » prefix []byte
20 // range. The collection scans will start at >= prefix+start suffix, and 20
21 // will scan until > prefix+end suffix (excluding this larger row). 21 » // prefixLen is the number of prefix bytes that the caller cares about. It
22 // produces hits (a suffix+value) when all collections contain the same prefix 22 » // may be <= len(prefix). When doing a multiIterator, this number will b e used
23 // and the same suffix 23 » // to determine the amount of suffix to transfer accross iterators. This is
24 // * a hit value will have 24 » // used specifically when using builtin indexes to service ancestor quer ies.
25 // - a []byte suffix which is the matching suffix of every collection. 25 » // The builtin index represents the ancestor key with prefix bytes, but in a
26 // The caller already knows the prefix for each collection. The caller 26 » // multiIterator context, it wants the entire key to be included in the
27 // may need to parse the suffix to separate the sort order(s) from the 27 » // suffix.
28 // Key. 28 » prefixLen int
29 // dedup 29
30 // on each hit, the row values are concatenated and compared to the 30 » // The start cursor. It's appended to prefix to find the first row.
31 //» » concatenated prefixes+startsuffix. If it's <=, it's skipped. 31 » start []byte
32
33 » // The end cursor. It's appended to prefix to find the last row (which i s not
34 » // included in the interation result). If this is nil, then there's no e nd
35 » // except the natural end of the collection.
36 » end []byte
37 }
38
39 func multiIterate(defs []*iterDefinition, cb func(suffix []byte) bool) {
40 » if len(defs) == 0 {
41 » » return
42 » }
43
44 » ts := make([]*iterator, len(defs))
45 » prefixLens := make([]int, len(defs))
46 » for i, def := range defs {
47 » » // bind i so that the defer below doesn't get goofed by the loop variable
48 » » i := i
49 » » ts[i] = def.mkIter()
50 » » prefixLens[i] = def.prefixLen
51 » » defer ts[i].stop()
52 » }
53
54 » suffix := []byte(nil)
55 » skip := -1
56
57 » for {
58 » » stop := false
59 » » restart := false
60
61 » » for idx, it := range ts {
62 » » » if skip >= 0 && skip == idx {
63 » » » » continue
64 » » » }
65 » » » def := defs[idx]
66
67 » » » it.next(bjoin(def.prefix, suffix), func(itm *gkvlite.Ite m) {
68 » » » » if itm == nil {
69 » » » » » // we hit the end of an iterator, we're now done with the whole
70 » » » » » // query.
71 » » » » » stop = true
72 » » » » » return
73 » » » » }
74
75 » » » » sfxRO := itm.Key[prefixLens[idx]:]
76
77 » » » » if bytes.Compare(sfxRO, suffix) > 0 {
78 » » » » » // this row has a higher suffix than any thing we've seen before. Set
79 » » » » » // ourself to be the skip, and resart th is loop from the top.
80 » » » » » suffix = append(suffix[:0], sfxRO...)
81 » » » » » skip = idx
82 » » » » » if idx != 0 {
83 » » » » » » // no point to restarting on the 0th index
84 » » » » » » restart = true
85 » » » » » }
86 » » » » }
87 » » » })
88 » » » if stop || restart {
89 » » » » break
90 » » » }
91 » » }
92 » » if stop {
93 » » » return
94 » » }
95 » » if restart {
96 » » » continue
97 » » }
98
99 » » if !cb(suffix) {
100 » » » return
101 » » }
102 » » suffix = nil
103 » » skip = -1
104 » }
105 }
32 106
33 type cmd struct { 107 type cmd struct {
34 targ []byte 108 targ []byte
35 cb func(*gkvlite.Item) 109 cb func(*gkvlite.Item)
36 } 110 }
37 111
38 type iterator struct { 112 type iterator struct {
39 stopper sync.Once 113 stopper sync.Once
40 114
41 stopped bool 115 stopped bool
42 prev []byte
43 ch chan<- *cmd 116 ch chan<- *cmd
44 } 117 }
45 118
46 func newIterable(coll *memCollection, end []byte) *iterator { 119 func (def *iterDefinition) mkIter() *iterator {
47 cmdChan := make(chan *cmd) 120 cmdChan := make(chan *cmd)
48 ret := &iterator{ 121 ret := &iterator{
49 ch: cmdChan, 122 ch: cmdChan,
50 } 123 }
51 124
125 prefix := def.prefix
126 collection := def.c
127
128 // convert the suffixes from the iterDefinition into full rows for the
129 // underlying storage.
130 start := bjoin(prefix, def.start)
131
132 end := []byte(nil)
133 if def.end != nil {
134 end = bjoin(prefix, def.end)
135 }
136
52 go func() { 137 go func() {
53 defer ret.stop()
54 c := (*cmd)(nil) 138 c := (*cmd)(nil)
55 ensureCmd := func() bool { 139 ensureCmd := func() bool {
56 if c == nil { 140 if c == nil {
57 c = <-cmdChan 141 c = <-cmdChan
58 if c == nil { // stop() 142 if c == nil { // stop()
59 return false 143 return false
60 } 144 }
61 } 145 }
62 return true 146 return true
63 } 147 }
148 if ensureCmd() {
149 if bytes.Compare(c.targ, start) < 0 {
150 c.targ = start
151 }
152 }
153
154 defer ret.stop()
64 for { 155 for {
65 if !ensureCmd() { 156 if !ensureCmd() {
66 return 157 return
67 } 158 }
68 » » » previous := c.targ 159 » » » terminalCallback := true
69 » » » needCallback := true 160 » » » collection.VisitItemsAscend(c.targ, true, func(i *gkvlit e.Item) bool {
70 » » » coll.VisitItemsAscend(c.targ, true, func(i *gkvlite.Item ) bool {
71 if !ensureCmd() { 161 if !ensureCmd() {
72 return false 162 return false
73 } 163 }
74 » » » » if !bytes.Equal(previous, c.targ) { 164 » » » » if bytes.Compare(i.Key, c.targ) < 0 {
75 » » » » » // we need to start a new ascention func tion 165 » » » » » // we need to start a new ascension func tion
76 » » » » » needCallback = false 166 » » » » » terminalCallback = false
167 » » » » » return false
168 » » » » }
169 » » » » if !bytes.HasPrefix(i.Key, prefix) {
170 » » » » » // we're no longer in prefix, terminate
77 return false 171 return false
78 } 172 }
79 if end != nil && bytes.Compare(i.Key, end) >= 0 { 173 if end != nil && bytes.Compare(i.Key, end) >= 0 {
80 » » » » » // we hit our cap 174 » » » » » // we hit our cap, terminate.
81 » » » » » ret.stop()
82 return false 175 return false
83 } 176 }
84 c.cb(i) 177 c.cb(i)
85 previous = i.Key
86 c = nil 178 c = nil
87 return true 179 return true
88 }) 180 })
89 » » » if c != nil && needCallback { 181 » » » if terminalCallback && ensureCmd() {
90 c.cb(nil) 182 c.cb(nil)
91 c = nil 183 c = nil
92 } 184 }
93 } 185 }
94 }() 186 }()
95 187
96 return ret 188 return ret
97 } 189 }
98 190
99 func (t *iterator) stop() { 191 func (t *iterator) stop() {
100 t.stopper.Do(func() { 192 t.stopper.Do(func() {
101 t.stopped = true 193 t.stopped = true
102 close(t.ch) 194 close(t.ch)
103 }) 195 })
104 } 196 }
105 197
106 func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) { 198 func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) {
107 if t.stopped { 199 if t.stopped {
108 cb(nil) 200 cb(nil)
109 return 201 return
110 } 202 }
111 203
112 if targ == nil {
113 targ = t.prev
114 if targ == nil {
115 targ = []byte{}
116 }
117 }
118
119 waiter := make(chan struct{}) 204 waiter := make(chan struct{})
120 t.ch <- &cmd{targ, func(i *gkvlite.Item) { 205 t.ch <- &cmd{targ, func(i *gkvlite.Item) {
121 defer close(waiter) 206 defer close(waiter)
122 207
123 cb(i)
124 if i == nil { 208 if i == nil {
125 t.stop() 209 t.stop()
126 } else {
127 t.prev = i.Key
128 } 210 }
211 cb(i)
129 }} 212 }}
130 <-waiter 213 <-waiter
131 } 214 }
OLDNEW
« no previous file with comments | « impl/memory/error_markers.go ('k') | impl/memory/gkvlite_iter_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698