Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 | 11 |
| 12 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
| 13 "github.com/luci/gae/service/datastore/serialize" | 13 "github.com/luci/gae/service/datastore/serialize" |
| 14 "github.com/luci/luci-go/common/data/cmpbin" | 14 "github.com/luci/luci-go/common/data/cmpbin" |
| 15 "github.com/luci/luci-go/common/data/stringset" | 15 "github.com/luci/luci-go/common/data/stringset" |
| 16 ) | 16 ) |
| 17 | 17 |
| 18 // errQueryEntrySkipped is a sentinel error returned by queryStrategy handlers | |
| 19 // if they decided not to process the entry in their handle(). | |
| 20 var errQueryEntrySkipped = errors.New("query result skipped") | |
| 21 | |
| 18 type queryStrategy interface { | 22 type queryStrategy interface { |
| 19 // handle applies the strategy to the embedded user callback. | 23 // handle applies the strategy to the embedded user callback. |
| 20 // - rawData is the slice of encoded Properties from the index row | 24 // - rawData is the slice of encoded Properties from the index row |
| 21 // (correctly de-inverted). | 25 // (correctly de-inverted). |
| 22 // - decodedProps is the slice of decoded Properties from the index ro w | 26 // - decodedProps is the slice of decoded Properties from the index ro w |
| 23 // - key is the decoded Key from the index row (the last item in rawDa ta and | 27 // - key is the decoded Key from the index row (the last item in rawDa ta and |
| 24 // decodedProps) | 28 // decodedProps) |
| 25 // - gc is the getCursor function to be passed to the user's callback | 29 // - gc is the getCursor function to be passed to the user's callback |
| 26 » handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc fun c() (ds.Cursor, error)) error | 30 » handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, |
| 31 » » cb ds.RawRunCB, gc func() (ds.Cursor, error)) error | |
| 27 } | 32 } |
| 28 | 33 |
| 29 type projectionLookup struct { | 34 type projectionLookup struct { |
| 30 suffixIndex int | 35 suffixIndex int |
| 31 propertyName string | 36 propertyName string |
| 32 } | 37 } |
| 33 | 38 |
| 34 type projectionStrategy struct { | 39 type projectionStrategy struct { |
| 35 cb ds.RawRunCB | |
| 36 | |
| 37 project []projectionLookup | 40 project []projectionLookup |
| 38 distinct stringset.Set | 41 distinct stringset.Set |
| 39 } | 42 } |
| 40 | 43 |
| 41 func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRun CB) queryStrategy { | 44 func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery) queryStrateg y { |
| 42 proj := fq.Project() | 45 proj := fq.Project() |
| 43 | 46 |
| 44 projectionLookups := make([]projectionLookup, len(proj)) | 47 projectionLookups := make([]projectionLookup, len(proj)) |
| 45 for i, prop := range proj { | 48 for i, prop := range proj { |
| 46 projectionLookups[i].propertyName = prop | 49 projectionLookups[i].propertyName = prop |
| 47 lookupErr := fmt.Errorf("planning a strategy for an unfulfillabl e query?") | 50 lookupErr := fmt.Errorf("planning a strategy for an unfulfillabl e query?") |
| 48 for j, col := range rq.suffixFormat { | 51 for j, col := range rq.suffixFormat { |
| 49 if col.Property == prop { | 52 if col.Property == prop { |
| 50 projectionLookups[i].suffixIndex = j | 53 projectionLookups[i].suffixIndex = j |
| 51 lookupErr = nil | 54 lookupErr = nil |
| 52 break | 55 break |
| 53 } | 56 } |
| 54 } | 57 } |
| 55 impossible(lookupErr) | 58 impossible(lookupErr) |
| 56 } | 59 } |
| 57 » ret := &projectionStrategy{cb: cb, project: projectionLookups} | 60 » ret := &projectionStrategy{project: projectionLookups} |
| 58 if fq.Distinct() { | 61 if fq.Distinct() { |
| 59 ret.distinct = stringset.New(0) | 62 ret.distinct = stringset.New(0) |
| 60 } | 63 } |
| 61 return ret | 64 return ret |
| 62 } | 65 } |
| 63 | 66 |
| 64 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property , key *ds.Key, gc func() (ds.Cursor, error)) error { | 67 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property , key *ds.Key, |
| 68 » cb ds.RawRunCB, gc func() (ds.Cursor, error)) error { | |
| 69 | |
| 65 projectedRaw := [][]byte(nil) | 70 projectedRaw := [][]byte(nil) |
| 66 if s.distinct != nil { | 71 if s.distinct != nil { |
| 67 projectedRaw = make([][]byte, len(decodedProps)) | 72 projectedRaw = make([][]byte, len(decodedProps)) |
| 68 } | 73 } |
| 69 pmap := make(ds.PropertyMap, len(s.project)) | 74 pmap := make(ds.PropertyMap, len(s.project)) |
| 70 for i, p := range s.project { | 75 for i, p := range s.project { |
| 71 if s.distinct != nil { | 76 if s.distinct != nil { |
| 72 projectedRaw[i] = rawData[p.suffixIndex] | 77 projectedRaw[i] = rawData[p.suffixIndex] |
| 73 } | 78 } |
| 74 pmap[p.propertyName] = decodedProps[p.suffixIndex] | 79 pmap[p.propertyName] = decodedProps[p.suffixIndex] |
| 75 } | 80 } |
| 76 if s.distinct != nil { | 81 if s.distinct != nil { |
| 77 if !s.distinct.Add(string(serialize.Join(projectedRaw...))) { | 82 if !s.distinct.Add(string(serialize.Join(projectedRaw...))) { |
| 78 return nil | 83 return nil |
| 79 } | 84 } |
| 80 } | 85 } |
| 81 » return s.cb(key, pmap, gc) | 86 » return cb(key, pmap, gc) |
| 82 } | 87 } |
| 83 | 88 |
| 84 type keysOnlyStrategy struct { | 89 type keysOnlyStrategy struct { |
| 85 cb ds.RawRunCB | |
| 86 | |
| 87 dedup stringset.Set | 90 dedup stringset.Set |
| 88 } | 91 } |
| 89 | 92 |
| 90 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key , gc func() (ds.Cursor, error)) error { | 93 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key , |
| 94 » cb ds.RawRunCB, gc func() (ds.Cursor, error)) error { | |
| 95 | |
| 91 if !s.dedup.Add(string(rawData[len(rawData)-1])) { | 96 if !s.dedup.Add(string(rawData[len(rawData)-1])) { |
| 92 return nil | 97 return nil |
| 93 } | 98 } |
| 94 » return s.cb(key, nil, gc) | 99 » return cb(key, nil, gc) |
| 95 } | 100 } |
| 96 | 101 |
| 97 type normalStrategy struct { | 102 type normalStrategy struct { |
| 98 » cb ds.RawRunCB | 103 » kc ds.KeyContext |
| 99 | 104 » isCount bool |
| 100 » kc ds.KeyContext | 105 » head memCollection |
| 101 » head memCollection | 106 » dedup stringset.Set |
| 102 » dedup stringset.Set | |
| 103 } | 107 } |
| 104 | 108 |
| 105 func newNormalStrategy(kc ds.KeyContext, cb ds.RawRunCB, head memStore) queryStr ategy { | 109 func newNormalStrategy(kc ds.KeyContext, isCount bool, head memStore) queryStrat egy { |
| 106 coll := head.GetCollection("ents:" + kc.Namespace) | 110 coll := head.GetCollection("ents:" + kc.Namespace) |
| 107 if coll == nil { | 111 if coll == nil { |
| 108 return nil | 112 return nil |
| 109 } | 113 } |
| 110 » return &normalStrategy{cb, kc, coll, stringset.New(0)} | 114 » return &normalStrategy{kc, isCount, coll, stringset.New(0)} |
| 111 } | 115 } |
| 112 | 116 |
| 113 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error { | 117 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, |
| 118 » cb ds.RawRunCB, gc func() (ds.Cursor, error)) error { | |
| 119 | |
| 114 rawKey := rawData[len(rawData)-1] | 120 rawKey := rawData[len(rawData)-1] |
| 115 if !s.dedup.Add(string(rawKey)) { | 121 if !s.dedup.Add(string(rawKey)) { |
| 116 return nil | 122 return nil |
| 117 } | 123 } |
| 118 | 124 |
| 119 rawEnt := s.head.Get(rawKey) | 125 rawEnt := s.head.Get(rawKey) |
| 120 if rawEnt == nil { | 126 if rawEnt == nil { |
| 121 // entity doesn't exist at head | 127 // entity doesn't exist at head |
| 122 » » return nil | 128 » » return errQueryEntrySkipped |
|
dnj
2016/11/11 08:37:53
This is the source of the problem. If the callback
| |
| 123 } | 129 } |
| 124 pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize. WithoutContext, s.kc) | |
| 125 memoryCorruption(err) | |
| 126 | 130 |
| 127 » return s.cb(key, pm, gc) | 131 » var pm ds.PropertyMap |
| 132 » if !s.isCount { | |
| 133 » » var err error | |
| 134 » » if pm, err = serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, s.kc); err != nil { | |
| 135 » » » memoryCorruption(err) | |
| 136 » » } | |
| 137 » } | |
| 138 | |
| 139 » return cb(key, pm, gc) | |
| 128 } | 140 } |
| 129 | 141 |
| 130 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, head memStore) queryStrategy { | 142 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, isCount bool, he ad memStore) queryStrategy { |
| 131 if fq.KeysOnly() { | 143 if fq.KeysOnly() { |
| 132 » » return &keysOnlyStrategy{cb, stringset.New(0)} | 144 » » return &keysOnlyStrategy{stringset.New(0)} |
| 133 } | 145 } |
| 134 if len(fq.Project()) > 0 { | 146 if len(fq.Project()) > 0 { |
| 135 » » return newProjectionStrategy(fq, rq, cb) | 147 » » return newProjectionStrategy(fq, rq) |
| 136 } | 148 } |
| 137 » return newNormalStrategy(rq.kc, cb, head) | 149 » return newNormalStrategy(rq.kc, isCount, head) |
| 138 } | 150 } |
| 139 | 151 |
| 140 func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c ount int) (raw [][]byte, decoded []ds.Property) { | 152 func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c ount int) (raw [][]byte, decoded []ds.Property) { |
| 141 buf := serialize.Invertible(bytes.NewBuffer(suffix)) | 153 buf := serialize.Invertible(bytes.NewBuffer(suffix)) |
| 142 decoded = make([]ds.Property, len(suffixFormat)) | 154 decoded = make([]ds.Property, len(suffixFormat)) |
| 143 raw = make([][]byte, len(suffixFormat)) | 155 raw = make([][]byte, len(suffixFormat)) |
| 144 | 156 |
| 145 err := error(nil) | 157 err := error(nil) |
| 146 kc := ds.MkKeyContext(aid, ns) | 158 kc := ds.MkKeyContext(aid, ns) |
| 147 for i := range decoded { | 159 for i := range decoded { |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 159 suffix = suffix[offset:] | 171 suffix = suffix[offset:] |
| 160 if needInvert { | 172 if needInvert { |
| 161 raw[i] = serialize.Invert(raw[i]) | 173 raw[i] = serialize.Invert(raw[i]) |
| 162 } | 174 } |
| 163 } | 175 } |
| 164 | 176 |
| 165 return | 177 return |
| 166 } | 178 } |
| 167 | 179 |
| 168 func countQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head m emStore) (ret int64, err error) { | 180 func countQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head m emStore) (ret int64, err error) { |
| 169 » if len(fq.Project()) == 0 && !fq.KeysOnly() { | 181 » err = executeQuery(fq, kc, isTxn, true, idx, head, func(_ *ds.Key, _ ds. PropertyMap, _ ds.CursorCB) error { |
|
dnj
2016/11/11 08:37:53
So maybe I'm reading things wrong, but I think Cou
| |
| 170 » » fq, err = fq.Original().KeysOnly(true).Finalize() | |
| 171 » » if err != nil { | |
| 172 » » » return | |
| 173 » » } | |
| 174 » } | |
| 175 » err = executeQuery(fq, kc, isTxn, idx, head, func(_ *ds.Key, _ ds.Proper tyMap, _ ds.CursorCB) error { | |
| 176 ret++ | 182 ret++ |
| 177 return nil | 183 return nil |
| 178 }) | 184 }) |
| 179 return | 185 return |
| 180 } | 186 } |
| 181 | 187 |
| 182 func executeNamespaceQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, head memStor e, cb ds.RawRunCB) error { | 188 func executeNamespaceQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, head memStor e, cb ds.RawRunCB) error { |
| 183 // these objects have no properties, so any filters on properties cause an | 189 // these objects have no properties, so any filters on properties cause an |
| 184 // empty result. | 190 // empty result. |
| 185 if len(fq.EqFilters()) > 0 || len(fq.Project()) > 0 || len(fq.Orders()) > 1 { | 191 if len(fq.EqFilters()) > 0 || len(fq.Project()) > 0 || len(fq.Orders()) > 1 { |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 218 } else { | 224 } else { |
| 219 k = kc.MakeKey("__namespace__", ns) | 225 k = kc.MakeKey("__namespace__", ns) |
| 220 } | 226 } |
| 221 if err := cb(k, nil, cursFn); err != nil { | 227 if err := cb(k, nil, cursFn); err != nil { |
| 222 return err | 228 return err |
| 223 } | 229 } |
| 224 } | 230 } |
| 225 return nil | 231 return nil |
| 226 } | 232 } |
| 227 | 233 |
| 228 func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head memStore, cb ds.RawRunCB) error { | 234 func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn, isCount bool, idx, head memStore, cb ds.RawRunCB) error { |
| 229 rq, err := reduce(fq, kc, isTxn) | 235 rq, err := reduce(fq, kc, isTxn) |
| 230 if err == ds.ErrNullQuery { | 236 if err == ds.ErrNullQuery { |
| 231 return nil | 237 return nil |
| 232 } | 238 } |
| 233 if err != nil { | 239 if err != nil { |
| 234 return err | 240 return err |
| 235 } | 241 } |
| 236 | 242 |
| 237 if rq.kind == "__namespace__" { | 243 if rq.kind == "__namespace__" { |
| 238 return executeNamespaceQuery(fq, kc, head, cb) | 244 return executeNamespaceQuery(fq, kc, head, cb) |
| 239 } | 245 } |
| 240 | 246 |
| 241 idxs, err := getIndexes(rq, idx) | 247 idxs, err := getIndexes(rq, idx) |
| 242 if err == ds.ErrNullQuery { | 248 if err == ds.ErrNullQuery { |
| 243 return nil | 249 return nil |
| 244 } | 250 } |
| 245 if err != nil { | 251 if err != nil { |
| 246 return err | 252 return err |
| 247 } | 253 } |
| 248 | 254 |
| 249 » strategy := pickQueryStrategy(fq, rq, cb, head) | 255 » strategy := pickQueryStrategy(fq, rq, isCount, head) |
| 250 if strategy == nil { | 256 if strategy == nil { |
| 251 // e.g. the normalStrategy found that there were NO entities in the current | 257 // e.g. the normalStrategy found that there were NO entities in the current |
| 252 // namespace. | 258 // namespace. |
| 253 return nil | 259 return nil |
| 254 } | 260 } |
| 255 | 261 |
| 256 offset, _ := fq.Offset() | 262 offset, _ := fq.Offset() |
| 257 limit, hasLimit := fq.Limit() | 263 limit, hasLimit := fq.Limit() |
| 258 | 264 |
| 259 cursorPrefix := []byte(nil) | 265 cursorPrefix := []byte(nil) |
| (...skipping 10 matching lines...) Expand all Loading... | |
| 270 } | 276 } |
| 271 cursorPrefix = buf.Bytes() | 277 cursorPrefix = buf.Bytes() |
| 272 } | 278 } |
| 273 // TODO(riannucci): Do we need to decrement suffix inste ad of increment | 279 // TODO(riannucci): Do we need to decrement suffix inste ad of increment |
| 274 // if we're sorting by __key__ DESCENDING? | 280 // if we're sorting by __key__ DESCENDING? |
| 275 return queryCursor(serialize.Join(cursorPrefix, incremen t(suffix))), nil | 281 return queryCursor(serialize.Join(cursorPrefix, incremen t(suffix))), nil |
| 276 } | 282 } |
| 277 } | 283 } |
| 278 | 284 |
| 279 return multiIterate(idxs, func(suffix []byte) error { | 285 return multiIterate(idxs, func(suffix []byte) error { |
| 286 iterCB := cb | |
| 287 | |
| 288 // Apply our limit/offset. | |
| 280 if offset > 0 { | 289 if offset > 0 { |
| 281 » » » offset-- | 290 » » » // Don't actually callback for this entry. |
| 282 » » » return nil | 291 » » » iterCB = func(*ds.Key, ds.PropertyMap, ds.CursorCB) erro r { return nil } |
| 283 } | 292 } |
| 284 if hasLimit { | 293 if hasLimit { |
| 285 if limit <= 0 { | 294 if limit <= 0 { |
| 286 return ds.Stop | 295 return ds.Stop |
| 287 } | 296 } |
| 288 limit-- | |
| 289 } | 297 } |
| 290 | 298 |
| 291 rawData, decodedProps := parseSuffix(kc.AppID, kc.Namespace, rq. suffixFormat, suffix, -1) | 299 rawData, decodedProps := parseSuffix(kc.AppID, kc.Namespace, rq. suffixFormat, suffix, -1) |
| 292 | 300 |
| 293 keyProp := decodedProps[len(decodedProps)-1] | 301 keyProp := decodedProps[len(decodedProps)-1] |
| 294 if keyProp.Type() != ds.PTKey { | 302 if keyProp.Type() != ds.PTKey { |
| 295 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", keyProp)) | 303 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", keyProp)) |
| 296 } | 304 } |
| 297 | 305 |
| 298 » » return strategy.handle( | 306 » » err := strategy.handle( |
| 299 rawData, decodedProps, keyProp.Value().(*ds.Key), | 307 rawData, decodedProps, keyProp.Value().(*ds.Key), |
| 300 » » » getCursorFn(suffix)) | 308 » » » iterCB, getCursorFn(suffix)) |
| 309 | |
| 310 » » if err == errQueryEntrySkipped { | |
| 311 » » » return nil | |
| 312 » » } | |
| 313 | |
| 314 » » // We processed this entry; advance our limit/offset. | |
| 315 » » if offset > 0 { | |
| 316 » » » offset-- | |
| 317 » » } else if hasLimit { | |
| 318 » » » limit-- | |
| 319 » » } | |
| 320 » » return err | |
| 301 }) | 321 }) |
| 302 } | 322 } |
| OLD | NEW |