| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "fmt" | 9 "fmt" |
| 10 "sort" | 10 "sort" |
| 11 | 11 |
| 12 ds "github.com/luci/gae/service/datastore" | 12 ds "github.com/luci/gae/service/datastore" |
| 13 "github.com/luci/gae/service/datastore/serialize" | 13 "github.com/luci/gae/service/datastore/serialize" |
| 14 "github.com/luci/gkvlite" | |
| 15 ) | 14 ) |
| 16 | 15 |
| 17 type qIndexSlice []*ds.IndexDefinition | 16 type qIndexSlice []*ds.IndexDefinition |
| 18 | 17 |
| 19 func (s qIndexSlice) Len() int { return len(s) } | 18 func (s qIndexSlice) Len() int { return len(s) } |
| 20 func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } | 19 func (s qIndexSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } |
| 21 func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) } | 20 func (s qIndexSlice) Less(i, j int) bool { return s[i].Less(s[j]) } |
| 22 | 21 |
| 23 func defaultIndexes(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition { | 22 func defaultIndexes(kind string, pmap ds.PropertyMap) []*ds.IndexDefinition { |
| 24 ret := make(qIndexSlice, 0, 2*len(pmap)+1) | 23 ret := make(qIndexSlice, 0, 2*len(pmap)+1) |
| (...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 176 } | 175 } |
| 177 itrDef := iterDefinition{c: idxColl} | 176 itrDef := iterDefinition{c: idxColl} |
| 178 | 177 |
| 179 if endsWith != nil { | 178 if endsWith != nil { |
| 180 full := serialize.ToBytes(*endsWith.Flip()) | 179 full := serialize.ToBytes(*endsWith.Flip()) |
| 181 // chop off the null terminating byte | 180 // chop off the null terminating byte |
| 182 itrDef.prefix = full[:len(full)-1] | 181 itrDef.prefix = full[:len(full)-1] |
| 183 } | 182 } |
| 184 | 183 |
| 185 it := itrDef.mkIter() | 184 it := itrDef.mkIter() |
| 186 » defer it.stop() | 185 » for ent := it.next(); ent != nil; ent = it.next() { |
| 187 » for !it.stopped { | 186 » » qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer(ent.key
)) |
| 188 » » it.next(nil, func(i *gkvlite.Item) { | 187 » » memoryCorruption(err) |
| 189 » » » if i == nil { | 188 » » if !cb(qi.Flip()) { |
| 190 » » » » return | 189 » » » break |
| 191 » » » } | 190 » » } |
| 192 » » » qi, err := serialize.ReadIndexDefinition(bytes.NewBuffer
(i.Key)) | |
| 193 » » » memoryCorruption(err) | |
| 194 » » » if !cb(qi.Flip()) { | |
| 195 » » » » it.stop() | |
| 196 » » » } | |
| 197 » » }) | |
| 198 } | 191 } |
| 199 } | 192 } |
| 200 | 193 |
| 201 func mergeIndexes(ns string, store, oldIdx, newIdx memStore) { | 194 func mergeIndexes(ns string, store, oldIdx, newIdx memStore) { |
| 202 prefixBuf := []byte("idx:" + ns + ":") | 195 prefixBuf := []byte("idx:" + ns + ":") |
| 203 origPrefixBufLen := len(prefixBuf) | 196 origPrefixBufLen := len(prefixBuf) |
| 204 | 197 |
| 205 oldIdx = oldIdx.Snapshot() | 198 oldIdx = oldIdx.Snapshot() |
| 206 newIdx = newIdx.Snapshot() | 199 newIdx = newIdx.Snapshot() |
| 207 | 200 |
| 208 » gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), fun
c(k, ov, nv []byte) { | 201 » memStoreCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx")
, func(k, ov, nv []byte) { |
| 209 prefixBuf = append(prefixBuf[:origPrefixBufLen], k...) | 202 prefixBuf = append(prefixBuf[:origPrefixBufLen], k...) |
| 210 ks := string(prefixBuf) | 203 ks := string(prefixBuf) |
| 211 | 204 |
| 212 coll := store.GetOrCreateCollection(ks) | 205 coll := store.GetOrCreateCollection(ks) |
| 213 | 206 |
| 214 oldColl := oldIdx.GetCollection(ks) | 207 oldColl := oldIdx.GetCollection(ks) |
| 215 newColl := newIdx.GetCollection(ks) | 208 newColl := newIdx.GetCollection(ks) |
| 216 | 209 |
| 217 switch { | 210 switch { |
| 218 case ov == nil && nv != nil: // all additions | 211 case ov == nil && nv != nil: // all additions |
| 219 » » » newColl.VisitItemsAscend(nil, false, func(i *gkvlite.Ite
m) bool { | 212 » » » newColl.ForEachItem(func(k, _ []byte) bool { |
| 220 » » » » coll.Set(i.Key, []byte{}) | 213 » » » » coll.Set(k, []byte{}) |
| 221 return true | 214 return true |
| 222 }) | 215 }) |
| 223 case ov != nil && nv == nil: // all deletions | 216 case ov != nil && nv == nil: // all deletions |
| 224 » » » oldColl.VisitItemsAscend(nil, false, func(i *gkvlite.Ite
m) bool { | 217 » » » oldColl.ForEachItem(func(k, _ []byte) bool { |
| 225 » » » » coll.Delete(i.Key) | 218 » » » » coll.Delete(k) |
| 226 return true | 219 return true |
| 227 }) | 220 }) |
| 228 case ov != nil && nv != nil: // merge | 221 case ov != nil && nv != nil: // merge |
| 229 » » » gkvCollide(oldColl, newColl, func(k, ov, nv []byte) { | 222 » » » memStoreCollide(oldColl, newColl, func(k, ov, nv []byte)
{ |
| 230 if nv == nil { | 223 if nv == nil { |
| 231 coll.Delete(k) | 224 coll.Delete(k) |
| 232 } else { | 225 } else { |
| 233 coll.Set(k, []byte{}) | 226 coll.Set(k, []byte{}) |
| 234 } | 227 } |
| 235 }) | 228 }) |
| 236 default: | 229 default: |
| 237 » » » impossible(fmt.Errorf("both values from gkvCollide were
nil?")) | 230 » » » impossible(fmt.Errorf("both values from memStoreCollide
were nil?")) |
| 238 } | 231 } |
| 239 // TODO(riannucci): remove entries from idxColl and remove index
collections | 232 // TODO(riannucci): remove entries from idxColl and remove index
collections |
| 240 // when there are no index entries for that index any more. | 233 // when there are no index entries for that index any more. |
| 241 }) | 234 }) |
| 242 } | 235 } |
| 243 | 236 |
| 244 func addIndexes(store memStore, aid string, compIdx []*ds.IndexDefinition) { | 237 func addIndexes(store memStore, aid string, compIdx []*ds.IndexDefinition) { |
| 245 normalized := make([]*ds.IndexDefinition, len(compIdx)) | 238 normalized := make([]*ds.IndexDefinition, len(compIdx)) |
| 246 idxColl := store.GetOrCreateCollection("idx") | 239 idxColl := store.GetOrCreateCollection("idx") |
| 247 for i, idx := range compIdx { | 240 for i, idx := range compIdx { |
| 248 normalized[i] = idx.Normalize() | 241 normalized[i] = idx.Normalize() |
| 249 idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()),
[]byte{}) | 242 idxColl.Set(serialize.ToBytes(*normalized[i].PrepForIdxTable()),
[]byte{}) |
| 250 } | 243 } |
| 251 | 244 |
| 252 for _, ns := range namespaces(store) { | 245 for _, ns := range namespaces(store) { |
| 246 kctx := ds.MkKeyContext(aid, ns) |
| 253 if allEnts := store.Snapshot().GetCollection("ents:" + ns); allE
nts != nil { | 247 if allEnts := store.Snapshot().GetCollection("ents:" + ns); allE
nts != nil { |
| 254 » » » allEnts.VisitItemsAscend(nil, true, func(i *gkvlite.Item
) bool { | 248 » » » allEnts.ForEachItem(func(ik, iv []byte) bool { |
| 255 » » » » pm, err := rpm(i.Val) | 249 » » » » pm, err := rpm(iv) |
| 256 memoryCorruption(err) | 250 memoryCorruption(err) |
| 257 | 251 |
| 258 » » » » prop, err := serialize.ReadProperty(bytes.NewBuf
fer(i.Key), serialize.WithoutContext, ds.MkKeyContext(aid, ns)) | 252 » » » » prop, err := serialize.ReadProperty(bytes.NewBuf
fer(ik), serialize.WithoutContext, kctx) |
| 259 memoryCorruption(err) | 253 memoryCorruption(err) |
| 260 | 254 |
| 261 k := prop.Value().(*ds.Key) | 255 k := prop.Value().(*ds.Key) |
| 262 | 256 |
| 263 sip := serialize.PropertyMapPartially(k, pm) | 257 sip := serialize.PropertyMapPartially(k, pm) |
| 264 | 258 |
| 265 mergeIndexes(ns, store, | 259 mergeIndexes(ns, store, |
| 266 newMemStore(), | 260 newMemStore(), |
| 267 indexEntries(k, sip, normalized)) | 261 indexEntries(k, sip, normalized)) |
| 268 return true | 262 return true |
| (...skipping 12 matching lines...) Expand all Loading... |
| 281 var compIdx []*ds.IndexDefinition | 275 var compIdx []*ds.IndexDefinition |
| 282 walkCompIdxs(store.Snapshot(), nil, func(i *ds.IndexDefinition) bool { | 276 walkCompIdxs(store.Snapshot(), nil, func(i *ds.IndexDefinition) bool { |
| 283 compIdx = append(compIdx, i) | 277 compIdx = append(compIdx, i) |
| 284 return true | 278 return true |
| 285 }) | 279 }) |
| 286 | 280 |
| 287 mergeIndexes(key.Namespace(), store, | 281 mergeIndexes(key.Namespace(), store, |
| 288 indexEntriesWithBuiltins(key, oldEnt, compIdx), | 282 indexEntriesWithBuiltins(key, oldEnt, compIdx), |
| 289 indexEntriesWithBuiltins(key, newEnt, compIdx)) | 283 indexEntriesWithBuiltins(key, newEnt, compIdx)) |
| 290 } | 284 } |
| OLD | NEW |