Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Unified Diff: impl/memory/datastore_query_execution.go

Issue 2498463003: Fix a bug where deletions weren't updating the raw Kind index. (Closed)
Patch Set: impl/memory: query limits/offsets count deleted Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: impl/memory/datastore_query_execution.go
diff --git a/impl/memory/datastore_query_execution.go b/impl/memory/datastore_query_execution.go
index 2470b7de1dec926741d63e69e357209952d262ab..0c8a2d0938d4fc600ee50e4e53c2fee4b5632257 100644
--- a/impl/memory/datastore_query_execution.go
+++ b/impl/memory/datastore_query_execution.go
@@ -15,6 +15,10 @@ import (
"github.com/luci/luci-go/common/data/stringset"
)
+// errQueryEntrySkipped is a sentinel error returned by queryStrategy handlers
+// if they decided not to process the entry in their handle().
+var errQueryEntrySkipped = errors.New("query result skipped")
+
type queryStrategy interface {
// handle applies the strategy to the embedded user callback.
// - rawData is the slice of encoded Properties from the index row
@@ -23,7 +27,8 @@ type queryStrategy interface {
// - key is the decoded Key from the index row (the last item in rawData and
// decodedProps)
// - gc is the getCursor function to be passed to the user's callback
- handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error
+ handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key,
+ cb ds.RawRunCB, gc func() (ds.Cursor, error)) error
}
type projectionLookup struct {
@@ -32,13 +37,11 @@ type projectionLookup struct {
}
type projectionStrategy struct {
- cb ds.RawRunCB
-
project []projectionLookup
distinct stringset.Set
}
-func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB) queryStrategy {
+func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery) queryStrategy {
proj := fq.Project()
projectionLookups := make([]projectionLookup, len(proj))
@@ -54,14 +57,16 @@ func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRun
}
impossible(lookupErr)
}
- ret := &projectionStrategy{cb: cb, project: projectionLookups}
+ ret := &projectionStrategy{project: projectionLookups}
if fq.Distinct() {
ret.distinct = stringset.New(0)
}
return ret
}
-func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error {
+func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key,
+ cb ds.RawRunCB, gc func() (ds.Cursor, error)) error {
+
projectedRaw := [][]byte(nil)
if s.distinct != nil {
projectedRaw = make([][]byte, len(decodedProps))
@@ -78,39 +83,40 @@ func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property
return nil
}
}
- return s.cb(key, pmap, gc)
+ return cb(key, pmap, gc)
}
type keysOnlyStrategy struct {
- cb ds.RawRunCB
-
dedup stringset.Set
}
-func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error {
+func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
+ cb ds.RawRunCB, gc func() (ds.Cursor, error)) error {
+
if !s.dedup.Add(string(rawData[len(rawData)-1])) {
return nil
}
- return s.cb(key, nil, gc)
+ return cb(key, nil, gc)
}
type normalStrategy struct {
- cb ds.RawRunCB
-
- kc ds.KeyContext
- head memCollection
- dedup stringset.Set
+ kc ds.KeyContext
+ isCount bool
+ head memCollection
+ dedup stringset.Set
}
-func newNormalStrategy(kc ds.KeyContext, cb ds.RawRunCB, head memStore) queryStrategy {
+func newNormalStrategy(kc ds.KeyContext, isCount bool, head memStore) queryStrategy {
coll := head.GetCollection("ents:" + kc.Namespace)
if coll == nil {
return nil
}
- return &normalStrategy{cb, kc, coll, stringset.New(0)}
+ return &normalStrategy{kc, isCount, coll, stringset.New(0)}
}
-func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) error {
+func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
+ cb ds.RawRunCB, gc func() (ds.Cursor, error)) error {
+
rawKey := rawData[len(rawData)-1]
if !s.dedup.Add(string(rawKey)) {
return nil
@@ -119,22 +125,28 @@ func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key,
rawEnt := s.head.Get(rawKey)
if rawEnt == nil {
// entity doesn't exist at head
- return nil
+ return errQueryEntrySkipped
dnj 2016/11/11 08:37:53 This is the source of the problem. If the callback
}
- pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, s.kc)
- memoryCorruption(err)
- return s.cb(key, pm, gc)
+ var pm ds.PropertyMap
+ if !s.isCount {
+ var err error
+ if pm, err = serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, s.kc); err != nil {
+ memoryCorruption(err)
+ }
+ }
+
+ return cb(key, pm, gc)
}
-func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, head memStore) queryStrategy {
+func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, isCount bool, head memStore) queryStrategy {
if fq.KeysOnly() {
- return &keysOnlyStrategy{cb, stringset.New(0)}
+ return &keysOnlyStrategy{stringset.New(0)}
}
if len(fq.Project()) > 0 {
- return newProjectionStrategy(fq, rq, cb)
+ return newProjectionStrategy(fq, rq)
}
- return newNormalStrategy(rq.kc, cb, head)
+ return newNormalStrategy(rq.kc, isCount, head)
}
func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, count int) (raw [][]byte, decoded []ds.Property) {
@@ -166,13 +178,7 @@ func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c
}
func countQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head memStore) (ret int64, err error) {
- if len(fq.Project()) == 0 && !fq.KeysOnly() {
dnj 2016/11/11 08:37:53 So maybe I'm reading things wrong, but I think Cou
- fq, err = fq.Original().KeysOnly(true).Finalize()
- if err != nil {
- return
- }
- }
- err = executeQuery(fq, kc, isTxn, idx, head, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) error {
+ err = executeQuery(fq, kc, isTxn, true, idx, head, func(_ *ds.Key, _ ds.PropertyMap, _ ds.CursorCB) error {
ret++
return nil
})
@@ -225,7 +231,7 @@ func executeNamespaceQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, head memStor
return nil
}
-func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head memStore, cb ds.RawRunCB) error {
+func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn, isCount bool, idx, head memStore, cb ds.RawRunCB) error {
rq, err := reduce(fq, kc, isTxn)
if err == ds.ErrNullQuery {
return nil
@@ -246,7 +252,7 @@ func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head
return err
}
- strategy := pickQueryStrategy(fq, rq, cb, head)
+ strategy := pickQueryStrategy(fq, rq, isCount, head)
if strategy == nil {
// e.g. the normalStrategy found that there were NO entities in the current
// namespace.
@@ -277,15 +283,17 @@ func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head
}
return multiIterate(idxs, func(suffix []byte) error {
+ iterCB := cb
+
+ // Apply our limit/offset.
if offset > 0 {
- offset--
- return nil
+ // Don't actually callback for this entry.
+ iterCB = func(*ds.Key, ds.PropertyMap, ds.CursorCB) error { return nil }
}
if hasLimit {
if limit <= 0 {
return ds.Stop
}
- limit--
}
rawData, decodedProps := parseSuffix(kc.AppID, kc.Namespace, rq.suffixFormat, suffix, -1)
@@ -295,8 +303,20 @@ func executeQuery(fq *ds.FinalizedQuery, kc ds.KeyContext, isTxn bool, idx, head
impossible(fmt.Errorf("decoded index row doesn't end with a Key: %#v", keyProp))
}
- return strategy.handle(
+ err := strategy.handle(
rawData, decodedProps, keyProp.Value().(*ds.Key),
- getCursorFn(suffix))
+ iterCB, getCursorFn(suffix))
+
+ if err == errQueryEntrySkipped {
+ return nil
+ }
+
+ // We processed this entry; advance our limit/offset.
+ if offset > 0 {
+ offset--
+ } else if hasLimit {
+ limit--
+ }
+ return err
})
}

Powered by Google App Engine
This is Rietveld 408576698