Chromium Code Reviews| Index: impl/memory/datastore_query_execution.go |
| diff --git a/impl/memory/datastore_query_execution.go b/impl/memory/datastore_query_execution.go |
| index 2470b7de1dec926741d63e69e357209952d262ab..0c8a2d0938d4fc600ee50e4e53c2fee4b5632257 100644 |
| --- a/impl/memory/datastore_query_execution.go |
| +++ b/impl/memory/datastore_query_execution.go |
| @@ -15,6 +15,10 @@ import ( |
| "github.com/luci/luci-go/common/data/stringset" |
| ) |
| +// errQueryEntrySkipped is a sentinel error returned by queryStrategy handlers |
| +// if they decided not to process the entry in their handle(). |
| +var errQueryEntrySkipped = errors.New("query result skipped") |
| + |
| type queryStrategy interface { |
| // handle applies the strategy to the embedded user callback. |
| // - rawData is the slice of encoded Properties from the index row |
| @@ -23,7 +27,8 @@ type queryStrategy interface { |
| // - key is the decoded Key from the index row (the last item in rawData and |
| // decodedProps) |
| // - gc is the getCursor function to be passed to the user's callback |
| - handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error |
| + handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, |
| + cb ds.RawRunCB, gc func() (ds.Cursor, error)) error |
| } |
| type projectionLookup struct { |
| @@ -32,13 +37,11 @@ type projectionLookup struct { |
| } |
| type projectionStrategy struct { |
| - cb ds.RawRunCB |
| - |
| project []projectionLookup |
| distinct stringset.Set |
| } |
| -func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB) queryStrategy { |
| +func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery) queryStrategy { |
| proj := fq.Project() |
| projectionLookups := make([]projectionLookup, len(proj)) |
| @@ -54,14 +57,16 @@ func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRun |
| } |
| impossible(lookupErr) |
| } |
| - ret := &projectionStrategy{cb: cb, project: projectionLookups} |
| + ret := &projectionStrategy{project: projectionLookups} |
| if fq.Distinct() { |
| ret.distinct = stringset.New(0) |
| } |
| return ret |
| } |
| -func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error { |
| +func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, |
| + cb ds.RawRunCB, gc func() (ds.Cursor, error)) error { |
| + |
| projectedRaw := [][]byte(nil) |
| if s.distinct != nil { |
| projectedRaw = make([][]byte, len(decodedProps)) |
| @@ -78,39 +83,40 @@ func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property |
| return nil |
| } |
| } |
| - return s.cb(key, pmap, gc) |
| + return cb(key, pmap, gc) |
| } |
| type keysOnlyStrategy struct { |
| - cb ds.RawRunCB |
| - |
| dedup stringset.Set |
| } |
| -func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error { |
| +func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, |
| + cb ds.RawRunCB, gc func() (ds.Cursor, error)) error { |
| + |
| if !s.dedup.Add(string(rawData[len(rawData)-1])) { |
| return nil |
| } |
| - return s.cb(key, nil, gc) |
| + return cb(key, nil, gc) |
| } |
| type normalStrategy struct { |
| - cb ds.RawRunCB |
| - |
| - kc ds.KeyContext |
| - head memCollection |
| - dedup stringset.Set |
| + kc ds.KeyContext |
| + isCount bool |
| + head memCollection |
| + dedup stringset.Set |
| } |
| -func newNormalStrategy(kc ds.KeyContext, cb ds.RawRunCB, head memStore) queryStrategy { |
| +func newNormalStrategy(kc ds.KeyContext, isCount bool, head memStore) queryStrategy { |
| coll := head.GetCollection("ents:" + kc.Namespace) |
| if coll == nil { |
| return nil |
| } |
| - return &normalStrategy{cb, kc, coll, stringset.New(0)} |
| + return &normalStrategy{kc, isCount, coll, stringset.New(0)} |
| } |
| -func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error { |
| +func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, |
| + cb ds.RawRunCB, gc func() (ds.Cursor, error)) error { |
| + |
| rawKey := rawData[len(rawData)-1] |
| if !s.dedup.Add(string(rawKey)) { |
| return nil |
| @@ -119,22 +125,28 @@ func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, |
| rawEnt := s.head.Get(rawKey) |
| if rawEnt == nil { |
| // entity doesn't exist at head |
| - return nil |
| + return errQueryEntrySkipped |
|
dnj
2016/11/11 08:37:53
This is the source of the problem. If the callback
|
| } |
| - pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, s.kc) |
| - memoryCorruption(err) |
| - return s.cb(key, pm, gc) |
| + var pm ds.PropertyMap |
| + if !s.isCount { |
| + var err error |
| + if pm, err = serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, s.kc); err != nil { |
| + memoryCorruption(err) |
| + } |
| + } |
| + |
| + return cb(key, pm, gc) |
| } |
| -func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, head memStore) queryStrategy { |
| +func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, isCount bool, head memStore) queryStrategy { |
| if fq.KeysOnly() { |
| - return &keysOnlyStrategy{cb, stringset.New(0)} |
| + return &keysOnlyStrategy{stringset.New(0)} |
| } |
| if len(fq.Project()) > 0 { |
| - return newProjectionStrategy(fq, rq, cb) |
| + return newProjectionStrategy(fq, rq) |
| } |
| - return newNormalStrategy(rq.kc, cb, head) |
| + return newNormalStrategy(rq.kc, isCount, head) |
| } |
| func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, count int) (raw [][]byte, decoded []ds.Property) { |
| @@ -166,13 +178,7 @@ func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c |
| } |
| func countQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head memStore) (ret int64, err error) { |
| - if len(fq.Project()) == 0 && !fq.KeysOnly() { |
|
dnj
2016/11/11 08:37:53
So maybe I'm reading things wrong, but I think Cou
|
| - fq, err = fq.Original().KeysOnly(true).Finalize() |
| - if err != nil { |
| - return |
| - } |
| - } |
| - err = executeQuery(fq, kc, isTxn, idx, head, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) error { |
| + err = executeQuery(fq, kc, isTxn, true, idx, head, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) error { |
| ret++ |
| return nil |
| }) |
| @@ -225,7 +231,7 @@ func executeNamespaceQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, head memStor |
| return nil |
| } |
| -func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head memStore, cb ds.RawRunCB) error { |
| +func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn, isCount bool, idx, head memStore, cb ds.RawRunCB) error { |
| rq, err := reduce(fq, kc, isTxn) |
| if err == ds.ErrNullQuery { |
| return nil |
| @@ -246,7 +252,7 @@ func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head |
| return err |
| } |
| - strategy := pickQueryStrategy(fq, rq, cb, head) |
| + strategy := pickQueryStrategy(fq, rq, isCount, head) |
| if strategy == nil { |
| // e.g. the normalStrategy found that there were NO entities in the current |
| // namespace. |
| @@ -277,15 +283,17 @@ func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head |
| } |
| return multiIterate(idxs, func(suffix []byte) error { |
| + iterCB := cb |
| + |
| + // Apply our limit/offset. |
| if offset > 0 { |
| - offset-- |
| - return nil |
| + // Don't actually callback for this entry. |
| + iterCB = func(*ds.Key, ds.PropertyMap, ds.CursorCB) error { return nil } |
| } |
| if hasLimit { |
| if limit <= 0 { |
| return ds.Stop |
| } |
| - limit-- |
| } |
| rawData, decodedProps := parseSuffix(kc.AppID, kc.Namespace, rq.suffixFormat, suffix, -1) |
| @@ -295,8 +303,20 @@ func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head |
| impossible(fmt.Errorf("decoded index row doesn't end with a Key: %#v", keyProp)) |
| } |
| - return strategy.handle( |
| + err := strategy.handle( |
| rawData, decodedProps, keyProp.Value().(*ds.Key), |
| - getCursorFn(suffix)) |
| + iterCB, getCursorFn(suffix)) |
| + |
| + if err == errQueryEntrySkipped { |
| + return nil |
| + } |
| + |
| + // We processed this entry; advance our limit/offset. |
| + if offset > 0 { |
| + offset-- |
| + } else if hasLimit { |
| + limit-- |
| + } |
| + return err |
| }) |
| } |