Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1856)

Side by Side Diff: impl/memory/datastore_query_execution.go

Issue 1355783002: Refactor keys and queries in datastore service and implementation. (Closed) Base URL: https://github.com/luci/gae.git@master
Patch Set: appease errcheck Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/datastore_query.go ('k') | impl/memory/datastore_query_execution_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 10
11 ds "github.com/luci/gae/service/datastore" 11 ds "github.com/luci/gae/service/datastore"
12 "github.com/luci/gae/service/datastore/serialize" 12 "github.com/luci/gae/service/datastore/serialize"
13 "github.com/luci/luci-go/common/cmpbin" 13 "github.com/luci/luci-go/common/cmpbin"
14 "github.com/luci/luci-go/common/stringset" 14 "github.com/luci/luci-go/common/stringset"
15 ) 15 )
16 16
17 type queryStrategy interface { 17 type queryStrategy interface {
18 // handle applies the strategy to the embedded user callback. 18 // handle applies the strategy to the embedded user callback.
19 // - rawData is the slice of encoded Properties from the index row 19 // - rawData is the slice of encoded Properties from the index row
20 // (correctly de-inverted). 20 // (correctly de-inverted).
21 // - decodedProps is the slice of decoded Properties from the index ro w 21 // - decodedProps is the slice of decoded Properties from the index ro w
22 // - key is the decoded Key from the index row (the last item in rawDa ta and 22 // - key is the decoded Key from the index row (the last item in rawDa ta and
23 // decodedProps) 23 // decodedProps)
24 // - gc is the getCursor function to be passed to the user's callback 24 // - gc is the getCursor function to be passed to the user's callback
25 » handle(rawData [][]byte, decodedProps []ds.Property, key ds.Key, gc func () (ds.Cursor, error)) bool 25 » handle(rawData [][]byte, decodedProps []ds.Property, key *ds.Key, gc fun c() (ds.Cursor, error)) bool
26 } 26 }
27 27
28 type projectionLookup struct { 28 type projectionLookup struct {
29 suffixIndex int 29 suffixIndex int
30 propertyName string 30 propertyName string
31 } 31 }
32 32
33 type projectionStrategy struct { 33 type projectionStrategy struct {
34 cb ds.RawRunCB 34 cb ds.RawRunCB
35 35
36 project []projectionLookup 36 project []projectionLookup
37 distinct stringset.Set 37 distinct stringset.Set
38 } 38 }
39 39
40 func newProjectionStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB) query Strategy { 40 func newProjectionStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRun CB) queryStrategy {
41 » projectionLookups := make([]projectionLookup, len(q.project)) 41 » proj := fq.Project()
42 » for i, prop := range q.project { 42
43 » projectionLookups := make([]projectionLookup, len(proj))
44 » for i, prop := range proj {
43 projectionLookups[i].propertyName = prop 45 projectionLookups[i].propertyName = prop
44 lookupErr := fmt.Errorf("planning a strategy for an unfulfillabl e query?") 46 lookupErr := fmt.Errorf("planning a strategy for an unfulfillabl e query?")
45 for j, col := range rq.suffixFormat { 47 for j, col := range rq.suffixFormat {
46 if col.Property == prop { 48 if col.Property == prop {
47 projectionLookups[i].suffixIndex = j 49 projectionLookups[i].suffixIndex = j
48 lookupErr = nil 50 lookupErr = nil
49 break 51 break
50 } 52 }
51 } 53 }
52 impossible(lookupErr) 54 impossible(lookupErr)
53 } 55 }
54 ret := &projectionStrategy{cb: cb, project: projectionLookups} 56 ret := &projectionStrategy{cb: cb, project: projectionLookups}
55 » if q.distinct { 57 » if fq.Distinct() {
56 ret.distinct = stringset.New(0) 58 ret.distinct = stringset.New(0)
57 } 59 }
58 return ret 60 return ret
59 } 61 }
60 62
61 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property , key ds.Key, gc func() (ds.Cursor, error)) bool { 63 func (s *projectionStrategy) handle(rawData [][]byte, decodedProps []ds.Property , key *ds.Key, gc func() (ds.Cursor, error)) bool {
62 projectedRaw := [][]byte(nil) 64 projectedRaw := [][]byte(nil)
63 if s.distinct != nil { 65 if s.distinct != nil {
64 projectedRaw = make([][]byte, len(decodedProps)) 66 projectedRaw = make([][]byte, len(decodedProps))
65 } 67 }
66 pmap := make(ds.PropertyMap, len(s.project)) 68 pmap := make(ds.PropertyMap, len(s.project))
67 for i, p := range s.project { 69 for i, p := range s.project {
68 if s.distinct != nil { 70 if s.distinct != nil {
69 projectedRaw[i] = rawData[p.suffixIndex] 71 projectedRaw[i] = rawData[p.suffixIndex]
70 } 72 }
71 pmap[p.propertyName] = []ds.Property{decodedProps[p.suffixIndex] } 73 pmap[p.propertyName] = []ds.Property{decodedProps[p.suffixIndex] }
72 } 74 }
73 if s.distinct != nil { 75 if s.distinct != nil {
74 if !s.distinct.Add(string(bjoin(projectedRaw...))) { 76 if !s.distinct.Add(string(bjoin(projectedRaw...))) {
75 return true 77 return true
76 } 78 }
77 } 79 }
78 return s.cb(key, pmap, gc) 80 return s.cb(key, pmap, gc)
79 } 81 }
80 82
81 type keysOnlyStrategy struct { 83 type keysOnlyStrategy struct {
82 cb ds.RawRunCB 84 cb ds.RawRunCB
83 85
84 dedup stringset.Set 86 dedup stringset.Set
85 } 87 }
86 88
87 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, gc func() (ds.Cursor, error)) bool { 89 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key , gc func() (ds.Cursor, error)) bool {
88 if !s.dedup.Add(string(rawData[len(rawData)-1])) { 90 if !s.dedup.Add(string(rawData[len(rawData)-1])) {
89 return true 91 return true
90 } 92 }
91 return s.cb(key, nil, gc) 93 return s.cb(key, nil, gc)
92 } 94 }
93 95
94 type normalStrategy struct { 96 type normalStrategy struct {
95 cb ds.RawRunCB 97 cb ds.RawRunCB
96 98
97 ns string 99 ns string
98 head *memCollection 100 head *memCollection
99 dedup stringset.Set 101 dedup stringset.Set
100 } 102 }
101 103
102 func newNormalStrategy(ns string, cb ds.RawRunCB, head *memStore) queryStrategy { 104 func newNormalStrategy(ns string, cb ds.RawRunCB, head *memStore) queryStrategy {
103 coll := head.GetCollection("ents:" + ns) 105 coll := head.GetCollection("ents:" + ns)
104 if coll == nil { 106 if coll == nil {
105 return nil 107 return nil
106 } 108 }
107 return &normalStrategy{cb, ns, coll, stringset.New(0)} 109 return &normalStrategy{cb, ns, coll, stringset.New(0)}
108 } 110 }
109 111
110 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key ds.Key, g c func() (ds.Cursor, error)) bool { 112 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) bool {
111 rawKey := rawData[len(rawData)-1] 113 rawKey := rawData[len(rawData)-1]
112 if !s.dedup.Add(string(rawKey)) { 114 if !s.dedup.Add(string(rawKey)) {
113 return true 115 return true
114 } 116 }
115 117
116 rawEnt := s.head.Get(rawKey) 118 rawEnt := s.head.Get(rawKey)
117 if rawEnt == nil { 119 if rawEnt == nil {
118 // entity doesn't exist at head 120 // entity doesn't exist at head
119 return true 121 return true
120 } 122 }
121 pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize. WithoutContext, globalAppID, s.ns) 123 pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize. WithoutContext, globalAppID, s.ns)
122 memoryCorruption(err) 124 memoryCorruption(err)
123 125
124 return s.cb(key, pm, gc) 126 return s.cb(key, pm, gc)
125 } 127 }
126 128
127 func pickQueryStrategy(q *queryImpl, rq *reducedQuery, cb ds.RawRunCB, head *mem Store) queryStrategy { 129 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, head *memStore) queryStrategy {
128 » if q.keysOnly { 130 » if fq.KeysOnly() {
129 return &keysOnlyStrategy{cb, stringset.New(0)} 131 return &keysOnlyStrategy{cb, stringset.New(0)}
130 } 132 }
131 » if len(q.project) > 0 { 133 » if len(fq.Project()) > 0 {
132 » » return newProjectionStrategy(q, rq, cb) 134 » » return newProjectionStrategy(fq, rq, cb)
133 } 135 }
134 return newNormalStrategy(rq.ns, cb, head) 136 return newNormalStrategy(rq.ns, cb, head)
135 } 137 }
136 138
137 func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte, count int) (raw [][]byte, decoded []ds.Property) { 139 func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte, count int) (raw [][]byte, decoded []ds.Property) {
138 buf := serialize.Invertible(bytes.NewBuffer(suffix)) 140 buf := serialize.Invertible(bytes.NewBuffer(suffix))
139 decoded = make([]ds.Property, len(suffixFormat)) 141 decoded = make([]ds.Property, len(suffixFormat))
140 raw = make([][]byte, len(suffixFormat)) 142 raw = make([][]byte, len(suffixFormat))
141 143
142 err := error(nil) 144 err := error(nil)
143 for i := range decoded { 145 for i := range decoded {
144 if count > 0 && i > count { 146 if count > 0 && i > count {
145 break 147 break
146 } 148 }
147 » » needInvert := suffixFormat[i].Direction == ds.DESCENDING 149 » » needInvert := suffixFormat[i].Descending
148 150
149 buf.SetInvert(needInvert) 151 buf.SetInvert(needInvert)
150 decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutC ontext, globalAppID, ns) 152 decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutC ontext, globalAppID, ns)
151 memoryCorruption(err) 153 memoryCorruption(err)
152 154
153 offset := len(suffix) - buf.Len() 155 offset := len(suffix) - buf.Len()
154 raw[i] = suffix[:offset] 156 raw[i] = suffix[:offset]
155 suffix = suffix[offset:] 157 suffix = suffix[offset:]
156 if needInvert { 158 if needInvert {
157 raw[i] = invert(raw[i]) 159 raw[i] = invert(raw[i])
158 } 160 }
159 } 161 }
160 162
161 return 163 return
162 } 164 }
163 165
164 func executeQuery(origQ ds.Query, ns string, isTxn bool, idx, head *memStore, cb ds.RawRunCB) error { 166 func executeQuery(fq *ds.FinalizedQuery, ns string, isTxn bool, idx, head *memSt ore, cb ds.RawRunCB) error {
165 » q := origQ.(*queryImpl) 167 » rq, err := reduce(fq, ns, isTxn)
166 168 » if err == ds.ErrNullQuery {
167 » rq, err := q.reduce(ns, isTxn)
168 » if err == errQueryDone {
169 return nil 169 return nil
170 } 170 }
171 if err != nil { 171 if err != nil {
172 return err 172 return err
173 } 173 }
174 174
175 idxs, err := getIndexes(rq, idx) 175 idxs, err := getIndexes(rq, idx)
176 » if err == errQueryDone { 176 » if err == ds.ErrNullQuery {
177 return nil 177 return nil
178 } 178 }
179 if err != nil { 179 if err != nil {
180 return err 180 return err
181 } 181 }
182 182
183 » strategy := pickQueryStrategy(q, rq, cb, head) 183 » strategy := pickQueryStrategy(fq, rq, cb, head)
184 if strategy == nil { 184 if strategy == nil {
185 // e.g. the normalStrategy found that there were NO entities in the current 185 // e.g. the normalStrategy found that there were NO entities in the current
186 // namespace. 186 // namespace.
187 return nil 187 return nil
188 } 188 }
189 189
190 » offset := q.offset 190 » offset, _ := fq.Offset()
191 » limit := q.limit 191 » limit, hasLimit := fq.Limit()
192 » hasLimit := q.limitSet && limit >= 0
193 192
194 cursorPrefix := []byte(nil) 193 cursorPrefix := []byte(nil)
195 getCursorFn := func(suffix []byte) func() (ds.Cursor, error) { 194 getCursorFn := func(suffix []byte) func() (ds.Cursor, error) {
196 return func() (ds.Cursor, error) { 195 return func() (ds.Cursor, error) {
197 if cursorPrefix == nil { 196 if cursorPrefix == nil {
198 buf := &bytes.Buffer{} 197 buf := &bytes.Buffer{}
199 _, err := cmpbin.WriteUint(buf, uint64(len(rq.su ffixFormat))) 198 _, err := cmpbin.WriteUint(buf, uint64(len(rq.su ffixFormat)))
200 memoryCorruption(err) 199 memoryCorruption(err)
201 200
202 for _, col := range rq.suffixFormat { 201 for _, col := range rq.suffixFormat {
(...skipping 21 matching lines...) Expand all
224 } 223 }
225 224
226 rawData, decodedProps := parseSuffix(ns, rq.suffixFormat, suffix , -1) 225 rawData, decodedProps := parseSuffix(ns, rq.suffixFormat, suffix , -1)
227 226
228 keyProp := decodedProps[len(decodedProps)-1] 227 keyProp := decodedProps[len(decodedProps)-1]
229 if keyProp.Type() != ds.PTKey { 228 if keyProp.Type() != ds.PTKey {
230 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", keyProp)) 229 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", keyProp))
231 } 230 }
232 231
233 return strategy.handle( 232 return strategy.handle(
234 » » » rawData, decodedProps, keyProp.Value().(ds.Key), 233 » » » rawData, decodedProps, keyProp.Value().(*ds.Key),
235 getCursorFn(suffix)) 234 getCursorFn(suffix))
236 }) 235 })
237 236
238 return nil 237 return nil
239 } 238 }
OLDNEW
« no previous file with comments | « impl/memory/datastore_query.go ('k') | impl/memory/datastore_query_execution_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698