Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(210)

Side by Side Diff: filter/txnBuf/query_merger.go

Issue 1521823003: Clean up callback interfaces. (Closed) Base URL: https://github.com/luci/gae.git@extra
Patch Set: fixins Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « filter/txnBuf/ds_txn.go ('k') | filter/txnBuf/state.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package txnBuf 5 package txnBuf
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "sort" 9 "sort"
10 10
11 "github.com/luci/gae/impl/memory" 11 "github.com/luci/gae/impl/memory"
12 ds "github.com/luci/gae/service/datastore" 12 ds "github.com/luci/gae/service/datastore"
13 "github.com/luci/gae/service/datastore/serialize" 13 "github.com/luci/gae/service/datastore/serialize"
14 "github.com/luci/luci-go/common/stringset" 14 "github.com/luci/luci-go/common/stringset"
15 ) 15 )
16 16
17 // queryToIter takes a FinalizedQuery and returns an iterator function which 17 // queryToIter takes a FinalizedQuery and returns an iterator function which
18 // will produce either *items or errors. 18 // will produce either *items or errors.
19 // 19 //
20 // - d is the raw datastore to run this query on 20 // - d is the raw datastore to run this query on
21 // - filter is a function which will return true if the given key should be 21 // - filter is a function which will return true if the given key should be
22 // excluded from the result set. 22 // excluded from the result set.
23 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac e) func() (*item, error) { 23 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac e) func() (*item, error) {
24 c := make(chan *item) 24 c := make(chan *item)
25 25
26 go func() { 26 go func() {
27 defer close(c) 27 defer close(c)
28 28
29 » » err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorC B) (keepGoing bool) { 29 » » err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorC B) error {
30 i := &item{key: k, data: pm} 30 i := &item{key: k, data: pm}
31 select { 31 select {
32 case c <- i: 32 case c <- i:
33 » » » » return true 33 » » » » return nil
34 case <-stopChan: 34 case <-stopChan:
35 » » » » return false 35 » » » » return ds.Stop
36 } 36 }
37 }) 37 })
38 if err != nil { 38 if err != nil {
39 c <- &item{err: err} 39 c <- &item{err: err}
40 } 40 }
41 }() 41 }()
42 42
43 return func() (*item, error) { 43 return func() (*item, error) {
44 itm := <-c 44 itm := <-c
45 if itm == nil { 45 if itm == nil {
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
108 // runMergedQueries executes a user query `fq` against the parent datastore as 108 // runMergedQueries executes a user query `fq` against the parent datastore as
109 // well as the in-memory datastore, calling `cb` with the merged result set. 109 // well as the in-memory datastore, calling `cb` with the merged result set.
110 // 110 //
111 // It's expected that the caller of this function will apply limit and offset 111 // It's expected that the caller of this function will apply limit and offset
112 // if the query contains those restrictions. This may convert the query to 112 // if the query contains those restrictions. This may convert the query to
113 // an expanded projection query with more data than the user asked for. It's the 113 // an expanded projection query with more data than the user asked for. It's the
114 // caller's responsibility to prune away the extra data. 114 // caller's responsibility to prune away the extra data.
115 // 115 //
116 // See also `dsTxnBuf.Run()`. 116 // See also `dsTxnBuf.Run()`.
117 func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker, 117 func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker,
118 » memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) bool) error { 118 » memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) error) error {
119 119
120 toRun, err := adjustQuery(fq) 120 toRun, err := adjustQuery(fq)
121 if err != nil { 121 if err != nil {
122 return err 122 return err
123 } 123 }
124 124
125 cmpLower, cmpUpper := memory.GetBinaryBounds(fq) 125 cmpLower, cmpUpper := memory.GetBinaryBounds(fq)
126 cmpOrder := fq.Orders() 126 cmpOrder := fq.Orders()
127 cmpFn := func(i *item) string { 127 cmpFn := func(i *item) string {
128 return i.getCmpRow(cmpLower, cmpUpper, cmpOrder) 128 return i.getCmpRow(cmpLower, cmpUpper, cmpOrder)
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
227 } 227 }
228 if distinct != nil { 228 if distinct != nil {
229 // NOTE: We know that toUse will not be used after this point for 229 // NOTE: We know that toUse will not be used after this point for
230 // comparison purposes, so re-use its cmpRow property fo r our distinct 230 // comparison purposes, so re-use its cmpRow property fo r our distinct
231 // filter here. 231 // filter here.
232 toUse.cmpRow = "" 232 toUse.cmpRow = ""
233 if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder )) { 233 if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder )) {
234 continue 234 continue
235 } 235 }
236 } 236 }
237 » » if !cb(toUse.key, toUse.data) { 237 » » if err := cb(toUse.key, toUse.data); err != nil {
238 » » » break 238 » » » if err == ds.Stop {
239 » » » » return nil
240 » » » }
241 » » » return err
239 } 242 }
240 } 243 }
241 244
242 return nil 245 return nil
243 } 246 }
244 247
245 // toComparableString computes the byte-sortable 'order' string for the given 248 // toComparableString computes the byte-sortable 'order' string for the given
246 // key/PropertyMap. 249 // key/PropertyMap.
247 // 250 //
248 // * start/end are byte sequences which are the inequality bounds of the 251 // * start/end are byte sequences which are the inequality bounds of the
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
289 } 292 }
290 if !foundOne { 293 if !foundOne {
291 return nil, nil 294 return nil, nil
292 } 295 }
293 } 296 }
294 if end != nil && bytes.Compare(soFar, end) >= 0 { 297 if end != nil && bytes.Compare(soFar, end) >= 0 {
295 return nil, nil 298 return nil, nil
296 } 299 }
297 return soFar, ps["__key__"][0] 300 return soFar, ps["__key__"][0]
298 } 301 }
OLDNEW
« no previous file with comments | « filter/txnBuf/ds_txn.go ('k') | filter/txnBuf/state.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698