OLD | NEW |
---|---|
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "errors" | 9 "errors" |
10 "fmt" | 10 "fmt" |
11 | 11 |
12 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
13 "github.com/luci/gae/service/datastore/serialize" | 13 "github.com/luci/gae/service/datastore/serialize" |
14 "github.com/luci/luci-go/common/data/cmpbin" | 14 "github.com/luci/luci-go/common/data/cmpbin" |
15 "github.com/luci/luci-go/common/data/stringset" | 15 "github.com/luci/luci-go/common/data/stringset" |
16 ) | 16 ) |
17 | 17 |
18 // errQueryEntrySkipped is a sentinel error returned by queryStrategy handlers | |
19 // if they decided not to process the entry in their handle(). | |
20 var errQueryEntrySkipped = errors.New("query result skipped") | |
21 | |
18 type queryStrategy interface { | 22 type queryStrategy interface { |
19 // handle applies the strategy to the embedded user callback. | 23 // handle applies the strategy to the embedded user callback. |
20 // - rawData is the slice of encoded Properties from the index row | 24 // - rawData is the slice of encoded Properties from the index row |
21 // (correctly de-inverted). | 25 // (correctly de-inverted). |
22 // - decodedProps is the slice of decoded Properties from the index ro w | 26 // - decodedProps is the slice of decoded Properties from the index ro w |
23 // - key is the decoded Key from the index row (the last item in rawDa ta and | 27 // - key is the decoded Key from the index row (the last item in rawDa ta and |
24 // decodedProps) | 28 // decodedProps) |
25 // - gc is the getCursor function to be passed to the user's callback | 29 // - gc is the getCursor function to be passed to the user's callback |
26 » handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc fun c() (ds.Cursor, error)) error | 30 » handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, |
31 » » cb ds.RawRunCB, gc func() (ds.Cursor, error)) error | |
27 } | 32 } |
28 | 33 |
29 type projectionLookup struct { | 34 type projectionLookup struct { |
30 suffixIndex int | 35 suffixIndex int |
31 propertyName string | 36 propertyName string |
32 } | 37 } |
33 | 38 |
34 type projectionStrategy struct { | 39 type projectionStrategy struct { |
35 cb ds.RawRunCB | |
36 | |
37 project []projectionLookup | 40 project []projectionLookup |
38 distinct stringset.Set | 41 distinct stringset.Set |
39 } | 42 } |
40 | 43 |
41 func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRun CB) queryStrategy { | 44 func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery) queryStrateg y { |
42 proj := fq.Project() | 45 proj := fq.Project() |
43 | 46 |
44 projectionLookups := make([]projectionLookup, len(proj)) | 47 projectionLookups := make([]projectionLookup, len(proj)) |
45 for i, prop := range proj { | 48 for i, prop := range proj { |
46 projectionLookups[i].propertyName = prop | 49 projectionLookups[i].propertyName = prop |
47 lookupErr := fmt.Errorf("planning a strategy for an unfulfillabl e query?") | 50 lookupErr := fmt.Errorf("planning a strategy for an unfulfillabl e query?") |
48 for j, col := range rq.suffixFormat { | 51 for j, col := range rq.suffixFormat { |
49 if col.Property == prop { | 52 if col.Property == prop { |
50 projectionLookups[i].suffixIndex = j | 53 projectionLookups[i].suffixIndex = j |
51 lookupErr = nil | 54 lookupErr = nil |
52 break | 55 break |
53 } | 56 } |
54 } | 57 } |
55 impossible(lookupErr) | 58 impossible(lookupErr) |
56 } | 59 } |
57 » ret := &projectionStrategy{cb: cb, project: projectionLookups} | 60 » ret := &projectionStrategy{project: projectionLookups} |
58 if fq.Distinct() { | 61 if fq.Distinct() { |
59 ret.distinct = stringset.New(0) | 62 ret.distinct = stringset.New(0) |
60 } | 63 } |
61 return ret | 64 return ret |
62 } | 65 } |
63 | 66 |
64 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property , key *ds.Key, gc func() (ds.Cursor, error)) error { | 67 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property , key *ds.Key, |
68 » cb ds.RawRunCB, gc func() (ds.Cursor, error)) error { | |
69 | |
65 projectedRaw := [][]byte(nil) | 70 projectedRaw := [][]byte(nil) |
66 if s.distinct != nil { | 71 if s.distinct != nil { |
67 projectedRaw = make([][]byte, len(decodedProps)) | 72 projectedRaw = make([][]byte, len(decodedProps)) |
68 } | 73 } |
69 pmap := make(ds.PropertyMap, len(s.project)) | 74 pmap := make(ds.PropertyMap, len(s.project)) |
70 for i, p := range s.project { | 75 for i, p := range s.project { |
71 if s.distinct != nil { | 76 if s.distinct != nil { |
72 projectedRaw[i] = rawData[p.suffixIndex] | 77 projectedRaw[i] = rawData[p.suffixIndex] |
73 } | 78 } |
74 pmap[p.propertyName] = decodedProps[p.suffixIndex] | 79 pmap[p.propertyName] = decodedProps[p.suffixIndex] |
75 } | 80 } |
76 if s.distinct != nil { | 81 if s.distinct != nil { |
77 if !s.distinct.Add(string(serialize.Join(projectedRaw...))) { | 82 if !s.distinct.Add(string(serialize.Join(projectedRaw...))) { |
78 return nil | 83 return nil |
79 } | 84 } |
80 } | 85 } |
81 » return s.cb(key, pmap, gc) | 86 » return cb(key, pmap, gc) |
82 } | 87 } |
83 | 88 |
84 type keysOnlyStrategy struct { | 89 type keysOnlyStrategy struct { |
85 cb ds.RawRunCB | |
86 | |
87 dedup stringset.Set | 90 dedup stringset.Set |
88 } | 91 } |
89 | 92 |
90 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key , gc func() (ds.Cursor, error)) error { | 93 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key , |
94 » cb ds.RawRunCB, gc func() (ds.Cursor, error)) error { | |
95 | |
91 if !s.dedup.Add(string(rawData[len(rawData)-1])) { | 96 if !s.dedup.Add(string(rawData[len(rawData)-1])) { |
92 return nil | 97 return nil |
93 } | 98 } |
94 » return s.cb(key, nil, gc) | 99 » return cb(key, nil, gc) |
95 } | 100 } |
96 | 101 |
97 type normalStrategy struct { | 102 type normalStrategy struct { |
98 » cb ds.RawRunCB | 103 » kc ds.KeyContext |
99 | 104 » isCount bool |
100 » kc ds.KeyContext | 105 » head memCollection |
101 » head memCollection | 106 » dedup stringset.Set |
102 » dedup stringset.Set | |
103 } | 107 } |
104 | 108 |
105 func newNormalStrategy(kc ds.KeyContext, cb ds.RawRunCB, head memStore) queryStr ategy { | 109 func newNormalStrategy(kc ds.KeyContext, isCount bool, head memStore) queryStrat egy { |
106 coll := head.GetCollection("ents:" + kc.Namespace) | 110 coll := head.GetCollection("ents:" + kc.Namespace) |
107 if coll == nil { | 111 if coll == nil { |
108 return nil | 112 return nil |
109 } | 113 } |
110 » return &normalStrategy{cb, kc, coll, stringset.New(0)} | 114 » return &normalStrategy{kc, isCount, coll, stringset.New(0)} |
111 } | 115 } |
112 | 116 |
113 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error { | 117 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, |
118 » cb ds.RawRunCB, gc func() (ds.Cursor, error)) error { | |
119 | |
114 rawKey := rawData[len(rawData)-1] | 120 rawKey := rawData[len(rawData)-1] |
115 if !s.dedup.Add(string(rawKey)) { | 121 if !s.dedup.Add(string(rawKey)) { |
116 return nil | 122 return nil |
117 } | 123 } |
118 | 124 |
119 rawEnt := s.head.Get(rawKey) | 125 rawEnt := s.head.Get(rawKey) |
120 if rawEnt == nil { | 126 if rawEnt == nil { |
121 // entity doesn't exist at head | 127 // entity doesn't exist at head |
122 » » return nil | 128 » » return errQueryEntrySkipped |
dnj
2016/11/11 08:37:53
This is the source of the problem. If the callback
| |
123 } | 129 } |
124 pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize. WithoutContext, s.kc) | |
125 memoryCorruption(err) | |
126 | 130 |
127 » return s.cb(key, pm, gc) | 131 » var pm ds.PropertyMap |
132 » if !s.isCount { | |
133 » » var err error | |
134 » » if pm, err = serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, s.kc); err != nil { | |
135 » » » memoryCorruption(err) | |
136 » » } | |
137 » } | |
138 | |
139 » return cb(key, pm, gc) | |
128 } | 140 } |
129 | 141 |
130 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, head memStore) queryStrategy { | 142 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, isCount bool, he ad memStore) queryStrategy { |
131 if fq.KeysOnly() { | 143 if fq.KeysOnly() { |
132 » » return &keysOnlyStrategy{cb, stringset.New(0)} | 144 » » return &keysOnlyStrategy{stringset.New(0)} |
133 } | 145 } |
134 if len(fq.Project()) > 0 { | 146 if len(fq.Project()) > 0 { |
135 » » return newProjectionStrategy(fq, rq, cb) | 147 » » return newProjectionStrategy(fq, rq) |
136 } | 148 } |
137 » return newNormalStrategy(rq.kc, cb, head) | 149 » return newNormalStrategy(rq.kc, isCount, head) |
138 } | 150 } |
139 | 151 |
140 func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c ount int) (raw [][]byte, decoded []ds.Property) { | 152 func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c ount int) (raw [][]byte, decoded []ds.Property) { |
141 buf := serialize.Invertible(bytes.NewBuffer(suffix)) | 153 buf := serialize.Invertible(bytes.NewBuffer(suffix)) |
142 decoded = make([]ds.Property, len(suffixFormat)) | 154 decoded = make([]ds.Property, len(suffixFormat)) |
143 raw = make([][]byte, len(suffixFormat)) | 155 raw = make([][]byte, len(suffixFormat)) |
144 | 156 |
145 err := error(nil) | 157 err := error(nil) |
146 kc := ds.MkKeyContext(aid, ns) | 158 kc := ds.MkKeyContext(aid, ns) |
147 for i := range decoded { | 159 for i := range decoded { |
(...skipping 11 matching lines...) Expand all Loading... | |
159 suffix = suffix[offset:] | 171 suffix = suffix[offset:] |
160 if needInvert { | 172 if needInvert { |
161 raw[i] = serialize.Invert(raw[i]) | 173 raw[i] = serialize.Invert(raw[i]) |
162 } | 174 } |
163 } | 175 } |
164 | 176 |
165 return | 177 return |
166 } | 178 } |
167 | 179 |
168 func countQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head m emStore) (ret int64, err error) { | 180 func countQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head m emStore) (ret int64, err error) { |
169 » if len(fq.Project()) == 0 && !fq.KeysOnly() { | 181 » err = executeQuery(fq, kc, isTxn, true, idx, head, func(_ *ds.Key, _ ds. PropertyMap, _ ds.CursorCB) error { |
dnj
2016/11/11 08:37:53
So maybe I'm reading things wrong, but I think Cou
| |
170 » » fq, err = fq.Original().KeysOnly(true).Finalize() | |
171 » » if err != nil { | |
172 » » » return | |
173 » » } | |
174 » } | |
175 » err = executeQuery(fq, kc, isTxn, idx, head, func(_ *ds.Key, _ ds.Proper tyMap, _ ds.CursorCB) error { | |
176 ret++ | 182 ret++ |
177 return nil | 183 return nil |
178 }) | 184 }) |
179 return | 185 return |
180 } | 186 } |
181 | 187 |
182 func executeNamespaceQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, head memStor e, cb ds.RawRunCB) error { | 188 func executeNamespaceQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, head memStor e, cb ds.RawRunCB) error { |
183 // these objects have no properties, so any filters on properties cause an | 189 // these objects have no properties, so any filters on properties cause an |
184 // empty result. | 190 // empty result. |
185 if len(fq.EqFilters()) > 0 || len(fq.Project()) > 0 || len(fq.Orders()) > 1 { | 191 if len(fq.EqFilters()) > 0 || len(fq.Project()) > 0 || len(fq.Orders()) > 1 { |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
218 } else { | 224 } else { |
219 k = kc.MakeKey("__namespace__", ns) | 225 k = kc.MakeKey("__namespace__", ns) |
220 } | 226 } |
221 if err := cb(k, nil, cursFn); err != nil { | 227 if err := cb(k, nil, cursFn); err != nil { |
222 return err | 228 return err |
223 } | 229 } |
224 } | 230 } |
225 return nil | 231 return nil |
226 } | 232 } |
227 | 233 |
228 func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head memStore, cb ds.RawRunCB) error { | 234 func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn, isCount bool, idx, head memStore, cb ds.RawRunCB) error { |
229 rq, err := reduce(fq, kc, isTxn) | 235 rq, err := reduce(fq, kc, isTxn) |
230 if err == ds.ErrNullQuery { | 236 if err == ds.ErrNullQuery { |
231 return nil | 237 return nil |
232 } | 238 } |
233 if err != nil { | 239 if err != nil { |
234 return err | 240 return err |
235 } | 241 } |
236 | 242 |
237 if rq.kind == "__namespace__" { | 243 if rq.kind == "__namespace__" { |
238 return executeNamespaceQuery(fq, kc, head, cb) | 244 return executeNamespaceQuery(fq, kc, head, cb) |
239 } | 245 } |
240 | 246 |
241 idxs, err := getIndexes(rq, idx) | 247 idxs, err := getIndexes(rq, idx) |
242 if err == ds.ErrNullQuery { | 248 if err == ds.ErrNullQuery { |
243 return nil | 249 return nil |
244 } | 250 } |
245 if err != nil { | 251 if err != nil { |
246 return err | 252 return err |
247 } | 253 } |
248 | 254 |
249 » strategy := pickQueryStrategy(fq, rq, cb, head) | 255 » strategy := pickQueryStrategy(fq, rq, isCount, head) |
250 if strategy == nil { | 256 if strategy == nil { |
251 // e.g. the normalStrategy found that there were NO entities in the current | 257 // e.g. the normalStrategy found that there were NO entities in the current |
252 // namespace. | 258 // namespace. |
253 return nil | 259 return nil |
254 } | 260 } |
255 | 261 |
256 offset, _ := fq.Offset() | 262 offset, _ := fq.Offset() |
257 limit, hasLimit := fq.Limit() | 263 limit, hasLimit := fq.Limit() |
258 | 264 |
259 cursorPrefix := []byte(nil) | 265 cursorPrefix := []byte(nil) |
(...skipping 10 matching lines...) Expand all Loading... | |
270 } | 276 } |
271 cursorPrefix = buf.Bytes() | 277 cursorPrefix = buf.Bytes() |
272 } | 278 } |
273 // TODO(riannucci): Do we need to decrement suffix inste ad of increment | 279 // TODO(riannucci): Do we need to decrement suffix inste ad of increment |
274 // if we're sorting by __key__ DESCENDING? | 280 // if we're sorting by __key__ DESCENDING? |
275 return queryCursor(serialize.Join(cursorPrefix, incremen t(suffix))), nil | 281 return queryCursor(serialize.Join(cursorPrefix, incremen t(suffix))), nil |
276 } | 282 } |
277 } | 283 } |
278 | 284 |
279 return multiIterate(idxs, func(suffix []byte) error { | 285 return multiIterate(idxs, func(suffix []byte) error { |
286 iterCB := cb | |
287 | |
288 // Apply our limit/offset. | |
280 if offset > 0 { | 289 if offset > 0 { |
281 » » » offset-- | 290 » » » // Don't actually callback for this entry. |
282 » » » return nil | 291 » » » iterCB = func(*ds.Key, ds.PropertyMap, ds.CursorCB) erro r { return nil } |
283 } | 292 } |
284 if hasLimit { | 293 if hasLimit { |
285 if limit <= 0 { | 294 if limit <= 0 { |
286 return ds.Stop | 295 return ds.Stop |
287 } | 296 } |
288 limit-- | |
289 } | 297 } |
290 | 298 |
291 rawData, decodedProps := parseSuffix(kc.AppID, kc.Namespace, rq. suffixFormat, suffix, -1) | 299 rawData, decodedProps := parseSuffix(kc.AppID, kc.Namespace, rq. suffixFormat, suffix, -1) |
292 | 300 |
293 keyProp := decodedProps[len(decodedProps)-1] | 301 keyProp := decodedProps[len(decodedProps)-1] |
294 if keyProp.Type() != ds.PTKey { | 302 if keyProp.Type() != ds.PTKey { |
295 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", keyProp)) | 303 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", keyProp)) |
296 } | 304 } |
297 | 305 |
298 » » return strategy.handle( | 306 » » err := strategy.handle( |
299 rawData, decodedProps, keyProp.Value().(*ds.Key), | 307 rawData, decodedProps, keyProp.Value().(*ds.Key), |
300 » » » getCursorFn(suffix)) | 308 » » » iterCB, getCursorFn(suffix)) |
309 | |
310 » » if err == errQueryEntrySkipped { | |
311 » » » return nil | |
312 » » } | |
313 | |
314 » » // We processed this entry; advance our limit/offset. | |
315 » » if offset > 0 { | |
316 » » » offset-- | |
317 » » } else if hasLimit { | |
318 » » » limit-- | |
319 » » } | |
320 » » return err | |
301 }) | 321 }) |
302 } | 322 } |
OLD | NEW |