Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(166)

Side by Side Diff: filter/txnBuf/query_merger.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: Fix builtin+ancestor+multi-eq+multiIterator bug, add more txnBuf test Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package txnBuf
6
7 import (
8 "bytes"
9 "sort"
10 "sync"
11
12 ds "github.com/luci/gae/service/datastore"
13 "github.com/luci/gae/service/datastore/serialize"
14 "github.com/luci/luci-go/common/stringset"
15 )
16
17 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac e, dedup, filter func(string) bool) func() (*item, error) {
18 c := make(chan *item)
19
20 go func() {
21 defer close(c)
22
23 err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorC B) (keepGoing bool) {
24 i := &item{key: k, data: pm}
25 encKey := i.getEncKey()
26 if filter != nil && filter(encKey) {
27 return true
28 }
29 if dedup != nil && dedup(encKey) {
30 return true
31 }
32
33 select {
34 case c <- i:
35 return true
36 case <-stopChan:
37 return false
38 }
39 })
40 if err != nil {
41 c <- &item{err: err}
42 }
43 }()
44
45 return func() (*item, error) {
46 for {
47 itm := <-c
48 if itm == nil {
49 return nil, nil
50 }
51 if itm.err != nil {
52 return nil, itm.err
53 }
54 return itm, nil
55 }
56 }
57 }
58
59 func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) {
60 q := fq.Original()
61
62 // The limit and offset must be done in-memory because otherwise we may
63 // request too few entities from the underlying store if many matching
64 // entities have been deleted in the buffered transaction.
65 q = q.Limit(-1)
66 q = q.Offset(-1)
67
68 // distinction must be done in-memory, because otherwise there's no way
69 // to merge in the effect of the in-flight changes (because there's no w ay
70 // to push back to the datastore "yeah, I know you told me that the (1, 2)
71 // result came from `/Bob,1`, but would you mind pretending that it didn 't
72 // and tell me next the one instead?
73 if fq.Distinct() {
74 q = q.Distinct(false)
75 }
76
77 // since we need to merge results, we must have all order-related fields
78 // in each result. The only time we wouldn't have all the data available would
79 // be for a keys-only or projection query. To fix this, we convert all
80 // Projection and KeysOnly queries to project on /all/ Orders.
81 //
82 // FinalizedQuery already guarantees that all projected fields show up i n
83 // the Orders, but the projected fields could be a subset of the orders.
84 //
85 // Additionally on a keys-only query, any orders other than __key__ requ ire
86 // conversion of this query to a projection query including those orders in
87 // order to merge the results correctly.
88 //
89 // In both cases, the resulting objects returned to the higher layers of
90 // the stack will only include the information requested by the user; ke ysonly
91 // queries will discard all PropertyMap data, and projection queries wil l
92 // discard any field data that the user didn't ask for.
93 orders := fq.Orders()
94 if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) {
95 q = q.KeysOnly(false)
96
97 for _, o := range orders {
98 if o.Property == "__key__" {
99 continue
100 }
101 q = q.Project(o.Property)
102 }
103 }
104
105 return q.Finalize()
106 }
107
108 func runMergedQueries(fq *ds.FinalizedQuery, state *txnBufState, cb func(k *ds.K ey, data ds.PropertyMap) bool) error {
109 toRun, err := adjustQuery(fq)
110 if err != nil {
111 return err
112 }
113
114 stopChan := make(chan struct{})
115 defer close(stopChan)
116
117 dedup := stringset.Set(nil)
118 distinct := stringset.Set(nil)
119 if len(fq.Project()) > 0 { // the original query was a projection query
120 if fq.Distinct() {
121 // it was a distinct projection query, so we need to ded up by distinct
122 // options.
123 distinct = stringset.New(0)
124 }
125 } else {
126 // the original was a normal or keysonly query, so we need to de dup by keys
127 dedup = stringset.New(0)
128 }
129
130 // need lock around dedup since it's not threadsafe and we read/write it in
131 // different goroutines. No lock is needed around state.entState.has bec ause
132 // the whole transaction is locked during this query and so the entState is
133 // effectively read-only.
134 dedupLock := sync.Mutex{}
135 dedupFn := (func(string) bool)(nil)
136 if dedup != nil {
137 dedupFn = func(val string) bool {
138 dedupLock.Lock()
139 defer dedupLock.Unlock()
140 return dedup.Has(val)
141 }
142 }
143
144 parItemGet := queryToIter(stopChan, toRun, state.parentDS, dedupFn, stat e.entState.has)
145 memItemGet := queryToIter(stopChan, toRun, state.memDS, dedupFn, nil)
146
147 pitm, err := parItemGet()
148 if err != nil {
149 return err
150 }
151
152 mitm, err := memItemGet()
153 if err != nil {
154 return err
155 }
156
157 for {
158 select {
159 case <-stopChan:
160 break
161 default:
162 }
163
164 usePitm := pitm != nil
165 if pitm != nil && mitm != nil {
166 usePitm = pitm.getCmpRow(fq) < mitm.getCmpRow(fq)
167 } else if pitm == nil && mitm == nil {
168 break
169 }
170
171 toUse := (*item)(nil)
172 if usePitm {
173 toUse = pitm
174 if pitm, err = parItemGet(); err != nil {
175 return err
176 }
177 } else {
178 toUse = mitm
179 if mitm, err = memItemGet(); err != nil {
180 return err
181 }
182 }
183
184 if dedup != nil {
185 encKey := toUse.getEncKey()
186 dedupLock.Lock()
187 dedup.Add(encKey)
188 dedupLock.Unlock()
189 }
190 if distinct != nil {
191 key := toUse.getEncKey()
192 toUse.cmpRow = ""
193 row := toUse.getCmpRow(toRun)
194 if !distinct.Add(row[:len(row)-len(key)]) {
195 continue
196 }
197 }
198 if !cb(toUse.key, toUse.data) {
199 break
200 }
201 }
202
203 return nil
204 }
205
206 func toComparableString(fq *ds.FinalizedQuery, k *ds.Key, pm ds.PropertyMap) (ro w, key []byte) {
207 doCmp := true
208 soFar := []byte{}
209 start, end := []byte(nil), []byte(nil)
210 // TODO(riannucci): extract start, end byte sequences from fq
211 ps := serialize.PropertyMapPartially(k, nil)
212 for _, ord := range fq.Orders() {
213 row, ok := ps[ord.Property]
214 if !ok {
215 if vals, ok := pm[ord.Property]; ok {
216 row = serialize.PropertySlice(vals)
217 }
218 }
219 sort.Sort(row)
220 foundOne := false
221 for _, serialized := range row {
222 if ord.Descending {
223 serialized = serialize.Invert(serialized)
224 }
225 if doCmp {
226 maybe := serialize.Join(soFar, serialized)
227 cmp := bytes.Compare(maybe, start)
228 if cmp >= 0 {
229 foundOne = true
230 soFar = maybe
231 doCmp = len(soFar) < len(start)
232 break
233 }
234 } else {
235 foundOne = true
236 soFar = serialize.Join(soFar, serialized)
237 break
238 }
239 }
240 if !foundOne {
241 return nil, nil
242 }
243 }
244 if end != nil && bytes.Compare(soFar, end) >= 0 {
245 return nil, nil
246 }
247 return soFar, ps["__key__"][0]
248 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698