Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(81)

Side by Side Diff: filter/txnBuf/query_merger.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: one more test Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 "bytes"
9 "sort"
10 "sync"
11
12 "github.com/luci/gae/impl/memory"
13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/gae/service/datastore/serialize"
15 "github.com/luci/luci-go/common/stringset"
16 )
17
18 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac e, dedup, filter func(string) bool) func() (*item, error) {
19 c := make(chan *item)
20
21 go func() {
22 defer close(c)
23
24 err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorC B) (keepGoing bool) {
25 i := &item{key: k, data: pm}
26 encKey := i.getEncKey()
27 if filter != nil && filter(encKey) {
28 return true
29 }
30 if dedup != nil && dedup(encKey) {
31 return true
32 }
33
34 select {
35 case c <- i:
36 return true
37 case <-stopChan:
38 return false
39 }
40 })
41 if err != nil {
42 c <- &item{err: err}
43 }
44 }()
45
46 return func() (*item, error) {
47 for {
48 itm := <-c
49 if itm == nil {
50 return nil, nil
51 }
52 if itm.err != nil {
53 return nil, itm.err
54 }
55 return itm, nil
56 }
57 }
58 }
59
60 func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) {
61 q := fq.Original()
62
63 // The limit and offset must be done in-memory because otherwise we may
64 // request too few entities from the underlying store if many matching
65 // entities have been deleted in the buffered transaction.
66 q = q.Limit(-1)
67 q = q.Offset(-1)
68
69 // distinction must be done in-memory, because otherwise there's no way
70 // to merge in the effect of the in-flight changes (because there's no w ay
71 // to push back to the datastore "yeah, I know you told me that the (1, 2)
72 // result came from `/Bob,1`, but would you mind pretending that it didn 't
73 // and tell me next the one instead?
74 if fq.Distinct() {
75 q = q.Distinct(false)
76 }
77
78 // since we need to merge results, we must have all order-related fields
79 // in each result. The only time we wouldn't have all the data available would
80 // be for a keys-only or projection query. To fix this, we convert all
81 // Projection and KeysOnly queries to project on /all/ Orders.
82 //
83 // FinalizedQuery already guarantees that all projected fields show up i n
84 // the Orders, but the projected fields could be a subset of the orders.
85 //
86 // Additionally on a keys-only query, any orders other than __key__ requ ire
87 // conversion of this query to a projection query including those orders in
88 // order to merge the results correctly.
89 //
90 // In both cases, the resulting objects returned to the higher layers of
91 // the stack will only include the information requested by the user; ke ysonly
92 // queries will discard all PropertyMap data, and projection queries wil l
93 // discard any field data that the user didn't ask for.
94 orders := fq.Orders()
95 if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) {
96 q = q.KeysOnly(false)
97
98 for _, o := range orders {
99 if o.Property == "__key__" {
100 continue
101 }
102 q = q.Project(o.Property)
103 }
104 }
105
106 return q.Finalize()
107 }
108
109 func runMergedQueries(fq *ds.FinalizedQuery, state *txnBufState, cb func(k *ds.K ey, data ds.PropertyMap) bool) error {
110 toRun, err := adjustQuery(fq)
111 if err != nil {
112 return err
113 }
114
115 stopChan := make(chan struct{})
116 defer close(stopChan)
117
118 cmpLower, cmpUpper := memory.GetBinaryBounds(fq)
119 cmpOrder := fq.Orders()
120 cmpFn := func(i *item) string {
121 return i.getCmpRow(cmpLower, cmpUpper, cmpOrder)
122 }
123
124 dedup := stringset.Set(nil)
125 distinct := stringset.Set(nil)
126 distinctOrder := []ds.IndexColumn(nil)
127 if len(fq.Project()) > 0 { // the original query was a projection query
128 if fq.Distinct() {
129 // it was a distinct projection query, so we need to ded up by distinct
130 // options.
131 distinct = stringset.New(0)
132 proj := fq.Project()
133 distinctOrder = make([]ds.IndexColumn, len(proj))
134 for i, p := range proj {
135 distinctOrder[i].Property = p
136 }
137 }
138 } else {
139 // the original was a normal or keysonly query, so we need to de dup by keys
140 dedup = stringset.New(0)
141 }
142
143 // need lock around dedup since it's not threadsafe and we read/write it in
144 // different goroutines. No lock is needed around state.entState.has bec ause
145 // the whole transaction is locked during this query and so the entState is
146 // effectively read-only.
147 dedupLock := sync.Mutex{}
148 dedupFn := (func(string) bool)(nil)
149 if dedup != nil {
150 dedupFn = func(val string) bool {
151 dedupLock.Lock()
152 defer dedupLock.Unlock()
153 return dedup.Has(val)
154 }
155 }
156
157 parItemGet := queryToIter(stopChan, toRun, state.parentDS, dedupFn, stat e.entState.has)
158 memItemGet := queryToIter(stopChan, toRun, state.memDS, dedupFn, nil)
159
160 pitm, err := parItemGet()
161 if err != nil {
162 return err
163 }
164
165 mitm, err := memItemGet()
166 if err != nil {
167 return err
168 }
169
170 for {
171 usePitm := pitm != nil
172 if pitm != nil && mitm != nil {
173 usePitm = cmpFn(pitm) < cmpFn(mitm)
174 } else if pitm == nil && mitm == nil {
175 break
176 }
177
178 toUse := (*item)(nil)
179 if usePitm {
180 toUse = pitm
181 if pitm, err = parItemGet(); err != nil {
182 return err
183 }
184 } else {
185 toUse = mitm
186 if mitm, err = memItemGet(); err != nil {
187 return err
188 }
189 }
190
191 if dedup != nil {
192 encKey := toUse.getEncKey()
193 dedupLock.Lock()
194 added := dedup.Add(encKey)
195 dedupLock.Unlock()
196 if !added {
197 continue
198 }
199 }
200 if distinct != nil {
201 // NOTE: We know that toUse will not be used after this point for
202 // comparison purposes, so re-use its cmpRow property fo r our distinct
203 // filter here.
204 toUse.cmpRow = ""
205 if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder )) {
206 continue
207 }
208 }
209 if !cb(toUse.key, toUse.data) {
210 break
211 }
212 }
213
214 return nil
215 }
216
217 func toComparableString(start, end []byte, order []ds.IndexColumn, k *ds.Key, pm ds.PropertyMap) (row, key []byte) {
218 doCmp := true
219 soFar := []byte{}
220 ps := serialize.PropertyMapPartially(k, nil)
221 for _, ord := range order {
222 row, ok := ps[ord.Property]
223 if !ok {
224 if vals, ok := pm[ord.Property]; ok {
225 row = serialize.PropertySlice(vals)
226 }
227 }
228 sort.Sort(row)
229 foundOne := false
230 for _, serialized := range row {
231 if ord.Descending {
232 serialized = serialize.Invert(serialized)
233 }
234 if doCmp {
235 maybe := serialize.Join(soFar, serialized)
236 cmp := bytes.Compare(maybe, start)
237 if cmp >= 0 {
238 foundOne = true
239 soFar = maybe
240 doCmp = len(soFar) < len(start)
241 break
242 }
243 } else {
244 foundOne = true
245 soFar = serialize.Join(soFar, serialized)
246 break
247 }
248 }
249 if !foundOne {
250 return nil, nil
251 }
252 }
253 if end != nil && bytes.Compare(soFar, end) >= 0 {
254 return nil, nil
255 }
256 return soFar, ps["__key__"][0]
257 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698