OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package txnBuf |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "sort" |
| 10 "sync" |
| 11 |
| 12 ds "github.com/luci/gae/service/datastore" |
| 13 "github.com/luci/gae/service/datastore/serialize" |
| 14 "github.com/luci/luci-go/common/stringset" |
| 15 ) |
| 16 |
| 17 func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterfac
e, dedup, filter func(string) bool) func() (*item, error) { |
| 18 c := make(chan *item) |
| 19 |
| 20 go func() { |
| 21 defer close(c) |
| 22 |
| 23 err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorC
B) (keepGoing bool) { |
| 24 i := &item{key: k, data: pm} |
| 25 encKey := i.getEncKey() |
| 26 if filter != nil && filter(encKey) { |
| 27 return true |
| 28 } |
| 29 if dedup != nil && dedup(encKey) { |
| 30 return true |
| 31 } |
| 32 |
| 33 select { |
| 34 case c <- i: |
| 35 return true |
| 36 case <-stopChan: |
| 37 return false |
| 38 } |
| 39 }) |
| 40 if err != nil { |
| 41 c <- &item{err: err} |
| 42 } |
| 43 }() |
| 44 |
| 45 return func() (*item, error) { |
| 46 for { |
| 47 itm := <-c |
| 48 if itm == nil { |
| 49 return nil, nil |
| 50 } |
| 51 if itm.err != nil { |
| 52 return nil, itm.err |
| 53 } |
| 54 return itm, nil |
| 55 } |
| 56 } |
| 57 } |
| 58 |
| 59 func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) { |
| 60 q := fq.Original() |
| 61 |
| 62 // The limit and offset must be done in-memory because otherwise we may |
| 63 // request too few entities from the underlying store if many matching |
| 64 // entities have been deleted in the buffered transaction. |
| 65 q = q.Limit(-1) |
| 66 q = q.Offset(-1) |
| 67 |
| 68 // distinction must be done in-memory, because otherwise there's no way |
| 69 // to merge in the effect of the in-flight changes (because there's no w
ay |
| 70 // to push back to the datastore "yeah, I know you told me that the (1,
2) |
| 71 // result came from `/Bob,1`, but would you mind pretending that it didn
't |
| 72 // and tell me next the one instead? |
| 73 if fq.Distinct() { |
| 74 q = q.Distinct(false) |
| 75 } |
| 76 |
| 77 // since we need to merge results, we must have all order-related fields |
| 78 // in each result. The only time we wouldn't have all the data available
would |
| 79 // be for a keys-only or projection query. To fix this, we convert all |
| 80 // Projection and KeysOnly queries to project on /all/ Orders. |
| 81 // |
| 82 // FinalizedQuery already guarantees that all projected fields show up i
n |
| 83 // the Orders, but the projected fields could be a subset of the orders. |
| 84 // |
| 85 // Additionally on a keys-only query, any orders other than __key__ requ
ire |
| 86 // conversion of this query to a projection query including those orders
in |
| 87 // order to merge the results correctly. |
| 88 // |
| 89 // In both cases, the resulting objects returned to the higher layers of |
| 90 // the stack will only include the information requested by the user; ke
ysonly |
| 91 // queries will discard all PropertyMap data, and projection queries wil
l |
| 92 // discard any field data that the user didn't ask for. |
| 93 orders := fq.Orders() |
| 94 if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) { |
| 95 q = q.KeysOnly(false) |
| 96 |
| 97 for _, o := range orders { |
| 98 if o.Property == "__key__" { |
| 99 continue |
| 100 } |
| 101 q = q.Project(o.Property) |
| 102 } |
| 103 } |
| 104 |
| 105 return q.Finalize() |
| 106 } |
| 107 |
| 108 func runMergedQueries(fq *ds.FinalizedQuery, state *txnBufState, cb func(k *ds.K
ey, data ds.PropertyMap) bool) error { |
| 109 toRun, err := adjustQuery(fq) |
| 110 if err != nil { |
| 111 return err |
| 112 } |
| 113 |
| 114 stopChan := make(chan struct{}) |
| 115 defer close(stopChan) |
| 116 |
| 117 dedup := stringset.Set(nil) |
| 118 distinct := stringset.Set(nil) |
| 119 if len(fq.Project()) > 0 { // the original query was a projection query |
| 120 if fq.Distinct() { |
| 121 // it was a distinct projection query, so we need to ded
up by distinct |
| 122 // options. |
| 123 distinct = stringset.New(0) |
| 124 } |
| 125 } else { |
| 126 // the original was a normal or keysonly query, so we need to de
dup by keys |
| 127 dedup = stringset.New(0) |
| 128 } |
| 129 |
| 130 // need lock around dedup since it's not threadsafe and we read/write it
in |
| 131 // different goroutines. No lock is needed around state.entState.has bec
ause |
| 132 // the whole transaction is locked during this query and so the entState
is |
| 133 // effectively read-only. |
| 134 dedupLock := sync.Mutex{} |
| 135 dedupFn := (func(string) bool)(nil) |
| 136 if dedup != nil { |
| 137 dedupFn = func(val string) bool { |
| 138 dedupLock.Lock() |
| 139 defer dedupLock.Unlock() |
| 140 return dedup.Has(val) |
| 141 } |
| 142 } |
| 143 |
| 144 parItemGet := queryToIter(stopChan, toRun, state.parentDS, dedupFn, stat
e.entState.has) |
| 145 memItemGet := queryToIter(stopChan, toRun, state.memDS, dedupFn, nil) |
| 146 |
| 147 pitm, err := parItemGet() |
| 148 if err != nil { |
| 149 return err |
| 150 } |
| 151 |
| 152 mitm, err := memItemGet() |
| 153 if err != nil { |
| 154 return err |
| 155 } |
| 156 |
| 157 for { |
| 158 select { |
| 159 case <-stopChan: |
| 160 break |
| 161 default: |
| 162 } |
| 163 |
| 164 usePitm := pitm != nil |
| 165 if pitm != nil && mitm != nil { |
| 166 usePitm = pitm.getCmpRow(fq) < mitm.getCmpRow(fq) |
| 167 } else if pitm == nil && mitm == nil { |
| 168 break |
| 169 } |
| 170 |
| 171 toUse := (*item)(nil) |
| 172 if usePitm { |
| 173 toUse = pitm |
| 174 if pitm, err = parItemGet(); err != nil { |
| 175 return err |
| 176 } |
| 177 } else { |
| 178 toUse = mitm |
| 179 if mitm, err = memItemGet(); err != nil { |
| 180 return err |
| 181 } |
| 182 } |
| 183 |
| 184 if dedup != nil { |
| 185 encKey := toUse.getEncKey() |
| 186 dedupLock.Lock() |
| 187 dedup.Add(encKey) |
| 188 dedupLock.Unlock() |
| 189 } |
| 190 if distinct != nil { |
| 191 key := toUse.getEncKey() |
| 192 toUse.cmpRow = "" |
| 193 row := toUse.getCmpRow(toRun) |
| 194 if !distinct.Add(row[:len(row)-len(key)]) { |
| 195 continue |
| 196 } |
| 197 } |
| 198 if !cb(toUse.key, toUse.data) { |
| 199 break |
| 200 } |
| 201 } |
| 202 |
| 203 return nil |
| 204 } |
| 205 |
| 206 func toComparableString(fq *ds.FinalizedQuery, k *ds.Key, pm ds.PropertyMap) (ro
w, key []byte) { |
| 207 doCmp := true |
| 208 soFar := []byte{} |
| 209 start, end := []byte(nil), []byte(nil) |
| 210 // TODO(riannucci): extract start, end byte sequences from fq |
| 211 ps := serialize.PropertyMapPartially(k, nil) |
| 212 for _, ord := range fq.Orders() { |
| 213 row, ok := ps[ord.Property] |
| 214 if !ok { |
| 215 if vals, ok := pm[ord.Property]; ok { |
| 216 row = serialize.PropertySlice(vals) |
| 217 } |
| 218 } |
| 219 sort.Sort(row) |
| 220 foundOne := false |
| 221 for _, serialized := range row { |
| 222 if ord.Descending { |
| 223 serialized = serialize.Invert(serialized) |
| 224 } |
| 225 if doCmp { |
| 226 maybe := serialize.Join(soFar, serialized) |
| 227 cmp := bytes.Compare(maybe, start) |
| 228 if cmp >= 0 { |
| 229 foundOne = true |
| 230 soFar = maybe |
| 231 doCmp = len(soFar) < len(start) |
| 232 break |
| 233 } |
| 234 } else { |
| 235 foundOne = true |
| 236 soFar = serialize.Join(soFar, serialized) |
| 237 break |
| 238 } |
| 239 } |
| 240 if !foundOne { |
| 241 return nil, nil |
| 242 } |
| 243 } |
| 244 if end != nil && bytes.Compare(soFar, end) >= 0 { |
| 245 return nil, nil |
| 246 } |
| 247 return soFar, ps["__key__"][0] |
| 248 } |
OLD | NEW |