OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "errors" | 9 "errors" |
10 "fmt" | 10 "fmt" |
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
90 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key
, gc func() (ds.Cursor, error)) error { | 90 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key
, gc func() (ds.Cursor, error)) error { |
91 if !s.dedup.Add(string(rawData[len(rawData)-1])) { | 91 if !s.dedup.Add(string(rawData[len(rawData)-1])) { |
92 return nil | 92 return nil |
93 } | 93 } |
94 return s.cb(key, nil, gc) | 94 return s.cb(key, nil, gc) |
95 } | 95 } |
96 | 96 |
97 type normalStrategy struct { | 97 type normalStrategy struct { |
98 cb ds.RawRunCB | 98 cb ds.RawRunCB |
99 | 99 |
100 » aid string | 100 » kc ds.KeyContext |
101 » ns string | |
102 head memCollection | 101 head memCollection |
103 dedup stringset.Set | 102 dedup stringset.Set |
104 } | 103 } |
105 | 104 |
106 func newNormalStrategy(aid, ns string, cb ds.RawRunCB, head memStore) queryStrat
egy { | 105 func newNormalStrategy(kc ds.KeyContext, cb ds.RawRunCB, head memStore) queryStr
ategy { |
107 » coll := head.GetCollection("ents:" + ns) | 106 » coll := head.GetCollection("ents:" + kc.Namespace) |
108 if coll == nil { | 107 if coll == nil { |
109 return nil | 108 return nil |
110 } | 109 } |
111 » return &normalStrategy{cb, aid, ns, coll, stringset.New(0)} | 110 » return &normalStrategy{cb, kc, coll, stringset.New(0)} |
112 } | 111 } |
113 | 112 |
114 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
gc func() (ds.Cursor, error)) error { | 113 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
gc func() (ds.Cursor, error)) error { |
115 rawKey := rawData[len(rawData)-1] | 114 rawKey := rawData[len(rawData)-1] |
116 if !s.dedup.Add(string(rawKey)) { | 115 if !s.dedup.Add(string(rawKey)) { |
117 return nil | 116 return nil |
118 } | 117 } |
119 | 118 |
120 rawEnt := s.head.Get(rawKey) | 119 rawEnt := s.head.Get(rawKey) |
121 if rawEnt == nil { | 120 if rawEnt == nil { |
122 // entity doesn't exist at head | 121 // entity doesn't exist at head |
123 return nil | 122 return nil |
124 } | 123 } |
125 » pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.
WithoutContext, s.aid, s.ns) | 124 » pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.
WithoutContext, s.kc) |
126 memoryCorruption(err) | 125 memoryCorruption(err) |
127 | 126 |
128 return s.cb(key, pm, gc) | 127 return s.cb(key, pm, gc) |
129 } | 128 } |
130 | 129 |
131 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB,
head memStore) queryStrategy { | 130 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB,
head memStore) queryStrategy { |
132 if fq.KeysOnly() { | 131 if fq.KeysOnly() { |
133 return &keysOnlyStrategy{cb, stringset.New(0)} | 132 return &keysOnlyStrategy{cb, stringset.New(0)} |
134 } | 133 } |
135 if len(fq.Project()) > 0 { | 134 if len(fq.Project()) > 0 { |
136 return newProjectionStrategy(fq, rq, cb) | 135 return newProjectionStrategy(fq, rq, cb) |
137 } | 136 } |
138 » return newNormalStrategy(rq.aid, rq.ns, cb, head) | 137 » return newNormalStrategy(rq.kc, cb, head) |
139 } | 138 } |
140 | 139 |
141 func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c
ount int) (raw [][]byte, decoded []ds.Property) { | 140 func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c
ount int) (raw [][]byte, decoded []ds.Property) { |
142 buf := serialize.Invertible(bytes.NewBuffer(suffix)) | 141 buf := serialize.Invertible(bytes.NewBuffer(suffix)) |
143 decoded = make([]ds.Property, len(suffixFormat)) | 142 decoded = make([]ds.Property, len(suffixFormat)) |
144 raw = make([][]byte, len(suffixFormat)) | 143 raw = make([][]byte, len(suffixFormat)) |
145 | 144 |
146 err := error(nil) | 145 err := error(nil) |
| 146 kc := ds.KeyContext{aid, ns} |
147 for i := range decoded { | 147 for i := range decoded { |
148 if count >= 0 && i >= count { | 148 if count >= 0 && i >= count { |
149 break | 149 break |
150 } | 150 } |
151 needInvert := suffixFormat[i].Descending | 151 needInvert := suffixFormat[i].Descending |
152 | 152 |
153 buf.SetInvert(needInvert) | 153 buf.SetInvert(needInvert) |
154 » » decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutC
ontext, aid, ns) | 154 » » decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutC
ontext, kc) |
155 memoryCorruption(err) | 155 memoryCorruption(err) |
156 | 156 |
157 offset := len(suffix) - buf.Len() | 157 offset := len(suffix) - buf.Len() |
158 raw[i] = suffix[:offset] | 158 raw[i] = suffix[:offset] |
159 suffix = suffix[offset:] | 159 suffix = suffix[offset:] |
160 if needInvert { | 160 if needInvert { |
161 raw[i] = serialize.Invert(raw[i]) | 161 raw[i] = serialize.Invert(raw[i]) |
162 } | 162 } |
163 } | 163 } |
164 | 164 |
165 return | 165 return |
166 } | 166 } |
167 | 167 |
168 func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head mem
Store) (ret int64, err error) { | 168 func countQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head m
emStore) (ret int64, err error) { |
169 if len(fq.Project()) == 0 && !fq.KeysOnly() { | 169 if len(fq.Project()) == 0 && !fq.KeysOnly() { |
170 fq, err = fq.Original().KeysOnly(true).Finalize() | 170 fq, err = fq.Original().KeysOnly(true).Finalize() |
171 if err != nil { | 171 if err != nil { |
172 return | 172 return |
173 } | 173 } |
174 } | 174 } |
175 » err = executeQuery(fq, aid, ns, isTxn, idx, head, func(_ *ds.Key, _ ds.P
ropertyMap, _ ds.CursorCB) error { | 175 » err = executeQuery(fq, kc, isTxn, idx, head, func(_ *ds.Key, _ ds.Proper
tyMap, _ ds.CursorCB) error { |
176 ret++ | 176 ret++ |
177 return nil | 177 return nil |
178 }) | 178 }) |
179 return | 179 return |
180 } | 180 } |
181 | 181 |
182 func executeNamespaceQuery(fq *ds.FinalizedQuery, aid string, head memStore, cb
ds.RawRunCB) error { | 182 func executeNamespaceQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, head memStor
e, cb ds.RawRunCB) error { |
183 // these objects have no properties, so any filters on properties cause
an | 183 // these objects have no properties, so any filters on properties cause
an |
184 // empty result. | 184 // empty result. |
185 if len(fq.EqFilters()) > 0 || len(fq.Project()) > 0 || len(fq.Orders())
> 1 { | 185 if len(fq.EqFilters()) > 0 || len(fq.Project()) > 0 || len(fq.Orders())
> 1 { |
186 return nil | 186 return nil |
187 } | 187 } |
188 if !(fq.IneqFilterProp() == "" || fq.IneqFilterProp() == "__key__") { | 188 if !(fq.IneqFilterProp() == "" || fq.IneqFilterProp() == "__key__") { |
189 return nil | 189 return nil |
190 } | 190 } |
191 limit, hasLimit := fq.Limit() | 191 limit, hasLimit := fq.Limit() |
192 offset, hasOffset := fq.Offset() | 192 offset, hasOffset := fq.Offset() |
193 start, end := fq.Bounds() | 193 start, end := fq.Bounds() |
194 | 194 |
195 cursErr := errors.New("cursors not supported for __namespace__ query") | 195 cursErr := errors.New("cursors not supported for __namespace__ query") |
196 cursFn := func() (ds.Cursor, error) { return nil, cursErr } | 196 cursFn := func() (ds.Cursor, error) { return nil, cursErr } |
197 if !(start == nil && end == nil) { | 197 if !(start == nil && end == nil) { |
198 return cursErr | 198 return cursErr |
199 } | 199 } |
| 200 |
| 201 kc.Namespace = "" |
200 for _, ns := range namespaces(head) { | 202 for _, ns := range namespaces(head) { |
201 if hasOffset && offset > 0 { | 203 if hasOffset && offset > 0 { |
202 offset-- | 204 offset-- |
203 continue | 205 continue |
204 } | 206 } |
205 if hasLimit { | 207 if hasLimit { |
206 if limit <= 0 { | 208 if limit <= 0 { |
207 return ds.Stop | 209 return ds.Stop |
208 } | 210 } |
209 limit-- | 211 limit-- |
210 } | 212 } |
211 k := (*ds.Key)(nil) | 213 k := (*ds.Key)(nil) |
212 if ns == "" { | 214 if ns == "" { |
213 // Datastore uses an id of 1 to indicate the default nam
espace in its | 215 // Datastore uses an id of 1 to indicate the default nam
espace in its |
214 // metadata API. | 216 // metadata API. |
215 » » » k = ds.MakeKey(aid, "", "__namespace__", 1) | 217 » » » k = kc.MakeKey("__namespace__", 1) |
216 } else { | 218 } else { |
217 » » » k = ds.MakeKey(aid, "", "__namespace__", ns) | 219 » » » k = kc.MakeKey("__namespace__", ns) |
218 } | 220 } |
219 if err := cb(k, nil, cursFn); err != nil { | 221 if err := cb(k, nil, cursFn); err != nil { |
220 return err | 222 return err |
221 } | 223 } |
222 } | 224 } |
223 return nil | 225 return nil |
224 } | 226 } |
225 | 227 |
226 func executeQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head m
emStore, cb ds.RawRunCB) error { | 228 func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head
memStore, cb ds.RawRunCB) error { |
227 » rq, err := reduce(fq, aid, ns, isTxn) | 229 » rq, err := reduce(fq, kc, isTxn) |
228 if err == ds.ErrNullQuery { | 230 if err == ds.ErrNullQuery { |
229 return nil | 231 return nil |
230 } | 232 } |
231 if err != nil { | 233 if err != nil { |
232 return err | 234 return err |
233 } | 235 } |
234 | 236 |
235 if rq.kind == "__namespace__" { | 237 if rq.kind == "__namespace__" { |
236 » » return executeNamespaceQuery(fq, aid, head, cb) | 238 » » return executeNamespaceQuery(fq, kc, head, cb) |
237 } | 239 } |
238 | 240 |
239 idxs, err := getIndexes(rq, idx) | 241 idxs, err := getIndexes(rq, idx) |
240 if err == ds.ErrNullQuery { | 242 if err == ds.ErrNullQuery { |
241 return nil | 243 return nil |
242 } | 244 } |
243 if err != nil { | 245 if err != nil { |
244 return err | 246 return err |
245 } | 247 } |
246 | 248 |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
279 offset-- | 281 offset-- |
280 return nil | 282 return nil |
281 } | 283 } |
282 if hasLimit { | 284 if hasLimit { |
283 if limit <= 0 { | 285 if limit <= 0 { |
284 return ds.Stop | 286 return ds.Stop |
285 } | 287 } |
286 limit-- | 288 limit-- |
287 } | 289 } |
288 | 290 |
289 » » rawData, decodedProps := parseSuffix(aid, ns, rq.suffixFormat, s
uffix, -1) | 291 » » rawData, decodedProps := parseSuffix(kc.AppID, kc.Namespace, rq.
suffixFormat, suffix, -1) |
290 | 292 |
291 keyProp := decodedProps[len(decodedProps)-1] | 293 keyProp := decodedProps[len(decodedProps)-1] |
292 if keyProp.Type() != ds.PTKey { | 294 if keyProp.Type() != ds.PTKey { |
293 impossible(fmt.Errorf("decoded index row doesn't end wit
h a Key: %#v", keyProp)) | 295 impossible(fmt.Errorf("decoded index row doesn't end wit
h a Key: %#v", keyProp)) |
294 } | 296 } |
295 | 297 |
296 return strategy.handle( | 298 return strategy.handle( |
297 rawData, decodedProps, keyProp.Value().(*ds.Key), | 299 rawData, decodedProps, keyProp.Value().(*ds.Key), |
298 getCursorFn(suffix)) | 300 getCursorFn(suffix)) |
299 }) | 301 }) |
300 } | 302 } |
OLD | NEW |