Chromium Code Reviews

Side by Side Diff: filter/txnBuf/query_merger.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: change pcg timeout and codereview server to crcr.as.com Created 5 years, 2 months ago
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 "bytes"
9 "sort"
10
11 "github.com/luci/gae/impl/memory"
12 ds "github.com/luci/gae/service/datastore"
13 "github.com/luci/gae/service/datastore/serialize"
14 "github.com/luci/luci-go/common/stringset"
15 )
16
17 // queryToIter takes a FinalizedQuery and returns an iterator function which
18 // will produce either *items or errors.
19 //
20 // - stopChan, when closed, stops the underlying query early
21 // - fq is the finalized query to run
22 // - d is the raw datastore to run this query on
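//
// A rough usage sketch (hypothetical caller; variable names are invented for
// illustration):
//
//	next := queryToIter(stopChan, fq, rawDS)
//	for {
//		itm, err := next()
//		if err != nil || itm == nil {
//			break // error, or iteration finished
//		}
//		// ... consume itm.key / itm.data ...
//	}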
23 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterface) func() (*item, error) {
24 c := make(chan *item)
25
26 go func() {
27 defer close(c)
28
29 err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) (keepGoing bool) {
30 i := &item{key: k, data: pm}
31 select {
32 case c <- i:
33 return true
34 case <-stopChan:
35 return false
36 }
37 })
38 if err != nil {
39 c <- &item{err: err}
40 }
41 }()
42
43 return func() (*item, error) {
44 itm := <-c
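// A nil item means the producer goroutine closed its channel, so iteration
// is complete.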
45 if itm == nil {
46 return nil, nil
47 }
48 if itm.err != nil {
49 return nil, itm.err
50 }
51 return itm, nil
52 }
53 }
54
55 // adjustQuery applies various mutations to the query to make it suitable for
56 // merging. In general, this removes limits, offsets, and the 'distinct'
57 // modifier, and it ensures that if there are sort orders whose values won't
58 // appear in the result data, the query is transformed into a projection query
59 // which contains all of that data. A query which already returns full
60 // entities will never be transformed in this way.
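//
// For example (hypothetically), a keys-only query sorted by (Field, __key__)
// would come back from adjustQuery with no limit or offset, 'distinct' turned
// off, and a projection on Field, so that the merge step has the values it
// needs to order results.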
61 func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) {
62 q := fq.Original()
63
64 // The limit and offset must be done in-memory because otherwise we may
65 // request too few entities from the underlying store if many matching
66 // entities have been deleted in the buffered transaction.
67 q = q.Limit(-1)
68 q = q.Offset(-1)
69
70 // the 'distinct' filter must be applied in-memory, because otherwise there's
71 // no way to merge in the effect of the in-flight changes (because there's no
72 // way to push back to the datastore "yeah, I know you told me that the (1, 2)
73 // result came from `/Bob,1`, but would you mind pretending that it didn't
74 // and tell me the next one instead?").
75 q = q.Distinct(false)
76
77 // since we need to merge results, we must have all order-related fields
78 // in each result. The only time we wouldn't have all the data available would
79 // be for a keys-only or projection query. To fix this, we convert all
80 // Projection and KeysOnly queries to project on /all/ Orders.
81 //
82 // FinalizedQuery already guarantees that all projected fields show up in
83 // the Orders, but the projected fields could be a subset of the orders.
84 //
85 // Additionally on a keys-only query, any orders other than __key__ require
86 // conversion of this query to a projection query including those orders in
87 // order to merge the results correctly.
88 //
89 // In both cases, the resulting objects returned to the higher layers of the
90 // stack will only include the information requested by the user; keys-only
91 // queries will discard all PropertyMap data, and projection queries will
92 // discard any field data that the user didn't ask for.
93 orders := fq.Orders()
94 if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) {
95 q = q.KeysOnly(false)
96
97 for _, o := range orders {
98 if o.Property == "__key__" {
99 continue
100 }
101 q = q.Project(o.Property)
102 }
103 }
104
105 return q.Finalize()
106 }
107
108 // runMergedQueries executes a user query `fq` against the parent datastore as
109 // well as the in-memory datastore, calling `cb` with the merged result set.
110 //
111 // It's expected that the caller of this function will apply limit and offset
112 // if the query contains those restrictions. This may convert the query to
113 // an expanded projection query with more data than the user asked for. It's the
114 // caller's responsibility to prune away the extra data.
115 //
116 // See also `dsTxnBuf.Run()`.
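//
// Roughly: both the parent-store and in-memory result streams come back in
// the query's sort order, so the merge repeatedly emits whichever stream's
// head item compares lower, de-duplicating keys along the way.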
117 func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker,
118 memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) bool) error {
119
120 toRun, err := adjustQuery(fq)
121 if err != nil {
122 return err
123 }
124
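// cmpFn turns an item into the byte-comparable row (bounded by the query's
// inequality limits and ordered by its sort orders) used to interleave the
// two result streams.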
125 cmpLower, cmpUpper := memory.GetBinaryBounds(fq)
126 cmpOrder := fq.Orders()
127 cmpFn := func(i *item) string {
128 return i.getCmpRow(cmpLower, cmpUpper, cmpOrder)
129 }
130
131 dedup := stringset.Set(nil)
132 distinct := stringset.Set(nil)
133 distinctOrder := []ds.IndexColumn(nil)
134 if len(fq.Project()) > 0 { // the original query was a projection query
135 if fq.Distinct() {
136 // it was a distinct projection query, so we need to dedup by distinct
137 // options.
138 distinct = stringset.New(0)
139 proj := fq.Project()
140 distinctOrder = make([]ds.IndexColumn, len(proj))
141 for i, p := range proj {
142 distinctOrder[i].Property = p
143 }
144 }
145 } else {
146 // the original was a normal or keys-only query, so we need to dedup by keys.
147 dedup = stringset.New(0)
148 }
149
150 stopChan := make(chan struct{})
151
152 parIter := queryToIter(stopChan, toRun, parentDS)
153 memIter := queryToIter(stopChan, toRun, memDS)
154
155 parItemGet := func() (*item, error) {
156 for {
157 itm, err := parIter()
158 if itm == nil || err != nil {
159 return nil, err
160 }
161 encKey := itm.getEncKey()
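// Keys which the buffered transaction has touched (tracked by sizes) are
// served from the in-memory store instead, and keys we've already picked
// up (dedup) are skipped.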
162 if sizes.has(encKey) || (dedup != nil && dedup.Has(encKey)) {
163 continue
164 }
165 return itm, nil
166 }
167 }
168 memItemGet := func() (*item, error) {
169 for {
170 itm, err := memIter()
171 if itm == nil || err != nil {
172 return nil, err
173 }
174 if dedup != nil && dedup.Has(itm.getEncKey()) {
175 continue
176 }
177 return itm, nil
178 }
179 }
180
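// Closing stopChan tells both query goroutines to stop; the final
// parItemGet/memItemGet calls drain any trailing item (e.g. a pending error)
// so those goroutines can exit.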
181 defer func() {
182 close(stopChan)
183 parItemGet()
184 memItemGet()
185 }()
186
187 pitm, err := parItemGet()
188 if err != nil {
189 return err
190 }
191
192 mitm, err := memItemGet()
193 if err != nil {
194 return err
195 }
196
197 for {
198 // the err can be set during the loop below. If we come around the bend and
199 // it's set, then we need to return it. We don't check it immediately
200 // because it's set after we already have a good result to return to the
201 // user.
202 if err != nil {
203 return err
204 }
205
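// Emit whichever stream's head item sorts first in the query order; if one
// stream is exhausted, keep draining the other.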
206 usePitm := pitm != nil
207 if pitm != nil && mitm != nil {
208 usePitm = cmpFn(pitm) < cmpFn(mitm)
209 } else if pitm == nil && mitm == nil {
210 break
211 }
212
213 toUse := (*item)(nil)
214 // we check the error at the beginning of the loop.
215 if usePitm {
216 toUse = pitm
217 pitm, err = parItemGet()
218 } else {
219 toUse = mitm
220 mitm, err = memItemGet()
221 }
222
223 if dedup != nil {
224 if !dedup.Add(toUse.getEncKey()) {
225 continue
226 }
227 }
228 if distinct != nil {
229 // NOTE: We know that toUse will not be used after this point for
230 // comparison purposes, so re-use its cmpRow property for our distinct
231 // filter here.
232 toUse.cmpRow = ""
233 if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder)) {
234 continue
235 }
236 }
237 if !cb(toUse.key, toUse.data) {
238 break
239 }
240 }
241
242 return nil
243 }
244
245 // toComparableString computes the byte-sortable 'order' string for the given
246 // key/PropertyMap.
247 //
248 // * start/end are byte sequences which are the inequality bounds of the
249 // query, if any. Each is a serialized datastore.Property. If the
250 // inequality column is inverted, then start and end are also inverted and
251 // swapped with each other.
252 // * order is the list of sort orders in the actual executing queries.
253 // * k / pm are the data to derive a sortable string for.
254 //
255 // The result of this function is the series of serialized properties, one per
256 // order column, which represent this key/pm's first entry in the composite
257 // index that would point to it (e.g. the one with `order` sort orders).
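//
// As a rough illustration, for order [{A, ascending}, {__key__, descending}]
// the returned row is the serialized value chosen for A joined with the
// inverted serialized key.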
258 func toComparableString(start, end []byte, order []ds.IndexColumn, k *ds.Key, pm ds.PropertyMap) (row, key []byte) {
259 doCmp := true
260 soFar := []byte{}
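// doCmp is true while the row built so far is still constrained by the
// query's lower bound (start); once the accumulated prefix is at least as
// long as that bound, later columns simply take their first candidate value.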
261 ps := serialize.PropertyMapPartially(k, nil)
262 for _, ord := range order {
263 row, ok := ps[ord.Property]
264 if !ok {
265 if vals, ok := pm[ord.Property]; ok {
266 row = serialize.PropertySlice(vals)
267 }
268 }
269 sort.Sort(row)
270 foundOne := false
271 for _, serialized := range row {
272 if ord.Descending {
273 serialized = serialize.Invert(serialized)
274 }
275 if doCmp {
276 maybe := serialize.Join(soFar, serialized)
277 cmp := bytes.Compare(maybe, start)
278 if cmp >= 0 {
279 foundOne = true
280 soFar = maybe
281 doCmp = len(soFar) < len(start)
282 break
283 }
284 } else {
285 foundOne = true
286 soFar = serialize.Join(soFar, serialized)
287 break
288 }
289 }
290 if !foundOne {
291 return nil, nil
292 }
293 }
294 if end != nil && bytes.Compare(soFar, end) >= 0 {
295 return nil, nil
296 }
297 return soFar, ps["__key__"][0]
298 }