| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package txnBuf | 5 package txnBuf |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "sort" | 9 "sort" |
| 10 | 10 |
| 11 "github.com/luci/gae/impl/memory" | 11 "github.com/luci/gae/impl/memory" |
| 12 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
| 13 "github.com/luci/gae/service/datastore/serialize" | 13 "github.com/luci/gae/service/datastore/serialize" |
| 14 "github.com/luci/luci-go/common/stringset" | 14 "github.com/luci/luci-go/common/stringset" |
| 15 ) | 15 ) |
| 16 | 16 |
| 17 // queryToIter takes a FinalizedQuery and returns an iterator function which | 17 // queryToIter takes a FinalizedQuery and returns an iterator function which |
| 18 // will produce either *items or errors. | 18 // will produce either *items or errors. |
| 19 // | 19 // |
| 20 // - d is the raw datastore to run this query on | 20 // - d is the raw datastore to run this query on |
| 21 // - filter is a function which will return true if the given key should be | 21 // - filter is a function which will return true if the given key should be |
| 22 // excluded from the result set. | 22 // excluded from the result set. |
| 23 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac
e) func() (*item, error) { | 23 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac
e) func() (*item, error) { |
| 24 c := make(chan *item) | 24 c := make(chan *item) |
| 25 | 25 |
| 26 go func() { | 26 go func() { |
| 27 defer close(c) | 27 defer close(c) |
| 28 | 28 |
| 29 » » err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorC
B) (keepGoing bool) { | 29 » » err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorC
B) error { |
| 30 i := &item{key: k, data: pm} | 30 i := &item{key: k, data: pm} |
| 31 select { | 31 select { |
| 32 case c <- i: | 32 case c <- i: |
| 33 » » » » return true | 33 » » » » return nil |
| 34 case <-stopChan: | 34 case <-stopChan: |
| 35 » » » » return false | 35 » » » » return ds.Stop |
| 36 } | 36 } |
| 37 }) | 37 }) |
| 38 if err != nil { | 38 if err != nil { |
| 39 c <- &item{err: err} | 39 c <- &item{err: err} |
| 40 } | 40 } |
| 41 }() | 41 }() |
| 42 | 42 |
| 43 return func() (*item, error) { | 43 return func() (*item, error) { |
| 44 itm := <-c | 44 itm := <-c |
| 45 if itm == nil { | 45 if itm == nil { |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 108 // runMergedQueries executes a user query `fq` against the parent datastore as | 108 // runMergedQueries executes a user query `fq` against the parent datastore as |
| 109 // well as the in-memory datastore, calling `cb` with the merged result set. | 109 // well as the in-memory datastore, calling `cb` with the merged result set. |
| 110 // | 110 // |
| 111 // It's expected that the caller of this function will apply limit and offset | 111 // It's expected that the caller of this function will apply limit and offset |
| 112 // if the query contains those restrictions. This may convert the query to | 112 // if the query contains those restrictions. This may convert the query to |
| 113 // an expanded projection query with more data than the user asked for. It's the | 113 // an expanded projection query with more data than the user asked for. It's the |
| 114 // caller's responsibility to prune away the extra data. | 114 // caller's responsibility to prune away the extra data. |
| 115 // | 115 // |
| 116 // See also `dsTxnBuf.Run()`. | 116 // See also `dsTxnBuf.Run()`. |
| 117 func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker, | 117 func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker, |
| 118 » memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap)
bool) error { | 118 » memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap)
error) error { |
| 119 | 119 |
| 120 toRun, err := adjustQuery(fq) | 120 toRun, err := adjustQuery(fq) |
| 121 if err != nil { | 121 if err != nil { |
| 122 return err | 122 return err |
| 123 } | 123 } |
| 124 | 124 |
| 125 cmpLower, cmpUpper := memory.GetBinaryBounds(fq) | 125 cmpLower, cmpUpper := memory.GetBinaryBounds(fq) |
| 126 cmpOrder := fq.Orders() | 126 cmpOrder := fq.Orders() |
| 127 cmpFn := func(i *item) string { | 127 cmpFn := func(i *item) string { |
| 128 return i.getCmpRow(cmpLower, cmpUpper, cmpOrder) | 128 return i.getCmpRow(cmpLower, cmpUpper, cmpOrder) |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 227 } | 227 } |
| 228 if distinct != nil { | 228 if distinct != nil { |
| 229 // NOTE: We know that toUse will not be used after this
point for | 229 // NOTE: We know that toUse will not be used after this
point for |
| 230 // comparison purposes, so re-use its cmpRow property fo
r our distinct | 230 // comparison purposes, so re-use its cmpRow property fo
r our distinct |
| 231 // filter here. | 231 // filter here. |
| 232 toUse.cmpRow = "" | 232 toUse.cmpRow = "" |
| 233 if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder
)) { | 233 if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder
)) { |
| 234 continue | 234 continue |
| 235 } | 235 } |
| 236 } | 236 } |
| 237 » » if !cb(toUse.key, toUse.data) { | 237 » » if err := cb(toUse.key, toUse.data); err != nil { |
| 238 » » » break | 238 » » » if err == ds.Stop { |
| 239 » » » » return nil |
| 240 » » » } |
| 241 » » » return err |
| 239 } | 242 } |
| 240 } | 243 } |
| 241 | 244 |
| 242 return nil | 245 return nil |
| 243 } | 246 } |
| 244 | 247 |
| 245 // toComparableString computes the byte-sortable 'order' string for the given | 248 // toComparableString computes the byte-sortable 'order' string for the given |
| 246 // key/PropertyMap. | 249 // key/PropertyMap. |
| 247 // | 250 // |
| 248 // * start/end are byte sequences which are the inequality bounds of the | 251 // * start/end are byte sequences which are the inequality bounds of the |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 289 } | 292 } |
| 290 if !foundOne { | 293 if !foundOne { |
| 291 return nil, nil | 294 return nil, nil |
| 292 } | 295 } |
| 293 } | 296 } |
| 294 if end != nil && bytes.Compare(soFar, end) >= 0 { | 297 if end != nil && bytes.Compare(soFar, end) >= 0 { |
| 295 return nil, nil | 298 return nil, nil |
| 296 } | 299 } |
| 297 return soFar, ps["__key__"][0] | 300 return soFar, ps["__key__"][0] |
| 298 } | 301 } |
| OLD | NEW |