| Index: filter/txnBuf/query_merger.go
|
| diff --git a/filter/txnBuf/query_merger.go b/filter/txnBuf/query_merger.go
|
| index e87d91ea5daed6de1b5ebc7f5999d377b46d30ae..d0e7b37f45cd03c618e561d64b6d907a0f05e141 100644
|
| --- a/filter/txnBuf/query_merger.go
|
| +++ b/filter/txnBuf/query_merger.go
|
| @@ -26,13 +26,13 @@ func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac
|
| go func() {
|
| defer close(c)
|
|
|
| - err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) (keepGoing bool) {
|
| + err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) error {
|
| i := &item{key: k, data: pm}
|
| select {
|
| case c <- i:
|
| - return true
|
| + return nil
|
| case <-stopChan:
|
| - return false
|
| + return ds.Stop
|
| }
|
| })
|
| if err != nil {
|
| @@ -115,7 +115,7 @@ func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) {
|
| //
|
| // See also `dsTxnBuf.Run()`.
|
| func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker,
|
| - memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) bool) error {
|
| + memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) error) error {
|
|
|
| toRun, err := adjustQuery(fq)
|
| if err != nil {
|
| @@ -234,8 +234,11 @@ func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker,
|
| continue
|
| }
|
| }
|
| - if !cb(toUse.key, toUse.data) {
|
| - break
|
| + if err := cb(toUse.key, toUse.data); err != nil {
|
| + if err == ds.Stop {
|
| + err = nil
|
| + }
|
| + return err
|
| }
|
| }
|
|
|
|
|