Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(68)

Side by Side Diff: impl/memory/datastore_index.go

Issue 1285703002: Add testable interface for datastore. (Closed) Base URL: https://github.com/luci/gae.git@master
Patch Set: fixes after Raw change Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/memory/datastore_data.go ('k') | impl/memory/datastore_index_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "fmt" 9 "fmt"
10 "sort" 10 "sort"
11 11
12 ds "github.com/luci/gae/service/datastore" 12 ds "github.com/luci/gae/service/datastore"
13 "github.com/luci/gkvlite" 13 "github.com/luci/gkvlite"
14 ) 14 )
15 15
16 var indexCreationDeterministic = false 16 var indexCreationDeterministic = false
17 17
18 type qIndexSlice []*qIndex 18 type qIndexSlice []*ds.IndexDefinition
19 19
20 func (s qIndexSlice) Len() int { return len(s) } 20 func (s qIndexSlice) Len() int { return len(s) }
21 func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } 21 func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
22 func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) } 22 func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) }
23 23
24 func defaultIndicies(kind string, pmap ds.PropertyMap) []*qIndex { 24 func defaultIndicies(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition {
25 ret := make(qIndexSlice, 0, 2*len(pmap)+1) 25 ret := make(qIndexSlice, 0, 2*len(pmap)+1)
26 » ret = append(ret, &qIndex{kind, false, nil}) 26 » ret = append(ret, &ds.IndexDefinition{Kind: kind})
27 for name, pvals := range pmap { 27 for name, pvals := range pmap {
28 needsIndex := false 28 needsIndex := false
29 for _, v := range pvals { 29 for _, v := range pvals {
30 if v.IndexSetting() == ds.ShouldIndex { 30 if v.IndexSetting() == ds.ShouldIndex {
31 needsIndex = true 31 needsIndex = true
32 break 32 break
33 } 33 }
34 } 34 }
35 if !needsIndex { 35 if !needsIndex {
36 continue 36 continue
37 } 37 }
38 » » ret = append(ret, &qIndex{kind, false, []qSortBy{{name, qASC}}}) 38 » » ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.I ndexColumn{{Property: name}}})
39 » » ret = append(ret, &qIndex{kind, false, []qSortBy{{name, qDEC}}}) 39 » » ret = append(ret, &ds.IndexDefinition{Kind: kind, SortBy: []ds.I ndexColumn{{Property: name, Direction: ds.DESCENDING}}})
40 } 40 }
41 if indexCreationDeterministic { 41 if indexCreationDeterministic {
42 sort.Sort(ret) 42 sort.Sort(ret)
43 } 43 }
44 return ret 44 return ret
45 } 45 }
46 46
47 func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*qIndex ) *memStore { 47 func indexEntriesWithBuiltins(k ds.Key, pm ds.PropertyMap, complexIdxs []*ds.Ind exDefinition) *memStore {
48 sip := partiallySerialize(pm) 48 sip := partiallySerialize(pm)
49 return sip.indexEntries(k, append(defaultIndicies(k.Kind(), pm), complex Idxs...)) 49 return sip.indexEntries(k, append(defaultIndicies(k.Kind(), pm), complex Idxs...))
50 } 50 }
51 51
52 // serializedPvals is all of the serialized DSProperty values in qASC order. 52 // serializedPvals is all of the serialized DSProperty values in qASC order.
53 type serializedPvals [][]byte 53 type serializedPvals [][]byte
54 54
55 func (s serializedPvals) Len() int { return len(s) } 55 func (s serializedPvals) Len() int { return len(s) }
56 func (s serializedPvals) Swap(i, j int) { s[i], s[j] = s[j], s[i] } 56 func (s serializedPvals) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
57 func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j]) < 0 } 57 func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j]) < 0 }
(...skipping 22 matching lines...) Expand all
80 } 80 }
81 if len(newVals) > 0 { 81 if len(newVals) > 0 {
82 sort.Sort(newVals) 82 sort.Sort(newVals)
83 ret[k] = newVals 83 ret[k] = newVals
84 } 84 }
85 } 85 }
86 return 86 return
87 } 87 }
88 88
89 // indexRowGen contains enough information to generate all of the index rows whi ch 89 // indexRowGen contains enough information to generate all of the index rows whi ch
90 // correspond with a propertyList and a qIndex. 90 // correspond with a propertyList and a ds.IndexDefinition.
91 type indexRowGen struct { 91 type indexRowGen struct {
92 propVec []serializedPvals 92 propVec []serializedPvals
93 » orders []qDirection 93 » orders []ds.IndexDirection
94 } 94 }
95 95
96 // permute calls cb for each index row, in the sorted order of the rows. 96 // permute calls cb for each index row, in the sorted order of the rows.
97 func (s indexRowGen) permute(cb func([]byte)) { 97 func (s indexRowGen) permute(cb func([]byte)) {
98 iVec := make([]int, len(s.propVec)) 98 iVec := make([]int, len(s.propVec))
99 iVecLim := make([]int, len(s.propVec)) 99 iVecLim := make([]int, len(s.propVec))
100 100
101 incPos := func() bool { 101 incPos := func() bool {
102 for i := len(iVec) - 1; i >= 0; i-- { 102 for i := len(iVec) - 1; i >= 0; i-- {
103 var done bool 103 var done bool
104 var newVal int 104 var newVal int
105 » » » if s.orders[i] == qASC { 105 » » » if s.orders[i] == ds.ASCENDING {
106 newVal = (iVec[i] + 1) % iVecLim[i] 106 newVal = (iVec[i] + 1) % iVecLim[i]
107 done = newVal != 0 107 done = newVal != 0
108 } else { 108 } else {
109 newVal = (iVec[i] - 1) 109 newVal = (iVec[i] - 1)
110 if newVal < 0 { 110 if newVal < 0 {
111 newVal = iVecLim[i] - 1 111 newVal = iVecLim[i] - 1
112 } else { 112 } else {
113 done = true 113 done = true
114 } 114 }
115 } 115 }
116 iVec[i] = newVal 116 iVec[i] = newVal
117 if done { 117 if done {
118 return true 118 return true
119 } 119 }
120 } 120 }
121 return false 121 return false
122 } 122 }
123 123
124 for i, sps := range s.propVec { 124 for i, sps := range s.propVec {
125 iVecLim[i] = len(sps) 125 iVecLim[i] = len(sps)
126 } 126 }
127 127
128 for i := range iVec { 128 for i := range iVec {
129 » » if s.orders[i] == qDEC { 129 » » if s.orders[i] == ds.DESCENDING {
130 iVec[i] = iVecLim[i] - 1 130 iVec[i] = iVecLim[i] - 1
131 } 131 }
132 } 132 }
133 133
134 for { 134 for {
135 bufsiz := 0 135 bufsiz := 0
136 for pvalSliceIdx, pvalIdx := range iVec { 136 for pvalSliceIdx, pvalIdx := range iVec {
137 bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx]) 137 bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx])
138 } 138 }
139 buf := bytes.NewBuffer(make([]byte, 0, bufsiz)) 139 buf := bytes.NewBuffer(make([]byte, 0, bufsiz))
140 for pvalSliceIdx, pvalIdx := range iVec { 140 for pvalSliceIdx, pvalIdx := range iVec {
141 data := s.propVec[pvalSliceIdx][pvalIdx] 141 data := s.propVec[pvalSliceIdx][pvalIdx]
142 » » » if s.orders[pvalSliceIdx] == qASC { 142 » » » if s.orders[pvalSliceIdx] == ds.ASCENDING {
143 buf.Write(data) 143 buf.Write(data)
144 } else { 144 } else {
145 for _, b := range data { 145 for _, b := range data {
146 buf.WriteByte(b ^ 0xFF) 146 buf.WriteByte(b ^ 0xFF)
147 } 147 }
148 } 148 }
149 } 149 }
150 cb(buf.Bytes()) 150 cb(buf.Bytes())
151 if !incPos() { 151 if !incPos() {
152 break 152 break
153 } 153 }
154 } 154 }
155 } 155 }
156 156
157 type matcher struct { 157 type matcher struct {
158 buf indexRowGen 158 buf indexRowGen
159 } 159 }
160 160
161 // matcher.match checks to see if the mapped, serialized property values 161 // matcher.match checks to see if the mapped, serialized property values
162 // match the index. If they do, it returns a indexRowGen. Do not write or modify 162 // match the index. If they do, it returns a indexRowGen. Do not write or modify
163 // the data in the indexRowGen. 163 // the data in the indexRowGen.
164 func (m *matcher) match(idx *qIndex, sip serializedIndexablePmap) (indexRowGen, bool) { 164 func (m *matcher) match(idx *ds.IndexDefinition, sip serializedIndexablePmap) (i ndexRowGen, bool) {
165 m.buf.propVec = m.buf.propVec[:0] 165 m.buf.propVec = m.buf.propVec[:0]
166 m.buf.orders = m.buf.orders[:0] 166 m.buf.orders = m.buf.orders[:0]
167 » for _, sb := range idx.sortby { 167 » for _, sb := range idx.SortBy {
168 » » if pv, ok := sip[sb.prop]; ok { 168 » » if pv, ok := sip[sb.Property]; ok {
169 m.buf.propVec = append(m.buf.propVec, pv) 169 m.buf.propVec = append(m.buf.propVec, pv)
170 » » » m.buf.orders = append(m.buf.orders, sb.dir) 170 » » » m.buf.orders = append(m.buf.orders, sb.Direction)
171 } else { 171 } else {
172 return indexRowGen{}, false 172 return indexRowGen{}, false
173 } 173 }
174 } 174 }
175 return m.buf, true 175 return m.buf, true
176 } 176 }
177 177
178 func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*qIndex) *memSt ore { 178 func (sip serializedIndexablePmap) indexEntries(k ds.Key, idxs []*ds.IndexDefini tion) *memStore {
179 ret := newMemStore() 179 ret := newMemStore()
180 idxColl := ret.SetCollection("idx", nil) 180 idxColl := ret.SetCollection("idx", nil)
181 // getIdxEnts retrieves an index collection or adds it if it's not there . 181 // getIdxEnts retrieves an index collection or adds it if it's not there .
182 » getIdxEnts := func(qi *qIndex) *memCollection { 182 » getIdxEnts := func(qi *ds.IndexDefinition) *memCollection {
183 buf := &bytes.Buffer{} 183 buf := &bytes.Buffer{}
184 » » qi.WriteBinary(buf) 184 » » qi.Write(buf)
185 b := buf.Bytes() 185 b := buf.Bytes()
186 idxColl.Set(b, []byte{}) 186 idxColl.Set(b, []byte{})
187 return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(), b), nil) 187 return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(), b), nil)
188 } 188 }
189 189
190 buf := &bytes.Buffer{} 190 buf := &bytes.Buffer{}
191 ds.WriteKey(buf, ds.WithoutContext, k) 191 ds.WriteKey(buf, ds.WithoutContext, k)
192 keyData := buf.Bytes() 192 keyData := buf.Bytes()
193 193
194 walkPermutations := func(prefix []byte, irg indexRowGen, ents *memCollec tion) { 194 walkPermutations := func(prefix []byte, irg indexRowGen, ents *memCollec tion) {
195 prev := []byte{} // intentionally make a non-nil slice, gkvlite hates nil. 195 prev := []byte{} // intentionally make a non-nil slice, gkvlite hates nil.
196 irg.permute(func(data []byte) { 196 irg.permute(func(data []byte) {
197 buf := bytes.NewBuffer(make([]byte, 0, len(prefix)+len(d ata)+len(keyData))) 197 buf := bytes.NewBuffer(make([]byte, 0, len(prefix)+len(d ata)+len(keyData)))
198 buf.Write(prefix) 198 buf.Write(prefix)
199 buf.Write(data) 199 buf.Write(data)
200 buf.Write(keyData) 200 buf.Write(keyData)
201 ents.Set(buf.Bytes(), prev) 201 ents.Set(buf.Bytes(), prev)
202 prev = data 202 prev = data
203 }) 203 })
204 } 204 }
205 205
206 mtch := matcher{} 206 mtch := matcher{}
207 for _, idx := range idxs { 207 for _, idx := range idxs {
208 if irg, ok := mtch.match(idx, sip); ok { 208 if irg, ok := mtch.match(idx, sip); ok {
209 idxEnts := getIdxEnts(idx) 209 idxEnts := getIdxEnts(idx)
210 if len(irg.propVec) == 0 { 210 if len(irg.propVec) == 0 {
211 idxEnts.Set(keyData, []byte{}) // propless index , e.g. kind -> key = nil 211 idxEnts.Set(keyData, []byte{}) // propless index , e.g. kind -> key = nil
212 » » » } else if idx.ancestor { 212 » » » } else if idx.Ancestor {
213 for ancKey := k; ancKey != nil; ancKey = ancKey. Parent() { 213 for ancKey := k; ancKey != nil; ancKey = ancKey. Parent() {
214 buf := &bytes.Buffer{} 214 buf := &bytes.Buffer{}
215 ds.WriteKey(buf, ds.WithoutContext, ancK ey) 215 ds.WriteKey(buf, ds.WithoutContext, ancK ey)
216 walkPermutations(buf.Bytes(), irg, idxEn ts) 216 walkPermutations(buf.Bytes(), irg, idxEn ts)
217 } 217 }
218 } else { 218 } else {
219 walkPermutations(nil, irg, idxEnts) 219 walkPermutations(nil, irg, idxEnts)
220 } 220 }
221 } 221 }
222 } 222 }
223 223
224 return ret 224 return ret
225 } 225 }
226 226
227 func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) { 227 func getCompIdxs(idxColl *memCollection) []*ds.IndexDefinition {
228 » idxColl := store.GetCollection("idx")
229 » if idxColl == nil {
230 » » idxColl = store.SetCollection("idx", nil)
231 » }
232
233 // load all current complex query index definitions. 228 // load all current complex query index definitions.
234 » compIdx := []*qIndex{} 229 » compIdx := []*ds.IndexDefinition{}
230 » complexQueryPrefix := ds.IndexComplexQueryPrefix()
235 idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item ) bool { 231 idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item ) bool {
236 if !bytes.HasPrefix(i.Key, complexQueryPrefix) { 232 if !bytes.HasPrefix(i.Key, complexQueryPrefix) {
237 return false 233 return false
238 } 234 }
239 » » qi := &qIndex{} 235 » » qi := &ds.IndexDefinition{}
240 » » if err := qi.ReadBinary(bytes.NewBuffer(i.Key)); err != nil { 236 » » if err := qi.Read(bytes.NewBuffer(i.Key)); err != nil {
241 panic(err) // memory corruption 237 panic(err) // memory corruption
242 } 238 }
243 compIdx = append(compIdx, qi) 239 compIdx = append(compIdx, qi)
244 return true 240 return true
245 }) 241 })
242 return compIdx
243 }
246 244
247 » oldIdx := indexEntriesWithBuiltins(key, oldEnt, compIdx) 245 func getIdxColl(store *memStore) *memCollection {
246 » idxColl := store.GetCollection("idx")
247 » if idxColl == nil {
248 » » idxColl = store.SetCollection("idx", nil)
249 » }
250 » return idxColl
251 }
248 252
249 » newIdx := indexEntriesWithBuiltins(key, newEnt, compIdx) 253 func mergeIndexes(ns string, store, oldIdx, newIdx *memStore) {
250 254 » idxColl := getIdxColl(store)
251 » prefix := "idx:" + key.Namespace() + ":" 255 » prefix := "idx:" + ns + ":"
252
253 gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), fun c(k, ov, nv []byte) { 256 gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), fun c(k, ov, nv []byte) {
254 ks := prefix + string(k) 257 ks := prefix + string(k)
255 » » idxColl.Set(k, []byte{}) 258 » » if idxColl.Get(k) == nil {
259 » » » // avoids unnecessary mutation, otherwise the idx collec tion thrashes on
260 » » » // every update.
261 » » » idxColl.Set(k, []byte{})
262 » » }
256 263
257 coll := store.GetCollection(ks) 264 coll := store.GetCollection(ks)
258 if coll == nil { 265 if coll == nil {
259 coll = store.SetCollection(ks, nil) 266 coll = store.SetCollection(ks, nil)
260 } 267 }
261 oldColl := oldIdx.GetCollection(ks) 268 oldColl := oldIdx.GetCollection(ks)
262 newColl := newIdx.GetCollection(ks) 269 newColl := newIdx.GetCollection(ks)
263 270
264 switch { 271 switch {
265 case ov == nil && nv != nil: // all additions 272 case ov == nil && nv != nil: // all additions
(...skipping 14 matching lines...) Expand all
280 coll.Set(k, nv) 287 coll.Set(k, nv)
281 } 288 }
282 }) 289 })
283 default: 290 default:
284 panic("impossible") 291 panic("impossible")
285 } 292 }
286 // TODO(riannucci): remove entries from idxColl and remove index collections 293 // TODO(riannucci): remove entries from idxColl and remove index collections
287 // when there are no index entries for that index any more. 294 // when there are no index entries for that index any more.
288 }) 295 })
289 } 296 }
297
298 func addIndex(store *memStore, ns string, compIdx []*ds.IndexDefinition) {
299 store.GetCollection("ents:"+ns).VisitItemsAscend(nil, true, func(i *gkvl ite.Item) bool {
300 pm, err := rpmWoCtx(i.Val, ns)
301 if err != nil {
302 panic(err) // memory corruption
303 }
304 k, err := ds.ReadKey(bytes.NewBuffer(i.Key), ds.WithoutContext, globalAppID, ns)
305 if err != nil {
306 panic(err)
307 }
308 sip := partiallySerialize(pm)
309 mergeIndexes(ns, store, newMemStore(), sip.indexEntries(k, compI dx))
310 return true
311 })
312 }
313
314 func updateIndicies(store *memStore, key ds.Key, oldEnt, newEnt ds.PropertyMap) {
315 // load all current complex query index definitions.
316 compIdx := getCompIdxs(getIdxColl(store))
317
318 mergeIndexes(key.Namespace(), store,
319 indexEntriesWithBuiltins(key, oldEnt, compIdx),
320 indexEntriesWithBuiltins(key, newEnt, compIdx))
321 }
OLDNEW
« no previous file with comments | « impl/memory/datastore_data.go ('k') | impl/memory/datastore_index_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698