Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(815)

Unified Diff: impl/memory/datastore_query_execution.go

Issue 1302813003: impl/memory: Implement Queries (Closed) Base URL: https://github.com/luci/gae.git@add_multi_iterator
Patch Set: stringSet everywhere Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « impl/memory/datastore_query.go ('k') | impl/memory/datastore_query_execution_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: impl/memory/datastore_query_execution.go
diff --git a/impl/memory/datastore_query_execution.go b/impl/memory/datastore_query_execution.go
new file mode 100644
index 0000000000000000000000000000000000000000..869428953c93e13fc1ed470ed9a9a5eaf9dced4d
--- /dev/null
+++ b/impl/memory/datastore_query_execution.go
@@ -0,0 +1,238 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package memory
+
+import (
+ "bytes"
+ "fmt"
+
+ ds "github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/datastore/serialize"
+ "github.com/luci/luci-go/common/cmpbin"
+)
+
+type queryStrategy interface {
+ // handle applies the strategy to the embedded user callback.
+ // - rawData is the slice of encoded Properties from the index row
+ // (correctly de-inverted).
+ // - decodedProps is the slice of decoded Properties from the index row
+ // - key is the decoded Key from the index row (the last item in rawData and
+ // decodedProps)
+ // - gc is the getCursor function to be passed to the user's callback
+ handle(rawData [][]byte, decodedProps []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool
+}
+
+type projectionLookup struct {
+ suffixIndex int
+ propertyName string
+}
+
+type projectionStrategy struct {
+ cb ds.RawRunCB
+
+ project []projectionLookup
+ distinct stringSet
+}
+
+func newProjectionStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB) queryStrategy {
+ projectionLookups := make([]projectionLookup, len(q.project))
+ for i, prop := range q.project {
+ projectionLookups[i].propertyName = prop
+ lookupErr := fmt.Errorf("planning a strategy for an unfulfillable query?")
+ for j, col := range rq.suffixFormat {
+ if col.Property == prop {
+ projectionLookups[i].suffixIndex = j
+ lookupErr = nil
+ break
+ }
+ }
+ impossible(lookupErr)
+ }
+ ret := &projectionStrategy{cb: cb, project: projectionLookups}
+ if q.distinct {
+ ret.distinct = stringSet{}
+ }
+ return ret
+}
+
+func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool {
+ projectedRaw := [][]byte(nil)
+ if s.distinct != nil {
+ projectedRaw = make([][]byte, len(decodedProps))
+ }
+ pmap := make(ds.PropertyMap, len(s.project))
+ for i, p := range s.project {
+ if s.distinct != nil {
+ projectedRaw[i] = rawData[p.suffixIndex]
+ }
+ pmap[p.propertyName] = []ds.Property{decodedProps[p.suffixIndex]}
+ }
+ if s.distinct != nil {
+ if !s.distinct.add(string(bjoin(projectedRaw...))) {
+ return true
+ }
+ }
+ return s.cb(key, pmap, gc)
+}
+
+type keysOnlyStrategy struct {
+ cb ds.RawRunCB
+
+ dedup stringSet
+}
+
+func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool {
+ if !s.dedup.add(string(rawData[len(rawData)-1])) {
+ return true
+ }
+ return s.cb(key, nil, gc)
+}
+
+type normalStrategy struct {
+ cb ds.RawRunCB
+
+ ns string
+ head *memCollection
+ dedup stringSet
+}
+
+func newNormalStrategy(ns string, cb ds.RawRunCB, head *memStore) queryStrategy {
+ coll := head.GetCollection("ents:" + ns)
+ if coll == nil {
+ return nil
+ }
+ return &normalStrategy{cb, ns, coll, stringSet{}}
+}
+
+func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool {
+ rawKey := rawData[len(rawData)-1]
+ if !s.dedup.add(string(rawKey)) {
+ return true
+ }
+
+ rawEnt := s.head.Get(rawKey)
+ if rawEnt == nil {
+ // entity doesn't exist at head
+ return true
+ }
+ pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize.WithoutContext, globalAppID, s.ns)
+ memoryCorruption(err)
+
+ return s.cb(key, pm, gc)
+}
+
+func pickQueryStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB, head *memStore) queryStrategy {
+ if q.keysOnly {
+ return &keysOnlyStrategy{cb, stringSet{}}
+ }
+ if len(q.project) > 0 {
+ return newProjectionStrategy(q, rq, cb)
+ }
+ return newNormalStrategy(rq.ns, cb, head)
+}
+
+func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte, count int) (raw [][]byte, decoded []ds.Property) {
+ buf := serialize.Invertible(bytes.NewBuffer(suffix))
+ decoded = make([]ds.Property, len(suffixFormat))
+ raw = make([][]byte, len(suffixFormat))
+
+ err := error(nil)
+ for i := range decoded {
+ if count > 0 && i > count {
+ break
+ }
+ needInvert := suffixFormat[i].Direction == ds.DESCENDING
+
+ buf.SetInvert(needInvert)
+ decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutContext, globalAppID, ns)
+ memoryCorruption(err)
+
+ offset := len(suffix) - buf.Len()
+ raw[i] = suffix[:offset]
+ suffix = suffix[offset:]
+ if needInvert {
+ raw[i] = invert(raw[i])
+ }
+ }
+
+ return
+}
+
+func executeQuery(origQ ds.Query, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error {
+ q := origQ.(*queryImpl)
+
+ rq, err := q.reduce(ns, isTxn)
+ if err == errQueryDone {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+
+ idxs, err := getIndexes(rq, idx)
+ if err == errQueryDone {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+
+ strategy := pickQueryStrategy(q, rq, cb, head)
+ if strategy == nil {
+ // e.g. the normalStrategy found that there were NO entities in the current
+ // namespace.
+ return nil
+ }
+
+ offset := q.offset
+ limit := q.limit
+ hasLimit := q.limitSet && limit >= 0
+
+ cursorPrefix := []byte(nil)
+ getCursorFn := func(suffix []byte) func() (ds.Cursor, error) {
+ return func() (ds.Cursor, error) {
+ if cursorPrefix == nil {
+ buf := &bytes.Buffer{}
+ _, err := cmpbin.WriteUint(buf, uint64(len(rq.suffixFormat)))
+ memoryCorruption(err)
+
+ for _, col := range rq.suffixFormat {
+ err := serialize.WriteIndexColumn(buf, col)
+ memoryCorruption(err)
+ }
+ cursorPrefix = buf.Bytes()
+ }
+ // TODO(riannucci): Do we need to decrement suffix instead of increment
+ // if we're sorting by __key__ DESCENDING?
+ return queryCursor(bjoin(cursorPrefix, increment(suffix))), nil
+ }
+ }
+
+ multiIterate(idxs, func(suffix []byte) bool {
+ if offset > 0 {
+ offset--
+ return true
+ }
+ if hasLimit {
+ if limit <= 0 {
+ return false
+ }
+ limit--
+ }
+
+ rawData, decodedProps := parseSuffix(ns, rq.suffixFormat, suffix, -1)
+
+ keyProp := decodedProps[len(decodedProps)-1]
+ if keyProp.Type() != ds.PTKey {
+ impossible(fmt.Errorf("decoded index row doesn't end with a Key: %#v", keyProp))
+ }
+
+ return strategy.handle(
+ rawData, decodedProps, keyProp.Value().(ds.Key),
+ getCursorFn(suffix))
+ })
+
+ return nil
+}
« no previous file with comments | « impl/memory/datastore_query.go ('k') | impl/memory/datastore_query_execution_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698