OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "fmt" | 9 "fmt" |
10 "sort" | 10 "sort" |
11 | 11 |
12 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
13 "github.com/luci/gae/service/datastore/serialize" | 13 "github.com/luci/gae/service/datastore/serialize" |
14 "github.com/luci/gkvlite" | |
15 ) | 14 ) |
16 | 15 |
17 type qIndexSlice []*ds.IndexDefinition | 16 type qIndexSlice []*ds.IndexDefinition |
18 | 17 |
19 func (s qIndexSlice) Len() int { return len(s) } | 18 func (s qIndexSlice) Len() int { return len(s) } |
20 func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } | 19 func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } |
21 func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) } | 20 func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) } |
22 | 21 |
23 func defaultIndexes(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition { | 22 func defaultIndexes(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition { |
24 ret := make(qIndexSlice, 0, 2*len(pmap)+1) | 23 ret := make(qIndexSlice, 0, 2*len(pmap)+1) |
(...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
176 } | 175 } |
177 itrDef := iterDefinition{c: idxColl} | 176 itrDef := iterDefinition{c: idxColl} |
178 | 177 |
179 if endsWith != nil { | 178 if endsWith != nil { |
180 full := serialize.ToBytes(*endsWith.Flip()) | 179 full := serialize.ToBytes(*endsWith.Flip()) |
181 // chop off the null terminating byte | 180 // chop off the null terminating byte |
182 itrDef.prefix = full[:len(full)-1] | 181 itrDef.prefix = full[:len(full)-1] |
183 } | 182 } |
184 | 183 |
185 it := itrDef.mkIter() | 184 it := itrDef.mkIter() |
186 » defer it.stop() | 185 » for ent := it.next(); ent != nil; ent = it.next() { |
187 » for !it.stopped { | 186 » » qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(ent.key
)) |
188 » » it.next(nil, func(i *gkvlite.Item) { | 187 » » memoryCorruption(err) |
189 » » » if i == nil { | 188 » » if !cb(qi.Flip()) { |
190 » » » » return | 189 » » » break |
191 » » » } | 190 » » } |
192 » » » qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer
(i.Key)) | |
193 » » » memoryCorruption(err) | |
194 » » » if !cb(qi.Flip()) { | |
195 » » » » it.stop() | |
196 » » » } | |
197 » » }) | |
198 } | 191 } |
199 } | 192 } |
200 | 193 |
201 func mergeIndexes(ns string, store, oldIdx, newIdx memStore) { | 194 func mergeIndexes(ns string, store, oldIdx, newIdx memStore) { |
202 prefixBuf := []byte("idx:" + ns + ":") | 195 prefixBuf := []byte("idx:" + ns + ":") |
203 origPrefixBufLen := len(prefixBuf) | 196 origPrefixBufLen := len(prefixBuf) |
204 | 197 |
205 oldIdx = oldIdx.Snapshot() | 198 oldIdx = oldIdx.Snapshot() |
206 newIdx = newIdx.Snapshot() | 199 newIdx = newIdx.Snapshot() |
207 | 200 |
208 » gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), fun
c(k, ov, nv []byte) { | 201 » memStoreCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx")
, func(k, ov, nv []byte) { |
209 prefixBuf = append(prefixBuf[:origPrefixBufLen], k...) | 202 prefixBuf = append(prefixBuf[:origPrefixBufLen], k...) |
210 ks := string(prefixBuf) | 203 ks := string(prefixBuf) |
211 | 204 |
212 coll := store.GetOrCreateCollection(ks) | 205 coll := store.GetOrCreateCollection(ks) |
213 | 206 |
214 oldColl := oldIdx.GetCollection(ks) | 207 oldColl := oldIdx.GetCollection(ks) |
215 newColl := newIdx.GetCollection(ks) | 208 newColl := newIdx.GetCollection(ks) |
216 | 209 |
217 switch { | 210 switch { |
218 case ov == nil && nv != nil: // all additions | 211 case ov == nil && nv != nil: // all additions |
219 » » » newColl.VisitItemsAscend(nil, false, func(i *gkvlite.Ite
m) bool { | 212 » » » newColl.ForEachItem(func(k, _ []byte) bool { |
220 » » » » coll.Set(i.Key, []byte{}) | 213 » » » » coll.Set(k, []byte{}) |
221 return true | 214 return true |
222 }) | 215 }) |
223 case ov != nil && nv == nil: // all deletions | 216 case ov != nil && nv == nil: // all deletions |
224 » » » oldColl.VisitItemsAscend(nil, false, func(i *gkvlite.Ite
m) bool { | 217 » » » oldColl.ForEachItem(func(k, _ []byte) bool { |
225 » » » » coll.Delete(i.Key) | 218 » » » » coll.Delete(k) |
226 return true | 219 return true |
227 }) | 220 }) |
228 case ov != nil && nv != nil: // merge | 221 case ov != nil && nv != nil: // merge |
229 » » » gkvCollide(oldColl, newColl, func(k, ov, nv []byte) { | 222 » » » memStoreCollide(oldColl, newColl, func(k, ov, nv []byte)
{ |
230 if nv == nil { | 223 if nv == nil { |
231 coll.Delete(k) | 224 coll.Delete(k) |
232 } else { | 225 } else { |
233 coll.Set(k, []byte{}) | 226 coll.Set(k, []byte{}) |
234 } | 227 } |
235 }) | 228 }) |
236 default: | 229 default: |
237 » » » impossible(fmt.Errorf("both values from gkvCollide were
nil?")) | 230 » » » impossible(fmt.Errorf("both values from memStoreCollide
were nil?")) |
238 } | 231 } |
239 // TODO(riannucci): remove entries from idxColl and remove index
collections | 232 // TODO(riannucci): remove entries from idxColl and remove index
collections |
240 // when there are no index entries for that index any more. | 233 // when there are no index entries for that index any more. |
241 }) | 234 }) |
242 } | 235 } |
243 | 236 |
244 func addIndexes(store memStore, aid string, compIdx []*ds.IndexDefinition) { | 237 func addIndexes(store memStore, aid string, compIdx []*ds.IndexDefinition) { |
245 normalized := make([]*ds.IndexDefinition, len(compIdx)) | 238 normalized := make([]*ds.IndexDefinition, len(compIdx)) |
246 idxColl := store.GetOrCreateCollection("idx") | 239 idxColl := store.GetOrCreateCollection("idx") |
247 for i, idx := range compIdx { | 240 for i, idx := range compIdx { |
248 normalized[i] = idx.Normalize() | 241 normalized[i] = idx.Normalize() |
249 idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()),
[]byte{}) | 242 idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()),
[]byte{}) |
250 } | 243 } |
251 | 244 |
252 for _, ns := range namespaces(store) { | 245 for _, ns := range namespaces(store) { |
| 246 kctx := ds.MkKeyContext(aid, ns) |
253 if allEnts := store.Snapshot().GetCollection("ents:" + ns); allE
nts != nil { | 247 if allEnts := store.Snapshot().GetCollection("ents:" + ns); allE
nts != nil { |
254 » » » allEnts.VisitItemsAscend(nil, true, func(i *gkvlite.Item
) bool { | 248 » » » allEnts.ForEachItem(func(ik, iv []byte) bool { |
255 » » » » pm, err := rpm(i.Val) | 249 » » » » pm, err := rpm(iv) |
256 memoryCorruption(err) | 250 memoryCorruption(err) |
257 | 251 |
258 » » » » prop, err := serialize.ReadProperty(bytes.NewBuf
fer(i.Key), serialize.WithoutContext, ds.MkKeyContext(aid, ns)) | 252 » » » » prop, err := serialize.ReadProperty(bytes.NewBuf
fer(ik), serialize.WithoutContext, kctx) |
259 memoryCorruption(err) | 253 memoryCorruption(err) |
260 | 254 |
261 k := prop.Value().(*ds.Key) | 255 k := prop.Value().(*ds.Key) |
262 | 256 |
263 sip := serialize.PropertyMapPartially(k, pm) | 257 sip := serialize.PropertyMapPartially(k, pm) |
264 | 258 |
265 mergeIndexes(ns, store, | 259 mergeIndexes(ns, store, |
266 newMemStore(), | 260 newMemStore(), |
267 indexEntries(k, sip, normalized)) | 261 indexEntries(k, sip, normalized)) |
268 return true | 262 return true |
(...skipping 12 matching lines...) Expand all Loading... |
281 var compIdx []*ds.IndexDefinition | 275 var compIdx []*ds.IndexDefinition |
282 walkCompIdxs(store.Snapshot(), nil, func(i *ds.IndexDefinition) bool { | 276 walkCompIdxs(store.Snapshot(), nil, func(i *ds.IndexDefinition) bool { |
283 compIdx = append(compIdx, i) | 277 compIdx = append(compIdx, i) |
284 return true | 278 return true |
285 }) | 279 }) |
286 | 280 |
287 mergeIndexes(key.Namespace(), store, | 281 mergeIndexes(key.Namespace(), store, |
288 indexEntriesWithBuiltins(key, oldEnt, compIdx), | 282 indexEntriesWithBuiltins(key, oldEnt, compIdx), |
289 indexEntriesWithBuiltins(key, newEnt, compIdx)) | 283 indexEntriesWithBuiltins(key, newEnt, compIdx)) |
290 } | 284 } |
OLD | NEW |