OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package txnBuf | 5 package txnBuf |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "sort" | 9 "sort" |
10 | 10 |
11 "github.com/luci/gae/impl/memory" | 11 "github.com/luci/gae/impl/memory" |
12 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
13 "github.com/luci/gae/service/datastore/serialize" | 13 "github.com/luci/gae/service/datastore/serialize" |
14 "github.com/luci/luci-go/common/stringset" | 14 "github.com/luci/luci-go/common/stringset" |
15 ) | 15 ) |
16 | 16 |
17 // queryToIter takes a FinalizedQuery and returns an iterator function which | 17 // queryToIter takes a FinalizedQuery and returns an iterator function which |
18 // will produce either *items or errors. | 18 // will produce either *items or errors. |
19 // | 19 // |
20 // - d is the raw datastore to run this query on | 20 // - d is the raw datastore to run this query on |
21 // - filter is a function which will return true if the given key should be | 21 // - filter is a function which will return true if the given key should be |
22 // excluded from the result set. | 22 // excluded from the result set. |
23 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac
e) func() (*item, error) { | 23 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac
e) func() (*item, error) { |
24 c := make(chan *item) | 24 c := make(chan *item) |
25 | 25 |
26 go func() { | 26 go func() { |
27 defer close(c) | 27 defer close(c) |
28 | 28 |
29 » » err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorC
B) (keepGoing bool) { | 29 » » err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorC
B) error { |
30 i := &item{key: k, data: pm} | 30 i := &item{key: k, data: pm} |
31 select { | 31 select { |
32 case c <- i: | 32 case c <- i: |
33 » » » » return true | 33 » » » » return nil |
34 case <-stopChan: | 34 case <-stopChan: |
35 » » » » return false | 35 » » » » return ds.Stop |
36 } | 36 } |
37 }) | 37 }) |
38 if err != nil { | 38 if err != nil { |
39 c <- &item{err: err} | 39 c <- &item{err: err} |
40 } | 40 } |
41 }() | 41 }() |
42 | 42 |
43 return func() (*item, error) { | 43 return func() (*item, error) { |
44 itm := <-c | 44 itm := <-c |
45 if itm == nil { | 45 if itm == nil { |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
108 // runMergedQueries executes a user query `fq` against the parent datastore as | 108 // runMergedQueries executes a user query `fq` against the parent datastore as |
109 // well as the in-memory datastore, calling `cb` with the merged result set. | 109 // well as the in-memory datastore, calling `cb` with the merged result set. |
110 // | 110 // |
111 // It's expected that the caller of this function will apply limit and offset | 111 // It's expected that the caller of this function will apply limit and offset |
112 // if the query contains those restrictions. This may convert the query to | 112 // if the query contains those restrictions. This may convert the query to |
113 // an expanded projection query with more data than the user asked for. It's the | 113 // an expanded projection query with more data than the user asked for. It's the |
114 // caller's responsibility to prune away the extra data. | 114 // caller's responsibility to prune away the extra data. |
115 // | 115 // |
116 // See also `dsTxnBuf.Run()`. | 116 // See also `dsTxnBuf.Run()`. |
117 func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker, | 117 func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker, |
118 » memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap)
bool) error { | 118 » memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap)
error) error { |
119 | 119 |
120 toRun, err := adjustQuery(fq) | 120 toRun, err := adjustQuery(fq) |
121 if err != nil { | 121 if err != nil { |
122 return err | 122 return err |
123 } | 123 } |
124 | 124 |
125 cmpLower, cmpUpper := memory.GetBinaryBounds(fq) | 125 cmpLower, cmpUpper := memory.GetBinaryBounds(fq) |
126 cmpOrder := fq.Orders() | 126 cmpOrder := fq.Orders() |
127 cmpFn := func(i *item) string { | 127 cmpFn := func(i *item) string { |
128 return i.getCmpRow(cmpLower, cmpUpper, cmpOrder) | 128 return i.getCmpRow(cmpLower, cmpUpper, cmpOrder) |
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
227 } | 227 } |
228 if distinct != nil { | 228 if distinct != nil { |
229 // NOTE: We know that toUse will not be used after this
point for | 229 // NOTE: We know that toUse will not be used after this
point for |
230 // comparison purposes, so re-use its cmpRow property fo
r our distinct | 230 // comparison purposes, so re-use its cmpRow property fo
r our distinct |
231 // filter here. | 231 // filter here. |
232 toUse.cmpRow = "" | 232 toUse.cmpRow = "" |
233 if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder
)) { | 233 if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder
)) { |
234 continue | 234 continue |
235 } | 235 } |
236 } | 236 } |
237 » » if !cb(toUse.key, toUse.data) { | 237 » » if err := cb(toUse.key, toUse.data); err != nil { |
238 » » » break | 238 » » » if err == ds.Stop { |
| 239 » » » » return nil |
| 240 » » » } |
| 241 » » » return err |
239 } | 242 } |
240 } | 243 } |
241 | 244 |
242 return nil | 245 return nil |
243 } | 246 } |
244 | 247 |
245 // toComparableString computes the byte-sortable 'order' string for the given | 248 // toComparableString computes the byte-sortable 'order' string for the given |
246 // key/PropertyMap. | 249 // key/PropertyMap. |
247 // | 250 // |
248 // * start/end are byte sequences which are the inequality bounds of the | 251 // * start/end are byte sequences which are the inequality bounds of the |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
289 } | 292 } |
290 if !foundOne { | 293 if !foundOne { |
291 return nil, nil | 294 return nil, nil |
292 } | 295 } |
293 } | 296 } |
294 if end != nil && bytes.Compare(soFar, end) >= 0 { | 297 if end != nil && bytes.Compare(soFar, end) >= 0 { |
295 return nil, nil | 298 return nil, nil |
296 } | 299 } |
297 return soFar, ps["__key__"][0] | 300 return soFar, ps["__key__"][0] |
298 } | 301 } |
OLD | NEW |