Index: impl/memory/datastore_query_execution.go |
diff --git a/impl/memory/datastore_query_execution.go b/impl/memory/datastore_query_execution.go |
index 2470b7de1dec926741d63e69e357209952d262ab..0c8a2d0938d4fc600ee50e4e53c2fee4b5632257 100644 |
--- a/impl/memory/datastore_query_execution.go |
+++ b/impl/memory/datastore_query_execution.go |
@@ -15,6 +15,10 @@ import ( |
"github.com/luci/luci-go/common/data/stringset" |
) |
+// errQueryEntrySkipped is a sentinel error returned by queryStrategy handlers |
+// if they decided not to process the entry in their handle(). |
+var errQueryEntrySkipped = errors.New("query result skipped") |
+ |
type queryStrategy interface { |
// handle applies the strategy to the embedded user callback. |
// - rawData is the slice of encoded Properties from the index row |
@@ -23,7 +27,8 @@ type queryStrategy interface { |
// - key is the decoded Key from the index row (the last item in rawData and |
// decodedProps) |
// - gc is the getCursor function to be passed to the user's callback |
- handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error |
+ handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, |
+ cb ds.RawRunCB, gc func() (ds.Cursor, error)) error |
} |
type projectionLookup struct { |
@@ -32,13 +37,11 @@ type projectionLookup struct { |
} |
type projectionStrategy struct { |
- cb ds.RawRunCB |
- |
project []projectionLookup |
distinct stringset.Set |
} |
-func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB) queryStrategy { |
+func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery) queryStrategy { |
proj := fq.Project() |
projectionLookups := make([]projectionLookup, len(proj)) |
@@ -54,14 +57,16 @@ func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRun |
} |
impossible(lookupErr) |
} |
- ret := &projectionStrategy{cb: cb, project: projectionLookups} |
+ ret := &projectionStrategy{project: projectionLookups} |
if fq.Distinct() { |
ret.distinct = stringset.New(0) |
} |
return ret |
} |
-func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error { |
+func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, |
+ cb ds.RawRunCB, gc func() (ds.Cursor, error)) error { |
+ |
projectedRaw := [][]byte(nil) |
if s.distinct != nil { |
projectedRaw = make([][]byte, len(decodedProps)) |
@@ -78,39 +83,40 @@ func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property |
return nil |
} |
} |
- return s.cb(key, pmap, gc) |
+ return cb(key, pmap, gc) |
} |
type keysOnlyStrategy struct { |
- cb ds.RawRunCB |
- |
dedup stringset.Set |
} |
-func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error { |
+func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, |
+ cb ds.RawRunCB, gc func() (ds.Cursor, error)) error { |
+ |
if !s.dedup.Add(string(rawData[len(rawData)-1])) { |
return nil |
} |
- return s.cb(key, nil, gc) |
+ return cb(key, nil, gc) |
} |
type normalStrategy struct { |
- cb ds.RawRunCB |
- |
- kc ds.KeyContext |
- head memCollection |
- dedup stringset.Set |
+ kc ds.KeyContext |
+ isCount bool |
+ head memCollection |
+ dedup stringset.Set |
} |
-func newNormalStrategy(kc ds.KeyContext, cb ds.RawRunCB, head memStore) queryStrategy { |
+func newNormalStrategy(kc ds.KeyContext, isCount bool, head memStore) queryStrategy { |
coll := head.GetCollection("ents:" + kc.Namespace) |
if coll == nil { |
return nil |
} |
- return &normalStrategy{cb, kc, coll, stringset.New(0)} |
+ return &normalStrategy{kc, isCount, coll, stringset.New(0)} |
} |
-func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error { |
+func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, |
+ cb ds.RawRunCB, gc func() (ds.Cursor, error)) error { |
+ |
rawKey := rawData[len(rawData)-1] |
if !s.dedup.Add(string(rawKey)) { |
return nil |
@@ -119,22 +125,28 @@ func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, |
rawEnt := s.head.Get(rawKey) |
if rawEnt == nil { |
// entity doesn't exist at head |
- return nil |
+ return errQueryEntrySkipped |
dnj
2016/11/11 08:37:53
This is the source of the problem. If the callback
|
} |
- pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, s.kc) |
- memoryCorruption(err) |
- return s.cb(key, pm, gc) |
+ var pm ds.PropertyMap |
+ if !s.isCount { |
+ var err error |
+ if pm, err = serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, s.kc); err != nil { |
+ memoryCorruption(err) |
+ } |
+ } |
+ |
+ return cb(key, pm, gc) |
} |
-func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, head memStore) queryStrategy { |
+func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, isCount bool, head memStore) queryStrategy { |
if fq.KeysOnly() { |
- return &keysOnlyStrategy{cb, stringset.New(0)} |
+ return &keysOnlyStrategy{stringset.New(0)} |
} |
if len(fq.Project()) > 0 { |
- return newProjectionStrategy(fq, rq, cb) |
+ return newProjectionStrategy(fq, rq) |
} |
- return newNormalStrategy(rq.kc, cb, head) |
+ return newNormalStrategy(rq.kc, isCount, head) |
} |
func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, count int) (raw [][]byte, decoded []ds.Property) { |
@@ -166,13 +178,7 @@ func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c |
} |
func countQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head memStore) (ret int64, err error) { |
- if len(fq.Project()) == 0 && !fq.KeysOnly() { |
dnj
2016/11/11 08:37:53
So maybe I'm reading things wrong, but I think Cou
|
- fq, err = fq.Original().KeysOnly(true).Finalize() |
- if err != nil { |
- return |
- } |
- } |
- err = executeQuery(fq, kc, isTxn, idx, head, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) error { |
+ err = executeQuery(fq, kc, isTxn, true, idx, head, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) error { |
ret++ |
return nil |
}) |
@@ -225,7 +231,7 @@ func executeNamespaceQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, head memStor |
return nil |
} |
-func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head memStore, cb ds.RawRunCB) error { |
+func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn, isCount bool, idx, head memStore, cb ds.RawRunCB) error { |
rq, err := reduce(fq, kc, isTxn) |
if err == ds.ErrNullQuery { |
return nil |
@@ -246,7 +252,7 @@ func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head |
return err |
} |
- strategy := pickQueryStrategy(fq, rq, cb, head) |
+ strategy := pickQueryStrategy(fq, rq, isCount, head) |
if strategy == nil { |
// e.g. the normalStrategy found that there were NO entities in the current |
// namespace. |
@@ -277,15 +283,17 @@ func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head |
} |
return multiIterate(idxs, func(suffix []byte) error { |
+ iterCB := cb |
+ |
+ // Apply our limit/offset. |
if offset > 0 { |
- offset-- |
- return nil |
+ // Don't actually callback for this entry. |
+ iterCB = func(*ds.Key, ds.PropertyMap, ds.CursorCB) error { return nil } |
} |
if hasLimit { |
if limit <= 0 { |
return ds.Stop |
} |
- limit-- |
} |
rawData, decodedProps := parseSuffix(kc.AppID, kc.Namespace, rq.suffixFormat, suffix, -1) |
@@ -295,8 +303,20 @@ func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head |
impossible(fmt.Errorf("decoded index row doesn't end with a Key: %#v", keyProp)) |
} |
- return strategy.handle( |
+ err := strategy.handle( |
rawData, decodedProps, keyProp.Value().(*ds.Key), |
- getCursorFn(suffix)) |
+ iterCB, getCursorFn(suffix)) |
+ |
+ if err == errQueryEntrySkipped { |
+ return nil |
+ } |
+ |
+ // We processed this entry; advance our limit/offset. |
+ if offset > 0 { |
+ offset-- |
+ } else if hasLimit { |
+ limit-- |
+ } |
+ return err |
}) |
} |