Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1515)

Side by Side Diff: impl/memory/datastore_query_execution.go

Issue 2498463003: Fix a bug where deletions weren't updating the raw Kind index. (Closed)
Patch Set: impl/memory: query limits/offsets count deleted Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
11 11
12 ds "github.com/luci/gae/service/datastore" 12 ds "github.com/luci/gae/service/datastore"
13 "github.com/luci/gae/service/datastore/serialize" 13 "github.com/luci/gae/service/datastore/serialize"
14 "github.com/luci/luci-go/common/data/cmpbin" 14 "github.com/luci/luci-go/common/data/cmpbin"
15 "github.com/luci/luci-go/common/data/stringset" 15 "github.com/luci/luci-go/common/data/stringset"
16 ) 16 )
17 17
18 // errQueryEntrySkipped is a sentinel error returned by queryStrategy handlers
19 // if they decided not to process the entry in their handle().
20 var errQueryEntrySkipped = errors.New("query result skipped")
21
18 type queryStrategy interface { 22 type queryStrategy interface {
19 // handle applies the strategy to the embedded user callback. 23 // handle applies the strategy to the embedded user callback.
20 // - rawData is the slice of encoded Properties from the index row 24 // - rawData is the slice of encoded Properties from the index row
21 // (correctly de-inverted). 25 // (correctly de-inverted).
22 // - decodedProps is the slice of decoded Properties from the index ro w 26 // - decodedProps is the slice of decoded Properties from the index ro w
23 // - key is the decoded Key from the index row (the last item in rawDa ta and 27 // - key is the decoded Key from the index row (the last item in rawDa ta and
24 // decodedProps) 28 // decodedProps)
25 // - gc is the getCursor function to be passed to the user's callback 29 // - gc is the getCursor function to be passed to the user's callback
26 » handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc fun c() (ds.Cursor, error)) error 30 » handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key,
31 » » cb ds.RawRunCB, gc func() (ds.Cursor, error)) error
27 } 32 }
28 33
29 type projectionLookup struct { 34 type projectionLookup struct {
30 suffixIndex int 35 suffixIndex int
31 propertyName string 36 propertyName string
32 } 37 }
33 38
34 type projectionStrategy struct { 39 type projectionStrategy struct {
35 cb ds.RawRunCB
36
37 project []projectionLookup 40 project []projectionLookup
38 distinct stringset.Set 41 distinct stringset.Set
39 } 42 }
40 43
41 func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRun CB) queryStrategy { 44 func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery) queryStrateg y {
42 proj := fq.Project() 45 proj := fq.Project()
43 46
44 projectionLookups := make([]projectionLookup, len(proj)) 47 projectionLookups := make([]projectionLookup, len(proj))
45 for i, prop := range proj { 48 for i, prop := range proj {
46 projectionLookups[i].propertyName = prop 49 projectionLookups[i].propertyName = prop
47 lookupErr := fmt.Errorf("planning a strategy for an unfulfillabl e query?") 50 lookupErr := fmt.Errorf("planning a strategy for an unfulfillabl e query?")
48 for j, col := range rq.suffixFormat { 51 for j, col := range rq.suffixFormat {
49 if col.Property == prop { 52 if col.Property == prop {
50 projectionLookups[i].suffixIndex = j 53 projectionLookups[i].suffixIndex = j
51 lookupErr = nil 54 lookupErr = nil
52 break 55 break
53 } 56 }
54 } 57 }
55 impossible(lookupErr) 58 impossible(lookupErr)
56 } 59 }
57 » ret := &projectionStrategy{cb: cb, project: projectionLookups} 60 » ret := &projectionStrategy{project: projectionLookups}
58 if fq.Distinct() { 61 if fq.Distinct() {
59 ret.distinct = stringset.New(0) 62 ret.distinct = stringset.New(0)
60 } 63 }
61 return ret 64 return ret
62 } 65 }
63 66
64 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property , key *ds.Key, gc func() (ds.Cursor, error)) error { 67 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property , key *ds.Key,
68 » cb ds.RawRunCB, gc func() (ds.Cursor, error)) error {
69
65 projectedRaw := [][]byte(nil) 70 projectedRaw := [][]byte(nil)
66 if s.distinct != nil { 71 if s.distinct != nil {
67 projectedRaw = make([][]byte, len(decodedProps)) 72 projectedRaw = make([][]byte, len(decodedProps))
68 } 73 }
69 pmap := make(ds.PropertyMap, len(s.project)) 74 pmap := make(ds.PropertyMap, len(s.project))
70 for i, p := range s.project { 75 for i, p := range s.project {
71 if s.distinct != nil { 76 if s.distinct != nil {
72 projectedRaw[i] = rawData[p.suffixIndex] 77 projectedRaw[i] = rawData[p.suffixIndex]
73 } 78 }
74 pmap[p.propertyName] = decodedProps[p.suffixIndex] 79 pmap[p.propertyName] = decodedProps[p.suffixIndex]
75 } 80 }
76 if s.distinct != nil { 81 if s.distinct != nil {
77 if !s.distinct.Add(string(serialize.Join(projectedRaw...))) { 82 if !s.distinct.Add(string(serialize.Join(projectedRaw...))) {
78 return nil 83 return nil
79 } 84 }
80 } 85 }
81 » return s.cb(key, pmap, gc) 86 » return cb(key, pmap, gc)
82 } 87 }
83 88
84 type keysOnlyStrategy struct { 89 type keysOnlyStrategy struct {
85 cb ds.RawRunCB
86
87 dedup stringset.Set 90 dedup stringset.Set
88 } 91 }
89 92
90 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key , gc func() (ds.Cursor, error)) error { 93 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key ,
94 » cb ds.RawRunCB, gc func() (ds.Cursor, error)) error {
95
91 if !s.dedup.Add(string(rawData[len(rawData)-1])) { 96 if !s.dedup.Add(string(rawData[len(rawData)-1])) {
92 return nil 97 return nil
93 } 98 }
94 » return s.cb(key, nil, gc) 99 » return cb(key, nil, gc)
95 } 100 }
96 101
97 type normalStrategy struct { 102 type normalStrategy struct {
98 » cb ds.RawRunCB 103 » kc ds.KeyContext
99 104 » isCount bool
100 » kc ds.KeyContext 105 » head memCollection
101 » head memCollection 106 » dedup stringset.Set
102 » dedup stringset.Set
103 } 107 }
104 108
105 func newNormalStrategy(kc ds.KeyContext, cb ds.RawRunCB, head memStore) queryStr ategy { 109 func newNormalStrategy(kc ds.KeyContext, isCount bool, head memStore) queryStrat egy {
106 coll := head.GetCollection("ents:" + kc.Namespace) 110 coll := head.GetCollection("ents:" + kc.Namespace)
107 if coll == nil { 111 if coll == nil {
108 return nil 112 return nil
109 } 113 }
110 » return &normalStrategy{cb, kc, coll, stringset.New(0)} 114 » return &normalStrategy{kc, isCount, coll, stringset.New(0)}
111 } 115 }
112 116
113 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error { 117 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
118 » cb ds.RawRunCB, gc func() (ds.Cursor, error)) error {
119
114 rawKey := rawData[len(rawData)-1] 120 rawKey := rawData[len(rawData)-1]
115 if !s.dedup.Add(string(rawKey)) { 121 if !s.dedup.Add(string(rawKey)) {
116 return nil 122 return nil
117 } 123 }
118 124
119 rawEnt := s.head.Get(rawKey) 125 rawEnt := s.head.Get(rawKey)
120 if rawEnt == nil { 126 if rawEnt == nil {
121 // entity doesn't exist at head 127 // entity doesn't exist at head
122 » » return nil 128 » » return errQueryEntrySkipped
dnj 2016/11/11 08:37:53 This is the source of the problem. If the callback
123 } 129 }
124 pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize. WithoutContext, s.kc)
125 memoryCorruption(err)
126 130
127 » return s.cb(key, pm, gc) 131 » var pm ds.PropertyMap
132 » if !s.isCount {
133 » » var err error
134 » » if pm, err = serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, s.kc); err != nil {
135 » » » memoryCorruption(err)
136 » » }
137 » }
138
139 » return cb(key, pm, gc)
128 } 140 }
129 141
130 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, head memStore) queryStrategy { 142 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, isCount bool, he ad memStore) queryStrategy {
131 if fq.KeysOnly() { 143 if fq.KeysOnly() {
132 » » return &keysOnlyStrategy{cb, stringset.New(0)} 144 » » return &keysOnlyStrategy{stringset.New(0)}
133 } 145 }
134 if len(fq.Project()) > 0 { 146 if len(fq.Project()) > 0 {
135 » » return newProjectionStrategy(fq, rq, cb) 147 » » return newProjectionStrategy(fq, rq)
136 } 148 }
137 » return newNormalStrategy(rq.kc, cb, head) 149 » return newNormalStrategy(rq.kc, isCount, head)
138 } 150 }
139 151
140 func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c ount int) (raw [][]byte, decoded []ds.Property) { 152 func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c ount int) (raw [][]byte, decoded []ds.Property) {
141 buf := serialize.Invertible(bytes.NewBuffer(suffix)) 153 buf := serialize.Invertible(bytes.NewBuffer(suffix))
142 decoded = make([]ds.Property, len(suffixFormat)) 154 decoded = make([]ds.Property, len(suffixFormat))
143 raw = make([][]byte, len(suffixFormat)) 155 raw = make([][]byte, len(suffixFormat))
144 156
145 err := error(nil) 157 err := error(nil)
146 kc := ds.MkKeyContext(aid, ns) 158 kc := ds.MkKeyContext(aid, ns)
147 for i := range decoded { 159 for i := range decoded {
(...skipping 11 matching lines...) Expand all
159 suffix = suffix[offset:] 171 suffix = suffix[offset:]
160 if needInvert { 172 if needInvert {
161 raw[i] = serialize.Invert(raw[i]) 173 raw[i] = serialize.Invert(raw[i])
162 } 174 }
163 } 175 }
164 176
165 return 177 return
166 } 178 }
167 179
168 func countQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head m emStore) (ret int64, err error) { 180 func countQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head m emStore) (ret int64, err error) {
169 » if len(fq.Project()) == 0 && !fq.KeysOnly() { 181 » err = executeQuery(fq, kc, isTxn, true, idx, head, func(_ *ds.Key, _ ds. PropertyMap, _ ds.CursorCB) error {
dnj 2016/11/11 08:37:53 So maybe I'm reading things wrong, but I think Cou
170 » » fq, err = fq.Original().KeysOnly(true).Finalize()
171 » » if err != nil {
172 » » » return
173 » » }
174 » }
175 » err = executeQuery(fq, kc, isTxn, idx, head, func(_ *ds.Key, _ ds.Proper tyMap, _ ds.CursorCB) error {
176 ret++ 182 ret++
177 return nil 183 return nil
178 }) 184 })
179 return 185 return
180 } 186 }
181 187
182 func executeNamespaceQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, head memStor e, cb ds.RawRunCB) error { 188 func executeNamespaceQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, head memStor e, cb ds.RawRunCB) error {
183 // these objects have no properties, so any filters on properties cause an 189 // these objects have no properties, so any filters on properties cause an
184 // empty result. 190 // empty result.
185 if len(fq.EqFilters()) > 0 || len(fq.Project()) > 0 || len(fq.Orders()) > 1 { 191 if len(fq.EqFilters()) > 0 || len(fq.Project()) > 0 || len(fq.Orders()) > 1 {
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
218 } else { 224 } else {
219 k = kc.MakeKey("__namespace__", ns) 225 k = kc.MakeKey("__namespace__", ns)
220 } 226 }
221 if err := cb(k, nil, cursFn); err != nil { 227 if err := cb(k, nil, cursFn); err != nil {
222 return err 228 return err
223 } 229 }
224 } 230 }
225 return nil 231 return nil
226 } 232 }
227 233
228 func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head memStore, cb ds.RawRunCB) error { 234 func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn, isCount bool, idx, head memStore, cb ds.RawRunCB) error {
229 rq, err := reduce(fq, kc, isTxn) 235 rq, err := reduce(fq, kc, isTxn)
230 if err == ds.ErrNullQuery { 236 if err == ds.ErrNullQuery {
231 return nil 237 return nil
232 } 238 }
233 if err != nil { 239 if err != nil {
234 return err 240 return err
235 } 241 }
236 242
237 if rq.kind == "__namespace__" { 243 if rq.kind == "__namespace__" {
238 return executeNamespaceQuery(fq, kc, head, cb) 244 return executeNamespaceQuery(fq, kc, head, cb)
239 } 245 }
240 246
241 idxs, err := getIndexes(rq, idx) 247 idxs, err := getIndexes(rq, idx)
242 if err == ds.ErrNullQuery { 248 if err == ds.ErrNullQuery {
243 return nil 249 return nil
244 } 250 }
245 if err != nil { 251 if err != nil {
246 return err 252 return err
247 } 253 }
248 254
249 » strategy := pickQueryStrategy(fq, rq, cb, head) 255 » strategy := pickQueryStrategy(fq, rq, isCount, head)
250 if strategy == nil { 256 if strategy == nil {
251 // e.g. the normalStrategy found that there were NO entities in the current 257 // e.g. the normalStrategy found that there were NO entities in the current
252 // namespace. 258 // namespace.
253 return nil 259 return nil
254 } 260 }
255 261
256 offset, _ := fq.Offset() 262 offset, _ := fq.Offset()
257 limit, hasLimit := fq.Limit() 263 limit, hasLimit := fq.Limit()
258 264
259 cursorPrefix := []byte(nil) 265 cursorPrefix := []byte(nil)
(...skipping 10 matching lines...) Expand all
270 } 276 }
271 cursorPrefix = buf.Bytes() 277 cursorPrefix = buf.Bytes()
272 } 278 }
273 // TODO(riannucci): Do we need to decrement suffix inste ad of increment 279 // TODO(riannucci): Do we need to decrement suffix inste ad of increment
274 // if we're sorting by __key__ DESCENDING? 280 // if we're sorting by __key__ DESCENDING?
275 return queryCursor(serialize.Join(cursorPrefix, incremen t(suffix))), nil 281 return queryCursor(serialize.Join(cursorPrefix, incremen t(suffix))), nil
276 } 282 }
277 } 283 }
278 284
279 return multiIterate(idxs, func(suffix []byte) error { 285 return multiIterate(idxs, func(suffix []byte) error {
286 iterCB := cb
287
288 // Apply our limit/offset.
280 if offset > 0 { 289 if offset > 0 {
281 » » » offset-- 290 » » » // Don't actually callback for this entry.
282 » » » return nil 291 » » » iterCB = func(*ds.Key, ds.PropertyMap, ds.CursorCB) erro r { return nil }
283 } 292 }
284 if hasLimit { 293 if hasLimit {
285 if limit <= 0 { 294 if limit <= 0 {
286 return ds.Stop 295 return ds.Stop
287 } 296 }
288 limit--
289 } 297 }
290 298
291 rawData, decodedProps := parseSuffix(kc.AppID, kc.Namespace, rq. suffixFormat, suffix, -1) 299 rawData, decodedProps := parseSuffix(kc.AppID, kc.Namespace, rq. suffixFormat, suffix, -1)
292 300
293 keyProp := decodedProps[len(decodedProps)-1] 301 keyProp := decodedProps[len(decodedProps)-1]
294 if keyProp.Type() != ds.PTKey { 302 if keyProp.Type() != ds.PTKey {
295 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", keyProp)) 303 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", keyProp))
296 } 304 }
297 305
298 » » return strategy.handle( 306 » » err := strategy.handle(
299 rawData, decodedProps, keyProp.Value().(*ds.Key), 307 rawData, decodedProps, keyProp.Value().(*ds.Key),
300 » » » getCursorFn(suffix)) 308 » » » iterCB, getCursorFn(suffix))
309
310 » » if err == errQueryEntrySkipped {
311 » » » return nil
312 » » }
313
314 » » // We processed this entry; advance our limit/offset.
315 » » if offset > 0 {
316 » » » offset--
317 » » } else if hasLimit {
318 » » » limit--
319 » » }
320 » » return err
301 }) 321 })
302 } 322 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698