| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 5 package memory | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "sync" | |
| 10 | |
| 11 "github.com/luci/gae/service/datastore/serialize" | |
| 12 "github.com/luci/gkvlite" | |
| 13 ) | |
| 14 | |
| 15 type iterDefinition struct { | |
| 16 // The collection to iterate over | |
| 17 c memCollection | |
| 18 | |
| 19 // The prefix to always assert for every row. A nil prefix matches every
row. | |
| 20 prefix []byte | |
| 21 | |
| 22 // prefixLen is the number of prefix bytes that the caller cares about.
It | |
| 23 // may be <= len(prefix). When doing a multiIterator, this number will b
e used | |
| 24 // to determine the amount of suffix to transfer accross iterators. This
is | |
| 25 // used specifically when using builtin indexes to service ancestor quer
ies. | |
| 26 // The builtin index represents the ancestor key with prefix bytes, but
in a | |
| 27 // multiIterator context, it wants the entire key to be included in the | |
| 28 // suffix. | |
| 29 prefixLen int | |
| 30 | |
| 31 // The start cursor. It's appended to prefix to find the first row. | |
| 32 start []byte | |
| 33 | |
| 34 // The end cursor. It's appended to prefix to find the last row (which i
s not | |
| 35 // included in the interation result). If this is nil, then there's no e
nd | |
| 36 // except the natural end of the collection. | |
| 37 end []byte | |
| 38 } | |
| 39 | |
| 40 func multiIterate(defs []*iterDefinition, cb func(suffix []byte) error) error { | |
| 41 if len(defs) == 0 { | |
| 42 return nil | |
| 43 } | |
| 44 | |
| 45 ts := make([]*iterator, len(defs)) | |
| 46 prefixLens := make([]int, len(defs)) | |
| 47 for i, def := range defs { | |
| 48 // bind i so that the defer below doesn't get goofed by the loop
variable | |
| 49 i := i | |
| 50 ts[i] = def.mkIter() | |
| 51 prefixLens[i] = def.prefixLen | |
| 52 defer ts[i].stop() | |
| 53 } | |
| 54 | |
| 55 suffix := []byte(nil) | |
| 56 skip := -1 | |
| 57 | |
| 58 for { | |
| 59 stop := false | |
| 60 restart := false | |
| 61 | |
| 62 for idx, it := range ts { | |
| 63 if skip >= 0 && skip == idx { | |
| 64 continue | |
| 65 } | |
| 66 def := defs[idx] | |
| 67 | |
| 68 pfxLen := prefixLens[idx] | |
| 69 it.next(serialize.Join(def.prefix[:pfxLen], suffix), fun
c(itm *gkvlite.Item) { | |
| 70 if itm == nil { | |
| 71 // we hit the end of an iterator, we're
now done with the whole | |
| 72 // query. | |
| 73 stop = true | |
| 74 return | |
| 75 } | |
| 76 | |
| 77 sfxRO := itm.Key[pfxLen:] | |
| 78 | |
| 79 if bytes.Compare(sfxRO, suffix) > 0 { | |
| 80 // this row has a higher suffix than any
thing we've seen before. Set | |
| 81 // ourself to be the skip, and resart th
is loop from the top. | |
| 82 suffix = append(suffix[:0], sfxRO...) | |
| 83 skip = idx | |
| 84 if idx != 0 { | |
| 85 // no point to restarting on the
0th index | |
| 86 restart = true | |
| 87 } | |
| 88 } | |
| 89 }) | |
| 90 if stop || restart { | |
| 91 break | |
| 92 } | |
| 93 } | |
| 94 if stop { | |
| 95 return nil | |
| 96 } | |
| 97 if restart { | |
| 98 continue | |
| 99 } | |
| 100 | |
| 101 if err := cb(suffix); err != nil { | |
| 102 return err | |
| 103 } | |
| 104 suffix = nil | |
| 105 skip = -1 | |
| 106 } | |
| 107 } | |
| 108 | |
| 109 type cmd struct { | |
| 110 targ []byte | |
| 111 cb func(*gkvlite.Item) | |
| 112 } | |
| 113 | |
| 114 type iterator struct { | |
| 115 stopper sync.Once | |
| 116 | |
| 117 stopped bool | |
| 118 ch chan<- *cmd | |
| 119 } | |
| 120 | |
| 121 func (def *iterDefinition) mkIter() *iterator { | |
| 122 if !def.c.IsReadOnly() { | |
| 123 panic("attempting to make an iterator with r/w collection") | |
| 124 } | |
| 125 | |
| 126 cmdChan := make(chan *cmd) | |
| 127 ret := &iterator{ | |
| 128 ch: cmdChan, | |
| 129 } | |
| 130 | |
| 131 prefix := def.prefix | |
| 132 collection := def.c | |
| 133 | |
| 134 // convert the suffixes from the iterDefinition into full rows for the | |
| 135 // underlying storage. | |
| 136 start := serialize.Join(prefix, def.start) | |
| 137 | |
| 138 end := []byte(nil) | |
| 139 if def.end != nil { | |
| 140 end = serialize.Join(prefix, def.end) | |
| 141 } | |
| 142 | |
| 143 go func() { | |
| 144 c := (*cmd)(nil) | |
| 145 ensureCmd := func() bool { | |
| 146 if c == nil { | |
| 147 c = <-cmdChan | |
| 148 if c == nil { // stop() | |
| 149 return false | |
| 150 } | |
| 151 } | |
| 152 return true | |
| 153 } | |
| 154 if ensureCmd() { | |
| 155 if bytes.Compare(c.targ, start) < 0 { | |
| 156 c.targ = start | |
| 157 } | |
| 158 } | |
| 159 | |
| 160 defer ret.stop() | |
| 161 for { | |
| 162 if !ensureCmd() { | |
| 163 return | |
| 164 } | |
| 165 terminalCallback := true | |
| 166 collection.VisitItemsAscend(c.targ, true, func(i *gkvlit
e.Item) bool { | |
| 167 if !ensureCmd() { | |
| 168 return false | |
| 169 } | |
| 170 if bytes.Compare(i.Key, c.targ) < 0 { | |
| 171 // we need to start a new ascension func
tion | |
| 172 terminalCallback = false | |
| 173 return false | |
| 174 } | |
| 175 if !bytes.HasPrefix(i.Key, prefix) { | |
| 176 // we're no longer in prefix, terminate | |
| 177 return false | |
| 178 } | |
| 179 if end != nil && bytes.Compare(i.Key, end) >= 0
{ | |
| 180 // we hit our cap, terminate. | |
| 181 return false | |
| 182 } | |
| 183 c.cb(i) | |
| 184 c = nil | |
| 185 return true | |
| 186 }) | |
| 187 if terminalCallback && ensureCmd() { | |
| 188 c.cb(nil) | |
| 189 c = nil | |
| 190 } | |
| 191 } | |
| 192 }() | |
| 193 | |
| 194 return ret | |
| 195 } | |
| 196 | |
| 197 func (t *iterator) stop() { | |
| 198 t.stopper.Do(func() { | |
| 199 t.stopped = true | |
| 200 close(t.ch) | |
| 201 }) | |
| 202 } | |
| 203 | |
| 204 func (t *iterator) next(targ []byte, cb func(*gkvlite.Item)) { | |
| 205 if t.stopped { | |
| 206 cb(nil) | |
| 207 return | |
| 208 } | |
| 209 | |
| 210 waiter := make(chan struct{}) | |
| 211 t.ch <- &cmd{targ, func(i *gkvlite.Item) { | |
| 212 defer close(waiter) | |
| 213 | |
| 214 if i == nil { | |
| 215 t.stop() | |
| 216 } | |
| 217 cb(i) | |
| 218 }} | |
| 219 <-waiter | |
| 220 } | |
| OLD | NEW |