Index: filter/txnBuf/query_merger.go |
diff --git a/filter/txnBuf/query_merger.go b/filter/txnBuf/query_merger.go |
index e87d91ea5daed6de1b5ebc7f5999d377b46d30ae..d07f5cf1e0f631b960ce590230b6a08d3aae9081 100644 |
--- a/filter/txnBuf/query_merger.go |
+++ b/filter/txnBuf/query_merger.go |
@@ -26,13 +26,13 @@ func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac |
go func() { |
defer close(c) |
- err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) (keepGoing bool) { |
+ err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) error { |
i := &item{key: k, data: pm} |
select { |
case c <- i: |
- return true |
+ return nil |
case <-stopChan: |
- return false |
+ return ds.Stop |
} |
}) |
if err != nil { |
@@ -115,7 +115,7 @@ func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) { |
// |
// See also `dsTxnBuf.Run()`. |
func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker, |
- memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) bool) error { |
+ memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) error) error { |
toRun, err := adjustQuery(fq) |
if err != nil { |
@@ -234,8 +234,11 @@ func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker, |
continue |
} |
} |
- if !cb(toUse.key, toUse.data) { |
- break |
+ if err := cb(toUse.key, toUse.data); err != nil { |
+ if err == ds.Stop { |
+ return nil |
+ } |
+ return err |
} |
} |