Index: filter/txnBuf/query_merger.go
diff --git a/filter/txnBuf/query_merger.go b/filter/txnBuf/query_merger.go
new file mode 100644
index 0000000000000000000000000000000000000000..406d5cae68e7112793e98ddd7959f7e179a51d9c
--- /dev/null
+++ b/filter/txnBuf/query_merger.go
@@ -0,0 +1,248 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package txnBuf
+
+import (
+	"bytes"
+	"sort"
+	"sync"
+
+	ds "github.com/luci/gae/service/datastore"
+	"github.com/luci/gae/service/datastore/serialize"
+	"github.com/luci/luci-go/common/stringset"
+)
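+
+// This file implements merging of query results from the parent datastore
+// and the in-memory transaction buffer, so that queries run inside a
+// buffered transaction observe the effects of in-flight changes.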
+
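+// queryToIter runs a query against a single datastore implementation (either
+// the parent store or the in-memory buffer) and returns a pull-based iterator
+// over its results. Results are generated on a separate goroutine; closing
+// stopChan aborts the underlying query. Items whose encoded key matches the
+// optional `filter` or `dedup` callbacks are skipped.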
+func queryToIter(stopChan chan struct{}, fq *ds.FinalizedQuery, d ds.RawInterface, dedup, filter func(string) bool) func() (*item, error) {
+	c := make(chan *item)
+
+	go func() {
+		defer close(c)
+
+		err := d.Run(fq, func(k *ds.Key, pm ds.PropertyMap, _ ds.CursorCB) (keepGoing bool) {
+			i := &item{key: k, data: pm}
+			encKey := i.getEncKey()
+			if filter != nil && filter(encKey) {
+				return true
+			}
+			if dedup != nil && dedup(encKey) {
+				return true
+			}
+
+			select {
+			case c <- i:
+				return true
+			case <-stopChan:
+				return false
+			}
+		})
+		if err != nil {
+			// the consumer may have stopped reading already; don't block
+			// forever trying to deliver the error.
+			select {
+			case c <- &item{err: err}:
+			case <-stopChan:
+			}
+		}
+	}()
+
+	return func() (*item, error) {
+		itm := <-c
+		if itm == nil {
+			return nil, nil
+		}
+		if itm.err != nil {
+			return nil, itm.err
+		}
+		return itm, nil
+	}
+}
+
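+// adjustQuery rewrites the user's original query into a form which can be run
+// against both the parent datastore and the in-memory buffer and then merged:
+// limit, offset and distinct are stripped (they must be applied in-memory
+// afterwards), and keys-only/projection queries are widened to project every
+// order-related field.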
+func adjustQuery(fq *ds.FinalizedQuery) (*ds.FinalizedQuery, error) {
+	q := fq.Original()
+
+	// The limit and offset must be applied in-memory, because otherwise we may
+	// request too few entities from the underlying store if many matching
+	// entities have been deleted in the buffered transaction.
+	q = q.Limit(-1)
+	q = q.Offset(-1)
+
+	// Distinct-ness must be applied in-memory, because otherwise there's no
+	// way to merge in the effect of the in-flight changes (because there's no
+	// way to push back to the datastore "yeah, I know you told me that the
+	// (1, 2) result came from `/Bob,1`, but would you mind pretending that it
+	// didn't and telling me the next one instead?").
+	if fq.Distinct() {
+		q = q.Distinct(false)
+	}
+
+	// Since we need to merge results, we must have all order-related fields
+	// in each result. The only time we wouldn't have all the data available
+	// would be for a keys-only or projection query. To fix this, we convert
+	// all projection and keys-only queries to project on /all/ Orders.
+	//
+	// FinalizedQuery already guarantees that all projected fields show up in
+	// the Orders, but the projected fields could be a subset of the Orders.
+	//
+	// Additionally, a keys-only query with any order other than __key__ must
+	// be converted to a projection query on those orders so that its results
+	// can be merged correctly.
+	//
+	// In both cases, the resulting objects returned to the higher layers of
+	// the stack will only include the information requested by the user;
+	// keys-only queries will discard all PropertyMap data, and projection
+	// queries will discard any field data that the user didn't ask for.
+	orders := fq.Orders()
+	if len(fq.Project()) > 0 || (fq.KeysOnly() && len(orders) > 1) {
+		q = q.KeysOnly(false)
+
+		for _, o := range orders {
+			if o.Property == "__key__" {
+				continue
+			}
+			q = q.Project(o.Property)
+		}
+	}
+
+	return q.Finalize()
+}
+
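+// runMergedQueries executes the adjusted query against both the parent
+// datastore and the in-memory buffer, merge-sorts the two result streams in
+// query order, applies key-deduplication and distinct filtering, and invokes
+// cb with each surviving result until cb returns false or both streams are
+// exhausted.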
+func runMergedQueries(fq *ds.FinalizedQuery, state *txnBufState, cb func(k *ds.Key, data ds.PropertyMap) bool) error {
+	toRun, err := adjustQuery(fq)
+	if err != nil {
+		return err
+	}
+
+	stopChan := make(chan struct{})
+	defer close(stopChan)
+
+	var dedup, distinct stringset.Set
+	if len(fq.Project()) > 0 { // the original query was a projection query
+		if fq.Distinct() {
+			// it was a distinct projection query, so we need to dedup by the
+			// distinct options.
+			distinct = stringset.New(0)
+		}
+	} else {
+		// the original was a normal or keys-only query, so we need to dedup
+		// by keys.
+		dedup = stringset.New(0)
+	}
+
+	// We need a lock around dedup since it's not threadsafe and we read/write
+	// it from different goroutines. No lock is needed around
+	// state.entState.has, because the whole transaction is locked during this
+	// query, so the entState is effectively read-only.
+	dedupLock := sync.Mutex{}
+	var dedupFn func(string) bool
+	if dedup != nil {
+		dedupFn = func(val string) bool {
+			dedupLock.Lock()
+			defer dedupLock.Unlock()
+			return dedup.Has(val)
+		}
+	}
+
+	parItemGet := queryToIter(stopChan, toRun, state.parentDS, dedupFn, state.entState.has)
+	memItemGet := queryToIter(stopChan, toRun, state.memDS, dedupFn, nil)
+
+	pitm, err := parItemGet()
+	if err != nil {
+		return err
+	}
+
+	mitm, err := memItemGet()
+	if err != nil {
+		return err
+	}
+
+	for {
+		select {
+		case <-stopChan:
+			// the query has been stopped; bail out of the whole loop.
+			return nil
+		default:
+		}
+
+		// merge sort: pick whichever stream's head item sorts first in query
+		// order. If both streams are exhausted, we're done.
+		usePitm := pitm != nil
+		if pitm != nil && mitm != nil {
+			usePitm = pitm.getCmpRow(fq) < mitm.getCmpRow(fq)
+		} else if pitm == nil && mitm == nil {
+			break
+		}
+
+		var toUse *item
+		if usePitm {
+			toUse = pitm
+			if pitm, err = parItemGet(); err != nil {
+				return err
+			}
+		} else {
+			toUse = mitm
+			if mitm, err = memItemGet(); err != nil {
+				return err
+			}
+		}
+
+		if dedup != nil {
+			encKey := toUse.getEncKey()
+			dedupLock.Lock()
+			dedup.Add(encKey)
+			dedupLock.Unlock()
+		}
+		if distinct != nil {
+			// reset the cached row so it's rebuilt against toRun, then trim
+			// the encoded key off the end (__key__ is always the last order,
+			// so the row ends with the key); distinct dedups on just the
+			// projected values.
+			key := toUse.getEncKey()
+			toUse.cmpRow = ""
+			row := toUse.getCmpRow(toRun)
+			if !distinct.Add(row[:len(row)-len(key)]) {
+				continue
+			}
+		}
+		if !cb(toUse.key, toUse.data) {
+			break
+		}
+	}
+
+	return nil
+}
+
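+// toComparableString serializes the order-related fields of a single result
+// (key + properties) into a byte string whose bytewise ordering matches the
+// query order, returning both the full row and the encoded key. It returns
+// (nil, nil) when the result has no value for one of the ordered properties,
+// or when the row falls outside the query's [start, end) range (extracting
+// that range from fq is still a TODO below).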
+func toComparableString(fq *ds.FinalizedQuery, k *ds.Key, pm ds.PropertyMap) (row, key []byte) {
+	doCmp := true
+	soFar := []byte{}
+	start, end := []byte(nil), []byte(nil)
+	// TODO(riannucci): extract start, end byte sequences from fq
+	ps := serialize.PropertyMapPartially(k, nil)
+	for _, ord := range fq.Orders() {
+		// note: this `row` (the serialized values for a single property)
+		// shadows the function's named return value.
+		row, ok := ps[ord.Property]
+		if !ok {
+			if vals, ok := pm[ord.Property]; ok {
+				row = serialize.PropertySlice(vals)
+			}
+		}
+		sort.Sort(row)
+		foundOne := false
+		for _, serialized := range row {
+			if ord.Descending {
+				serialized = serialize.Invert(serialized)
+			}
+			if doCmp {
+				// keep comparing against `start` until the accumulated row is
+				// at least as long as it; after that no comparison is needed.
+				maybe := serialize.Join(soFar, serialized)
+				cmp := bytes.Compare(maybe, start)
+				if cmp >= 0 {
+					foundOne = true
+					soFar = maybe
+					doCmp = len(soFar) < len(start)
+					break
+				}
+			} else {
+				foundOne = true
+				soFar = serialize.Join(soFar, serialized)
+				break
+			}
+		}
+		if !foundOne {
+			return nil, nil
+		}
+	}
+	if end != nil && bytes.Compare(soFar, end) >= 0 {
+		return nil, nil
+	}
+	return soFar, ps["__key__"][0]
+}