Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(751)

Side by Side Diff: impl/memory/datastore_query_execution.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: make data flow clearer, implement Count Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 10
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
89 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key , gc func() (ds.Cursor, error)) bool { 89 func (s *keysOnlyStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key , gc func() (ds.Cursor, error)) bool {
90 if !s.dedup.Add(string(rawData[len(rawData)-1])) { 90 if !s.dedup.Add(string(rawData[len(rawData)-1])) {
91 return true 91 return true
92 } 92 }
93 return s.cb(key, nil, gc) 93 return s.cb(key, nil, gc)
94 } 94 }
95 95
96 type normalStrategy struct { 96 type normalStrategy struct {
97 cb ds.RawRunCB 97 cb ds.RawRunCB
98 98
99 aid string
99 ns string 100 ns string
100 head *memCollection 101 head *memCollection
101 dedup stringset.Set 102 dedup stringset.Set
102 } 103 }
103 104
104 func newNormalStrategy(ns string, cb ds.RawRunCB, head *memStore) queryStrategy { 105 func newNormalStrategy(aid, ns string, cb ds.RawRunCB, head *memStore) queryStra tegy {
105 coll := head.GetCollection("ents:" + ns) 106 coll := head.GetCollection("ents:" + ns)
106 if coll == nil { 107 if coll == nil {
107 return nil 108 return nil
108 } 109 }
109 » return &normalStrategy{cb, ns, coll, stringset.New(0)} 110 » return &normalStrategy{cb, aid, ns, coll, stringset.New(0)}
110 } 111 }
111 112
112 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) bool { 113 func (s *normalStrategy) handle(rawData [][]byte, _ []ds.Property, key *ds.Key, gc func() (ds.Cursor, error)) bool {
113 rawKey := rawData[len(rawData)-1] 114 rawKey := rawData[len(rawData)-1]
114 if !s.dedup.Add(string(rawKey)) { 115 if !s.dedup.Add(string(rawKey)) {
115 return true 116 return true
116 } 117 }
117 118
118 rawEnt := s.head.Get(rawKey) 119 rawEnt := s.head.Get(rawKey)
119 if rawEnt == nil { 120 if rawEnt == nil {
120 // entity doesn't exist at head 121 // entity doesn't exist at head
121 return true 122 return true
122 } 123 }
123 » pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize. WithoutContext, globalAppID, s.ns) 124 » pm, err := serialize.ReadPropertyMap(bytes.NewBuffer(rawEnt), serialize. WithoutContext, s.aid, s.ns)
124 memoryCorruption(err) 125 memoryCorruption(err)
125 126
126 return s.cb(key, pm, gc) 127 return s.cb(key, pm, gc)
127 } 128 }
128 129
129 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, head *memStore) queryStrategy { 130 func pickQueryStrategy(fq *ds.FinalizedQuery, rq *reducedQuery, cb ds.RawRunCB, head *memStore) queryStrategy {
130 if fq.KeysOnly() { 131 if fq.KeysOnly() {
131 return &keysOnlyStrategy{cb, stringset.New(0)} 132 return &keysOnlyStrategy{cb, stringset.New(0)}
132 } 133 }
133 if len(fq.Project()) > 0 { 134 if len(fq.Project()) > 0 {
134 return newProjectionStrategy(fq, rq, cb) 135 return newProjectionStrategy(fq, rq, cb)
135 } 136 }
136 » return newNormalStrategy(rq.ns, cb, head) 137 » return newNormalStrategy(rq.aid, rq.ns, cb, head)
137 } 138 }
138 139
139 func parseSuffix(ns string, suffixFormat []ds.IndexColumn, suffix []byte, count int) (raw [][]byte, decoded []ds.Property) { 140 func parseSuffix(aid, ns string, suffixFormat []ds.IndexColumn, suffix []byte, c ount int) (raw [][]byte, decoded []ds.Property) {
140 buf := serialize.Invertible(bytes.NewBuffer(suffix)) 141 buf := serialize.Invertible(bytes.NewBuffer(suffix))
141 decoded = make([]ds.Property, len(suffixFormat)) 142 decoded = make([]ds.Property, len(suffixFormat))
142 raw = make([][]byte, len(suffixFormat)) 143 raw = make([][]byte, len(suffixFormat))
143 144
144 err := error(nil) 145 err := error(nil)
145 for i := range decoded { 146 for i := range decoded {
146 » » if count > 0 && i > count { 147 » » if count >= 0 && i >= count {
iannucci 2015/09/29 04:43:27 This was another bug, added a test for it.
147 break 148 break
148 } 149 }
149 needInvert := suffixFormat[i].Descending 150 needInvert := suffixFormat[i].Descending
150 151
151 buf.SetInvert(needInvert) 152 buf.SetInvert(needInvert)
152 » » decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutC ontext, globalAppID, ns) 153 » » decoded[i], err = serialize.ReadProperty(buf, serialize.WithoutC ontext, aid, ns)
153 memoryCorruption(err) 154 memoryCorruption(err)
154 155
155 offset := len(suffix) - buf.Len() 156 offset := len(suffix) - buf.Len()
156 raw[i] = suffix[:offset] 157 raw[i] = suffix[:offset]
157 suffix = suffix[offset:] 158 suffix = suffix[offset:]
158 if needInvert { 159 if needInvert {
159 raw[i] = serialize.Invert(raw[i]) 160 raw[i] = serialize.Invert(raw[i])
160 } 161 }
161 } 162 }
162 163
163 return 164 return
164 } 165 }
165 166
166 func countQuery(fq *ds.FinalizedQuery, ns string, isTxn bool, idx, head *memStor e) (ret int64, err error) { 167 func countQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head *me mStore) (ret int64, err error) {
167 if len(fq.Project()) == 0 && !fq.KeysOnly() { 168 if len(fq.Project()) == 0 && !fq.KeysOnly() {
168 fq, err = fq.Original().KeysOnly(true).Finalize() 169 fq, err = fq.Original().KeysOnly(true).Finalize()
169 if err != nil { 170 if err != nil {
170 return 171 return
171 } 172 }
172 } 173 }
173 » err = executeQuery(fq, ns, isTxn, idx, head, func(_ *ds.Key, _ ds.Proper tyMap, _ ds.CursorCB) bool { 174 » err = executeQuery(fq, aid, ns, isTxn, idx, head, func(_ *ds.Key, _ ds.P ropertyMap, _ ds.CursorCB) bool {
174 ret++ 175 ret++
175 return true 176 return true
176 }) 177 })
177 return 178 return
178 } 179 }
179 180
180 func executeQuery(fq *ds.FinalizedQuery, ns string, isTxn bool, idx, head *memSt ore, cb ds.RawRunCB) error { 181 func executeQuery(fq *ds.FinalizedQuery, aid, ns string, isTxn bool, idx, head * memStore, cb ds.RawRunCB) error {
181 » rq, err := reduce(fq, ns, isTxn) 182 » rq, err := reduce(fq, aid, ns, isTxn)
182 if err == ds.ErrNullQuery { 183 if err == ds.ErrNullQuery {
183 return nil 184 return nil
184 } 185 }
185 if err != nil { 186 if err != nil {
186 return err 187 return err
187 } 188 }
188 189
189 idxs, err := getIndexes(rq, idx) 190 idxs, err := getIndexes(rq, idx)
190 if err == ds.ErrNullQuery { 191 if err == ds.ErrNullQuery {
191 return nil 192 return nil
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
229 offset-- 230 offset--
230 return true 231 return true
231 } 232 }
232 if hasLimit { 233 if hasLimit {
233 if limit <= 0 { 234 if limit <= 0 {
234 return false 235 return false
235 } 236 }
236 limit-- 237 limit--
237 } 238 }
238 239
239 » » rawData, decodedProps := parseSuffix(ns, rq.suffixFormat, suffix , -1) 240 » » rawData, decodedProps := parseSuffix(aid, ns, rq.suffixFormat, s uffix, -1)
240 241
241 keyProp := decodedProps[len(decodedProps)-1] 242 keyProp := decodedProps[len(decodedProps)-1]
242 if keyProp.Type() != ds.PTKey { 243 if keyProp.Type() != ds.PTKey {
243 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", keyProp)) 244 impossible(fmt.Errorf("decoded index row doesn't end wit h a Key: %#v", keyProp))
244 } 245 }
245 246
246 return strategy.handle( 247 return strategy.handle(
247 rawData, decodedProps, keyProp.Value().(*ds.Key), 248 rawData, decodedProps, keyProp.Value().(*ds.Key),
248 getCursorFn(suffix)) 249 getCursorFn(suffix))
249 }) 250 })
250 251
251 return nil 252 return nil
252 } 253 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698