| Index: impl/memory/datastore_query_execution.go
|
| diff --git a/impl/memory/datastore_query_execution.go b/impl/memory/datastore_query_execution.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..fd8ace238f993a048c9a038299980aea921c5e35
|
| --- /dev/null
|
| +++ b/impl/memory/datastore_query_execution.go
|
| @@ -0,0 +1,244 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package memory
|
| +
|
| +import (
|
| + "bytes"
|
| + "fmt"
|
| +
|
| + ds "github.com/luci/gae/service/datastore"
|
| + "github.com/luci/gae/service/datastore/serialize"
|
| + "github.com/luci/luci-go/common/cmpbin"
|
| +)
|
| +
|
| +type queryStrategy interface {
|
| + // handle applies the strategy to the embedded user callback.
|
| + // - rawData is the slice of encoded Properties from the index row
|
| + // (correctly de-inverted).
|
| + // - decodedProps is the slice of decoded Properties from the index row
|
| + // - key is the decoded Key from the index row (the last item in rawData and
|
| + // decodedProps)
|
| + // - gc is the getCursor function to be passed to the user's callback
|
| + handle(rawData [][]byte, decodedProps []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool
|
| +}
|
| +
|
| +type projectionLookup struct {
|
| + suffixIndex int
|
| + propertyName string
|
| +}
|
| +
|
| +type projectionStrategy struct {
|
| + cb ds.RawRunCB
|
| +
|
| + project []projectionLookup
|
| + distinct map[string]struct{}
|
| +}
|
| +
|
| +func newProjectionStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB) queryStrategy {
|
| + projectionLookups := make([]projectionLookup, len(q.project))
|
| + for i, prop := range q.project {
|
| + projectionLookups[i].propertyName = prop
|
| + lookupErr := fmt.Errorf("planning a strategy for an unfulfillable query?")
|
| + for j, col := range rq.suffixFormat {
|
| + if col.Property == prop {
|
| + projectionLookups[i].suffixIndex = j
|
| + lookupErr = nil
|
| + break
|
| + }
|
| + }
|
| + impossible(lookupErr)
|
| + }
|
| + ret := &projectionStrategy{cb: cb, project: projectionLookups}
|
| + if q.distinct {
|
| + ret.distinct = map[string]struct{}{}
|
| + }
|
| + return ret
|
| +}
|
| +
|
| +func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool {
|
| + projectedRaw := [][]byte(nil)
|
| + if s.distinct != nil {
|
| + projectedRaw = make([][]byte, len(decodedProps))
|
| + }
|
| + pmap := make(ds.PropertyMap, len(s.project))
|
| + for i, p := range s.project {
|
| + if s.distinct != nil {
|
| + projectedRaw[i] = rawData[p.suffixIndex]
|
| + }
|
| + pmap[p.propertyName] = []ds.Property{decodedProps[p.suffixIndex]}
|
| + }
|
| + if s.distinct != nil {
|
| + rawString := string(bjoin(projectedRaw...))
|
| + if _, ok := s.distinct[rawString]; ok {
|
| + return true
|
| + }
|
| + s.distinct[rawString] = struct{}{}
|
| + }
|
| + return s.cb(key, pmap, gc)
|
| +}
|
| +
|
| +type keysOnlyStrategy struct {
|
| + cb ds.RawRunCB
|
| +
|
| + dedup map[string]struct{}
|
| +}
|
| +
|
| +func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool {
|
| + strKey := string(rawData[len(rawData)-1])
|
| + if _, ok := s.dedup[strKey]; ok {
|
| + return true
|
| + }
|
| + s.dedup[strKey] = struct{}{}
|
| + return s.cb(key, nil, gc)
|
| +}
|
| +
|
| +type normalStrategy struct {
|
| + cb ds.RawRunCB
|
| +
|
| + ns string
|
| + head *memCollection
|
| + dedup map[string]struct{}
|
| +}
|
| +
|
| +func newNormalStrategy(ns string, cb ds.RawRunCB, head *memStore) queryStrategy {
|
| + coll := head.GetCollection("ents:" + ns)
|
| + if coll == nil {
|
| + return nil
|
| + }
|
| + return &normalStrategy{cb, ns, coll, map[string]struct{}{}}
|
| +}
|
| +
|
| +func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool {
|
| + rawKey := rawData[len(rawData)-1]
|
| + strKey := string(rawKey)
|
| + if _, ok := s.dedup[strKey]; ok {
|
| + return true
|
| + }
|
| + s.dedup[strKey] = struct{}{}
|
| +
|
| + rawEnt := s.head.Get(rawKey)
|
| + if rawEnt == nil {
|
| + // entity doesn't exist at head
|
| + return true
|
| + }
|
| + pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, globalAppID, s.ns)
|
| + memoryCorruption(err)
|
| +
|
| + return s.cb(key, pm, gc)
|
| +}
|
| +
|
| +func pickQueryStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB, head *memStore) queryStrategy {
|
| + if q.keysOnly {
|
| + return &keysOnlyStrategy{cb, map[string]struct{}{}}
|
| + }
|
| + if len(q.project) > 0 {
|
| + return newProjectionStrategy(q, rq, cb)
|
| + }
|
| + return newNormalStrategy(rq.ns, cb, head)
|
| +}
|
| +
|
| +func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte, count int) (raw [][]byte, decoded []ds.Property) {
|
| + buf := serialize.Invertible(bytes.NewBuffer(suffix))
|
| + decoded = make([]ds.Property, len(suffixFormat))
|
| + raw = make([][]byte, len(suffixFormat))
|
| +
|
| + err := error(nil)
|
| + for i := range decoded {
|
| + if count > 0 && i > count {
|
| + break
|
| + }
|
| + needInvert := suffixFormat[i].Direction == ds.DESCENDING
|
| +
|
| + buf.SetInvert(needInvert)
|
| + decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutContext, globalAppID, ns)
|
| + memoryCorruption(err)
|
| +
|
| + offset := len(suffix) - buf.Len()
|
| + raw[i] = suffix[:offset]
|
| + suffix = suffix[offset:]
|
| + if needInvert {
|
| + raw[i] = invert(raw[i])
|
| + }
|
| + }
|
| +
|
| + return
|
| +}
|
| +
|
| +func executeQuery(origQ ds.Query, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error {
|
| + q := origQ.(*queryImpl)
|
| +
|
| + rq, err := q.reduce(ns, isTxn)
|
| + if err == errQueryDone {
|
| + return nil
|
| + }
|
| + if err != nil {
|
| + return err
|
| + }
|
| +
|
| + idxs, err := getIndicies(rq, idx)
|
| + if err == errQueryDone {
|
| + return nil
|
| + }
|
| + if err != nil {
|
| + return err
|
| + }
|
| +
|
| + strategy := pickQueryStrategy(q, rq, cb, head)
|
| + if strategy == nil {
|
| + // e.g. the normalStrategy found that there were NO entities in the current
|
| + // namespace.
|
| + return nil
|
| + }
|
| +
|
| + offset := q.offset
|
| + limit := q.limit
|
| + hasLimit := q.limitSet && limit >= 0
|
| +
|
| + cursorPrefix := []byte(nil)
|
| + getCursorFn := func(suffix []byte) func() (ds.Cursor, error) {
|
| + return func() (ds.Cursor, error) {
|
| + if cursorPrefix == nil {
|
| + buf := &bytes.Buffer{}
|
| + _, err := cmpbin.WriteUint(buf, uint64(len(rq.suffixFormat)))
|
| + memoryCorruption(err)
|
| +
|
| + for _, col := range rq.suffixFormat {
|
| + err := serialize.WriteIndexColumn(buf, col)
|
| + memoryCorruption(err)
|
| + }
|
| + cursorPrefix = buf.Bytes()
|
| + }
|
| + // TODO(riannucci): Do we need to decrement suffix instead of increment
|
| + // if we're sorting by __key__ DESCENDING?
|
| + return queryCursor(bjoin(cursorPrefix, increment(suffix))), nil
|
| + }
|
| + }
|
| +
|
| + multiIterate(idxs, func(suffix []byte) bool {
|
| + if offset > 0 {
|
| + offset--
|
| + return true
|
| + }
|
| + if hasLimit {
|
| + if limit <= 0 {
|
| + return false
|
| + }
|
| + limit--
|
| + }
|
| +
|
| + rawData, decodedProps := parseSuffix(ns, rq.suffixFormat, suffix, -1)
|
| +
|
| + keyProp := decodedProps[len(decodedProps)-1]
|
| + if keyProp.Type() != ds.PTKey {
|
| + impossible(fmt.Errorf("decoded index row doesn't end with a Key: %#v", keyProp))
|
| + }
|
| +
|
| + return strategy.handle(
|
| + rawData, decodedProps, keyProp.Value().(ds.Key),
|
| + getCursorFn(suffix))
|
| + })
|
| +
|
| + return nil
|
| +}
|
|
|