OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package txnBuf |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "sort" |
| 10 |
| 11 "github.com/luci/gae/impl/memory" |
| 12 ds "github.com/luci/gae/service/datastore" |
| 13 "github.com/luci/gae/service/datastore/serialize" |
| 14 "github.com/luci/luci-go/common/stringset" |
| 15 ) |
| 16 |
| 17 // queryToIter takes a FinalizedQuery and returns an iterator function which |
| 18 // will produce either *items or errors. |
| 19 // |
| 20 // - d is the raw datastore to run this query on |
| 21 // - filter is a function which will return true if the given key should be |
| 22 // excluded from the result set. |
| 23 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac
e) func() (*item, error) { |
| 24 c := make(chan *item) |
| 25 |
| 26 go func() { |
| 27 defer close(c) |
| 28 |
| 29 err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorC
B) (keepGoing bool) { |
| 30 i := &item{key: k, data: pm} |
| 31 select { |
| 32 case c <- i: |
| 33 return true |
| 34 case <-stopChan: |
| 35 return false |
| 36 } |
| 37 }) |
| 38 if err != nil { |
| 39 c <- &item{err: err} |
| 40 } |
| 41 }() |
| 42 |
| 43 return func() (*item, error) { |
| 44 itm := <-c |
| 45 if itm == nil { |
| 46 return nil, nil |
| 47 } |
| 48 if itm.err != nil { |
| 49 return nil, itm.err |
| 50 } |
| 51 return itm, nil |
| 52 } |
| 53 } |
| 54 |
| 55 // adjustQuery applies various mutations to the query to make it suitable for |
| 56 // merging. In general, this removes limits and offsets the 'distinct' modifier, |
| 57 // and it ensures that if there are sort orders which won't appear in the |
| 58 // result data that the query is transformed into a projection query which |
| 59 // contains all of the data. A non-projection query will never be transformed |
| 60 // in this way. |
| 61 func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) { |
| 62 q := fq.Original() |
| 63 |
| 64 // The limit and offset must be done in-memory because otherwise we may |
| 65 // request too few entities from the underlying store if many matching |
| 66 // entities have been deleted in the buffered transaction. |
| 67 q = q.Limit(-1) |
| 68 q = q.Offset(-1) |
| 69 |
| 70 // distinction must be done in-memory, because otherwise there's no way |
| 71 // to merge in the effect of the in-flight changes (because there's no w
ay |
| 72 // to push back to the datastore "yeah, I know you told me that the (1,
2) |
| 73 // result came from `/Bob,1`, but would you mind pretending that it didn
't |
| 74 // and tell me next the one instead? |
| 75 q = q.Distinct(false) |
| 76 |
| 77 // since we need to merge results, we must have all order-related fields |
| 78 // in each result. The only time we wouldn't have all the data available
would |
| 79 // be for a keys-only or projection query. To fix this, we convert all |
| 80 // Projection and KeysOnly queries to project on /all/ Orders. |
| 81 // |
| 82 // FinalizedQuery already guarantees that all projected fields show up i
n |
| 83 // the Orders, but the projected fields could be a subset of the orders. |
| 84 // |
| 85 // Additionally on a keys-only query, any orders other than __key__ requ
ire |
| 86 // conversion of this query to a projection query including those orders
in |
| 87 // order to merge the results correctly. |
| 88 // |
| 89 // In both cases, the resulting objects returned to the higher layers of
the |
| 90 // stack will only include the information requested by the user; keys-o
nly |
| 91 // queries will discard all PropertyMap data, and projection queries wil
l |
| 92 // discard any field data that the user didn't ask for. |
| 93 orders := fq.Orders() |
| 94 if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) { |
| 95 q = q.KeysOnly(false) |
| 96 |
| 97 for _, o := range orders { |
| 98 if o.Property == "__key__" { |
| 99 continue |
| 100 } |
| 101 q = q.Project(o.Property) |
| 102 } |
| 103 } |
| 104 |
| 105 return q.Finalize() |
| 106 } |
| 107 |
// runMergedQueries executes a user query `fq` against the parent datastore as
// well as the in-memory datastore, calling `cb` with the merged result set.
//
// It's expected that the caller of this function will apply limit and offset
// if the query contains those restrictions. This may convert the query to
// an expanded projection query with more data than the user asked for. It's the
// caller's responsibility to prune away the extra data.
//
// See also `dsTxnBuf.Run()`.
func runMergedQueries(fq *ds.FinalizedQuery, sizes *sizeTracker,
	memDS, parentDS ds.RawInterface, cb func(k *ds.Key, data ds.PropertyMap) bool) error {

	// Strip limit/offset/distinct and widen projections so both result streams
	// carry enough data to be merged (see adjustQuery).
	toRun, err := adjustQuery(fq)
	if err != nil {
		return err
	}

	// cmpFn derives a byte-sortable row string for an item (cached on the
	// item's cmpRow field — see the distinct NOTE below); it defines the merge
	// order between the two streams.
	cmpLower, cmpUpper := memory.GetBinaryBounds(fq)
	cmpOrder := fq.Orders()
	cmpFn := func(i *item) string {
		return i.getCmpRow(cmpLower, cmpUpper, cmpOrder)
	}

	// At most one of dedup/distinct is non-nil:
	//   - dedup (keyed by encoded key) for normal or keys-only queries.
	//   - distinct (keyed by projected values) for distinct projection queries.
	dedup := stringset.Set(nil)
	distinct := stringset.Set(nil)
	distinctOrder := []ds.IndexColumn(nil)
	if len(fq.Project()) > 0 { // the original query was a projection query
		if fq.Distinct() {
			// it was a distinct projection query, so we need to dedup by distinct
			// options.
			distinct = stringset.New(0)
			proj := fq.Project()
			distinctOrder = make([]ds.IndexColumn, len(proj))
			for i, p := range proj {
				distinctOrder[i].Property = p
			}
		}
	} else {
		// the original was a normal or keys-only query, so we need to dedup by keys.
		dedup = stringset.New(0)
	}

	// stopChan aborts both underlying queries when closed; see the deferred
	// cleanup below.
	stopChan := make(chan struct{})

	parIter := queryToIter(stopChan, toRun, parentDS)
	memIter := queryToIter(stopChan, toRun, memDS)

	// parItemGet pulls the next usable item from the parent datastore, skipping
	// entities recorded in `sizes` (presumably those touched inside the
	// buffered transaction, whose buffered versions are authoritative — TODO
	// confirm against sizeTracker) and, when deduping by key, entities already
	// seen.
	parItemGet := func() (*item, error) {
		for {
			itm, err := parIter()
			if itm == nil || err != nil {
				return nil, err
			}
			encKey := itm.getEncKey()
			if sizes.has(encKey) || (dedup != nil && dedup.Has(encKey)) {
				continue
			}
			return itm, nil
		}
	}
	// memItemGet pulls the next usable item from the in-memory (buffered)
	// datastore, skipping already-seen keys when deduping.
	memItemGet := func() (*item, error) {
		for {
			itm, err := memIter()
			if itm == nil || err != nil {
				return nil, err
			}
			if dedup != nil && dedup.Has(itm.getEncKey()) {
				continue
			}
			return itm, nil
		}
	}

	defer func() {
		// Signal both producer goroutines to stop, then drain each iterator so
		// a producer blocked on a send can observe the stop and exit.
		close(stopChan)
		parItemGet()
		memItemGet()
	}()

	pitm, err := parItemGet()
	if err != nil {
		return err
	}

	mitm, err := memItemGet()
	if err != nil {
		return err
	}

	// Two-way merge over the parent and in-memory streams, both of which
	// arrive ordered by cmpFn.
	for {
		// the err can be set during the loop below. If we come around the bend and
		// it's set, then we need to return it. We don't check it immediately
		// because it's set after we already have a good result to return to the
		// user.
		if err != nil {
			return err
		}

		// Pick whichever stream's head sorts first; stop when both are dry.
		usePitm := pitm != nil
		if pitm != nil && mitm != nil {
			usePitm = cmpFn(pitm) < cmpFn(mitm)
		} else if pitm == nil && mitm == nil {
			break
		}

		toUse := (*item)(nil)
		// we check the error at the beginning of the loop.
		if usePitm {
			toUse = pitm
			pitm, err = parItemGet()
		} else {
			toUse = mitm
			mitm, err = memItemGet()
		}

		// Suppress duplicates: by key for normal/keys-only queries, by
		// projected-value tuple for distinct projection queries.
		if dedup != nil {
			if !dedup.Add(toUse.getEncKey()) {
				continue
			}
		}
		if distinct != nil {
			// NOTE: We know that toUse will not be used after this point for
			// comparison purposes, so re-use its cmpRow property for our distinct
			// filter here.
			toUse.cmpRow = ""
			if !distinct.Add(toUse.getCmpRow(nil, nil, distinctOrder)) {
				continue
			}
		}
		// A false return from cb means the caller has all the results it wants.
		if !cb(toUse.key, toUse.data) {
			break
		}
	}

	return nil
}
| 244 |
| 245 // toComparableString computes the byte-sortable 'order' string for the given |
| 246 // key/PropertyMap. |
| 247 // |
| 248 // * start/end are byte sequences which are the inequality bounds of the |
| 249 // query, if any. These are a serialized datastore.Property. If the |
| 250 // inequality column is inverted, then start and end are also inverted and |
| 251 // swapped with each other. |
| 252 // * order is the list of sort orders in the actual executing queries. |
| 253 // * k / pm are the data to derive a sortable string for. |
| 254 // |
| 255 // The result of this function is the series of serialized properties, one per |
| 256 // order column, which represent this key/pm's first entry in the composite |
| 257 // index that would point to it (e.g. the one with `order` sort orders). |
| 258 func toComparableString(start, end []byte, order []ds.IndexColumn, k *ds.Key, pm
ds.PropertyMap) (row, key []byte) { |
| 259 doCmp := true |
| 260 soFar := []byte{} |
| 261 ps := serialize.PropertyMapPartially(k, nil) |
| 262 for _, ord := range order { |
| 263 row, ok := ps[ord.Property] |
| 264 if !ok { |
| 265 if vals, ok := pm[ord.Property]; ok { |
| 266 row = serialize.PropertySlice(vals) |
| 267 } |
| 268 } |
| 269 sort.Sort(row) |
| 270 foundOne := false |
| 271 for _, serialized := range row { |
| 272 if ord.Descending { |
| 273 serialized = serialize.Invert(serialized) |
| 274 } |
| 275 if doCmp { |
| 276 maybe := serialize.Join(soFar, serialized) |
| 277 cmp := bytes.Compare(maybe, start) |
| 278 if cmp >= 0 { |
| 279 foundOne = true |
| 280 soFar = maybe |
| 281 doCmp = len(soFar) < len(start) |
| 282 break |
| 283 } |
| 284 } else { |
| 285 foundOne = true |
| 286 soFar = serialize.Join(soFar, serialized) |
| 287 break |
| 288 } |
| 289 } |
| 290 if !foundOne { |
| 291 return nil, nil |
| 292 } |
| 293 } |
| 294 if end != nil && bytes.Compare(soFar, end) >= 0 { |
| 295 return nil, nil |
| 296 } |
| 297 return soFar, ps["__key__"][0] |
| 298 } |
OLD | NEW |