OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memory | |
6 | |
7 import ( | |
8 "bytes" | |
9 "fmt" | |
10 "reflect" | |
11 "sort" | |
12 "time" | |
13 | |
14 "appengine" | |
15 "appengine/datastore" | |
16 | |
17 "github.com/luci/gkvlite" | |
18 "github.com/luci/luci-go/common/cmpbin" | |
19 ) | |
20 | |
// typData is the in-memory model of a single datastore property value: a
// propValType tag classifying the dynamic type of data, the property's
// noIndex flag, and the underlying Go value itself.
type typData struct {
	noIndex bool
	typ     propValType
	data    interface{}
}
26 | |
27 func newTypData(noIndex bool, v interface{}) (ret *typData, err error) { | |
28 typ := pvUNKNOWN | |
29 | |
30 switch x := v.(type) { | |
31 case nil: | |
32 typ = pvNull | |
33 case time.Time: | |
34 typ = pvTime | |
35 case int64: | |
36 typ = pvInt | |
37 case float64: | |
38 typ = pvFloat | |
39 case bool: | |
40 if x { | |
41 typ = pvBoolTrue | |
42 } else { | |
43 typ = pvBoolFalse | |
44 } | |
45 case []byte, datastore.ByteString: | |
46 typ = pvBytes | |
47 case appengine.BlobKey: | |
48 typ = pvBlobKey | |
49 case string: | |
50 typ = pvStr | |
51 case appengine.GeoPoint: | |
52 typ = pvGeoPoint | |
53 case *datastore.Key: | |
54 typ = pvKey | |
55 } | |
56 if typ == pvUNKNOWN { | |
57 err = fmt.Errorf("propValTypeOf: unknown type of %T: %#v", v, v) | |
58 } | |
59 | |
60 return &typData{noIndex, typ, v}, err | |
61 } | |
62 | |
63 func (td *typData) WriteBinary(buf *bytes.Buffer, nso nsOption) error { | |
64 typb := byte(td.typ) | |
65 if td.noIndex { | |
66 typb |= 0x80 | |
67 } | |
68 buf.WriteByte(typb) | |
69 switch td.typ { | |
70 case pvNull, pvBoolFalse, pvBoolTrue: | |
71 return nil | |
72 case pvInt: | |
73 cmpbin.WriteInt(buf, td.data.(int64)) | |
74 case pvFloat: | |
75 writeFloat64(buf, td.data.(float64)) | |
76 case pvStr: | |
77 writeString(buf, td.data.(string)) | |
78 case pvBytes: | |
79 if td.noIndex { | |
80 writeBytes(buf, td.data.([]byte)) | |
81 } else { | |
82 writeBytes(buf, td.data.(datastore.ByteString)) | |
83 } | |
84 case pvTime: | |
85 writeTime(buf, td.data.(time.Time)) | |
86 case pvGeoPoint: | |
87 writeGeoPoint(buf, td.data.(appengine.GeoPoint)) | |
88 case pvKey: | |
89 writeKey(buf, nso, td.data.(*datastore.Key)) | |
90 case pvBlobKey: | |
91 writeString(buf, string(td.data.(appengine.BlobKey))) | |
92 default: | |
93 return fmt.Errorf("write: unknown type! %v", td) | |
94 } | |
95 return nil | |
96 } | |
97 | |
// ReadBinary deserializes a single typData from buf (the inverse of
// WriteBinary). nso and ns are forwarded to readKey for pvKey values; ns is
// the namespace to assume when keys were written without one.
func (td *typData) ReadBinary(buf *bytes.Buffer, nso nsOption, ns string) error {
	typb, err := buf.ReadByte()
	if err != nil {
		return err
	}
	td.noIndex = (typb & 0x80) != 0 // highbit means noindex
	td.typ = propValType(typb & 0x7f)
	switch td.typ {
	case pvNull:
		td.data = nil
	case pvBoolTrue:
		td.data = true
	case pvBoolFalse:
		td.data = false
	case pvInt:
		// cmpbin.ReadInt yields an int64; the byte count is discarded.
		td.data, _, err = cmpbin.ReadInt(buf)
	case pvFloat:
		td.data, err = readFloat64(buf)
	case pvStr:
		td.data, err = readString(buf)
	case pvBytes:
		b := []byte(nil)
		if b, err = readBytes(buf); err != nil {
			return err
		}
		// Mirror WriteBinary's convention: unindexed byte data is stored as
		// a plain []byte, indexed data as datastore.ByteString.
		if td.noIndex {
			td.data = b
		} else {
			td.data = datastore.ByteString(b)
		}
	case pvTime:
		td.data, err = readTime(buf)
	case pvGeoPoint:
		td.data, err = readGeoPoint(buf)
	case pvKey:
		td.data, err = readKey(buf, nso, ns)
	case pvBlobKey:
		s := ""
		if s, err = readString(buf); err != nil {
			return err
		}
		td.data = appengine.BlobKey(s)
	default:
		return fmt.Errorf("read: unknown type! %v", td)
	}

	// NOTE: most branches fall through here with err possibly set by the
	// read helper; only the branches needing post-processing return early.
	return err
}
146 | |
// pvals collects every value for a single property name, in the order the
// values appeared in the source propertyList.
type pvals struct {
	name string
	vals []*typData
}
151 | |
// propertyList is a local alias of datastore.PropertyList which additionally
// supports binary (de)serialization and index-entry generation for the
// in-memory datastore implementation.
type propertyList []datastore.Property

// Compile-time check that *propertyList implements datastore.PropertyLoadSaver.
var _ = datastore.PropertyLoadSaver((*propertyList)(nil))
155 | |
156 func (pl *propertyList) Load(ch <-chan datastore.Property) error { | |
157 return (*datastore.PropertyList)(pl).Load(ch) | |
158 } | |
159 | |
160 func (pl *propertyList) Save(ch chan<- datastore.Property) error { | |
161 return (*datastore.PropertyList)(pl).Save(ch) | |
162 } | |
163 | |
// collatedProperties is the reduction of a *propertyList such that each entry
// in a collatedProperties has a unique name. For example, collating this:
//   pl := &propertyList{
//     datastore.Property{Name: "wat", Val: "hello"},
//     datastore.Property{Name: "other", Val: 100},
//     datastore.Property{Name: "wat", Val: "goodbye", noIndex: true},
//   }
//
// Would get a collatedProperties which looked like:
//   c := collatedProperties{
//     &pvals{"wat", []*typData{&{false, pvStr, "hello"},
//                              &{true, pvStr, "goodbye"}}},
//     &pvals{"other", []*typData{&{false, pvInt, 100}}}
//   }
type collatedProperties []*pvals
179 | |
180 func (c collatedProperties) defaultIndicies(kind string) []*qIndex { | |
181 ret := make([]*qIndex, 0, 2*len(c)+1) | |
182 ret = append(ret, &qIndex{kind, false, nil}) | |
183 for _, pvals := range c { | |
184 needsIndex := false | |
185 for _, v := range pvals.vals { | |
186 if !v.noIndex { | |
187 needsIndex = true | |
188 break | |
189 } | |
190 } | |
191 if !needsIndex { | |
192 continue | |
193 } | |
194 ret = append(ret, &qIndex{kind, false, []qSortBy{{pvals.name, qA
SC}}}) | |
195 ret = append(ret, &qIndex{kind, false, []qSortBy{{pvals.name, qD
EC}}}) | |
196 } | |
197 return ret | |
198 } | |
199 | |
// serializedPval is a single pvals.vals entry which has been serialized (in
// qASC order).
type serializedPval []byte

// serializedPvals is all of the pvals.vals entries from a single pvals (in qASC
// order). It does not include the pvals.name field.
type serializedPvals []serializedPval

// sort.Interface implementation: byte-wise lexicographic order.
func (s serializedPvals) Len() int           { return len(s) }
func (s serializedPvals) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s serializedPvals) Less(i, j int) bool { return bytes.Compare(s[i], s[j]) < 0 }
211 | |
212 type mappedPlist map[string]serializedPvals | |
213 | |
214 func (c collatedProperties) indexableMap() (mappedPlist, error) { | |
215 ret := make(mappedPlist, len(c)) | |
216 for _, pv := range c { | |
217 data := make(serializedPvals, 0, len(pv.vals)) | |
218 for _, v := range pv.vals { | |
219 if v.noIndex { | |
220 continue | |
221 } | |
222 buf := &bytes.Buffer{} | |
223 if err := v.WriteBinary(buf, noNS); err != nil { | |
224 return nil, err | |
225 } | |
226 data = append(data, buf.Bytes()) | |
227 } | |
228 if len(data) == 0 { | |
229 continue | |
230 } | |
231 sort.Sort(data) | |
232 ret[pv.name] = data | |
233 } | |
234 return ret, nil | |
235 } | |
236 | |
// indexRowGen contains enough information to generate all of the index rows
// which correspond with a propertyList and a qIndex: one serializedPvals per
// sort column (propVec) and that column's sort direction (orders), aligned
// by index.
type indexRowGen struct {
	propVec []serializedPvals
	orders  []qDirection
}
243 | |
// permute calls cb for each index row, in the sorted order of the rows.
//
// iVec acts as a mixed-radix counter with one digit per sort column; digit i
// selects which serialized value of column i appears in the current row.
// Digits whose order is qDEC start at their maximum and count downward, so
// rows are always emitted in index-sorted order.
func (s indexRowGen) permute(cb func([]byte)) {
	iVec := make([]int, len(s.propVec))
	iVecLim := make([]int, len(s.propVec))

	// incPos advances the counter by one row, least-significant digit first.
	// It returns false exactly once: when every digit has wrapped, meaning
	// all rows have been emitted.
	incPos := func() bool {
		for i := len(iVec) - 1; i >= 0; i-- {
			var done bool
			var newVal int
			if s.orders[i] == qASC {
				newVal = (iVec[i] + 1) % iVecLim[i]
				done = newVal != 0 // wrapped to 0 => carry into next digit
			} else {
				newVal = (iVec[i] - 1)
				if newVal < 0 {
					newVal = iVecLim[i] - 1 // wrapped => carry into next digit
				} else {
					done = true
				}
			}
			iVec[i] = newVal
			if done {
				return true
			}
		}
		return false
	}

	for i, sps := range s.propVec {
		iVecLim[i] = len(sps)
	}

	// Descending digits begin at their highest value.
	for i := range iVec {
		if s.orders[i] == qDEC {
			iVec[i] = iVecLim[i] - 1
		}
	}

	for {
		// Pre-size the row buffer, then concatenate the selected value for
		// each column. Descending columns are bitwise-inverted so that plain
		// byte-wise comparison of whole rows sorts those columns descending.
		bufsiz := 0
		for pvalSliceIdx, pvalIdx := range iVec {
			bufsiz += len(s.propVec[pvalSliceIdx][pvalIdx])
		}
		buf := bytes.NewBuffer(make([]byte, 0, bufsiz))
		for pvalSliceIdx, pvalIdx := range iVec {
			data := s.propVec[pvalSliceIdx][pvalIdx]
			if s.orders[pvalSliceIdx] == qASC {
				buf.Write(data)
			} else {
				for _, b := range data {
					buf.WriteByte(b ^ 0xFF)
				}
			}
		}
		cb(buf.Bytes())
		if !incPos() {
			break
		}
	}
}
304 | |
// matcher is a reusable scratch buffer for match; reusing it across calls
// avoids reallocating the propVec/orders slices for every index tested.
type matcher struct {
	buf indexRowGen
}
308 | |
309 // matcher.match checks to see if the mapped, serialized property values | |
310 // match the index. If they do, it returns a indexRowGen. Do not write or modify | |
311 // the data in the indexRowGen. | |
312 func (m *matcher) match(idx *qIndex, mpvals mappedPlist) (indexRowGen, bool) { | |
313 m.buf.propVec = m.buf.propVec[:0] | |
314 m.buf.orders = m.buf.orders[:0] | |
315 for _, sb := range idx.sortby { | |
316 if pv, ok := mpvals[sb.prop]; ok { | |
317 m.buf.propVec = append(m.buf.propVec, pv) | |
318 m.buf.orders = append(m.buf.orders, sb.dir) | |
319 } else { | |
320 return indexRowGen{}, false | |
321 } | |
322 } | |
323 return m.buf, true | |
324 } | |
325 | |
// indexEntries builds a fresh memStore containing every index row that the
// entity at key k contributes to each of the indices in idxs. The returned
// store has an "idx" collection listing which index definitions were touched,
// plus one "idx:<ns>:<serialized qIndex>" collection per index holding the
// actual rows.
func (c collatedProperties) indexEntries(k *datastore.Key, idxs []*qIndex) (*memStore, error) {
	m, err := c.indexableMap()
	if err != nil {
		return nil, err
	}

	ret := newMemStore()
	idxColl := ret.SetCollection("idx", nil)
	// getIdxEnts retrieves an index collection or adds it if it's not there.
	getIdxEnts := func(qi *qIndex) *memCollection {
		buf := &bytes.Buffer{}
		qi.WriteBinary(buf)
		b := buf.Bytes()
		idxColl.Set(b, []byte{})
		return ret.SetCollection(fmt.Sprintf("idx:%s:%s", k.Namespace(), b), nil)
	}

	buf := &bytes.Buffer{}
	writeKey(buf, noNS, k) // ns is in idxEnts collection name.
	keyData := buf.Bytes()

	// walkPermutations writes one row per permutation of the matched property
	// values, each keyed as prefix+rowdata+keyData. Each row's value is the
	// PREVIOUS row's property data — NOTE(review): presumably a back-link used
	// when walking the index in reverse; confirm against the query code.
	walkPermutations := func(prefix []byte, irg indexRowGen, ents *memCollection) {
		prev := []byte{} // intentionally make a non-nil slice, gkvlite hates nil.
		irg.permute(func(data []byte) {
			buf := bytes.NewBuffer(make([]byte, 0, len(prefix)+len(data)+len(keyData)))
			buf.Write(prefix)
			buf.Write(data)
			buf.Write(keyData)
			ents.Set(buf.Bytes(), prev)
			prev = data
		})
	}

	mtch := matcher{}
	for _, idx := range idxs {
		if irg, ok := mtch.match(idx, m); ok {
			idxEnts := getIdxEnts(idx)
			if len(irg.propVec) == 0 {
				idxEnts.Set(keyData, []byte{}) // propless index, e.g. kind -> key = nil
			} else if idx.ancestor {
				// Ancestor indices get one row-set per ancestor of k
				// (including k itself), prefixed with that ancestor's key.
				for ancKey := k; ancKey != nil; ancKey = ancKey.Parent() {
					buf := &bytes.Buffer{}
					writeKey(buf, noNS, ancKey)
					walkPermutations(buf.Bytes(), irg, idxEnts)
				}
			} else {
				walkPermutations(nil, irg, idxEnts)
			}
		}
	}

	return ret, nil
}
379 | |
380 func (pl *propertyList) indexEntriesWithBuiltins(k *datastore.Key, complexIdxs [
]*qIndex) (ret *memStore, err error) { | |
381 c, err := pl.collate() | |
382 if err == nil { | |
383 ret, err = c.indexEntries(k, append(c.defaultIndicies(k.Kind()),
complexIdxs...)) | |
384 } | |
385 return | |
386 } | |
387 | |
388 func (pl *propertyList) collate() (collatedProperties, error) { | |
389 if pl == nil || len(*pl) == 0 { | |
390 return nil, nil | |
391 } | |
392 | |
393 cols := []*pvals{} | |
394 colIdx := map[string]int{} | |
395 | |
396 for _, p := range *pl { | |
397 if idx, ok := colIdx[p.Name]; ok { | |
398 c := cols[idx] | |
399 td, err := newTypData(p.NoIndex, p.Value) | |
400 if err != nil { | |
401 return nil, err | |
402 } | |
403 c.vals = append(c.vals, td) | |
404 } else { | |
405 colIdx[p.Name] = len(cols) | |
406 td, err := newTypData(p.NoIndex, p.Value) | |
407 if err != nil { | |
408 return nil, err | |
409 } | |
410 cols = append(cols, &pvals{p.Name, []*typData{td}}) | |
411 } | |
412 } | |
413 | |
414 return cols, nil | |
415 } | |
416 | |
417 func (pl *propertyList) addCollated(pv *pvals) { | |
418 for _, v := range pv.vals { | |
419 *pl = append(*pl, datastore.Property{ | |
420 Name: pv.name, | |
421 Multiple: len(pv.vals) > 1, | |
422 NoIndex: v.noIndex, | |
423 Value: v.data, | |
424 }) | |
425 } | |
426 } | |
427 | |
// updateIndicies brings store's index collections up to date for the change
// of key's entity from oldEnt to newEnt (either side may collate to empty,
// representing a pure insert or a pure delete). It recomputes both sides'
// index entries and applies the difference.
// NOTE: "Indicies" [sic] — the spelling matches defaultIndicies elsewhere in
// this package.
func updateIndicies(store *memStore, key *datastore.Key, oldEnt, newEnt *propertyList) error {
	var err error

	idxColl := store.GetCollection("idx")
	if idxColl == nil {
		idxColl = store.SetCollection("idx", nil)
	}

	// load all current complex query index definitions.
	compIdx := []*qIndex{}
	idxColl.VisitItemsAscend(complexQueryPrefix, false, func(i *gkvlite.Item) bool {
		if !bytes.HasPrefix(i.Key, complexQueryPrefix) {
			return false
		}
		qi := &qIndex{}
		// err escapes to the enclosing scope; a false return stops the visit.
		if err = qi.ReadBinary(bytes.NewBuffer(i.Key)); err != nil {
			return false
		}
		compIdx = append(compIdx, qi)
		return true
	})
	if err != nil {
		return err
	}

	oldIdx, err := oldEnt.indexEntriesWithBuiltins(key, compIdx)
	if err != nil {
		return err
	}

	newIdx, err := newEnt.indexEntriesWithBuiltins(key, compIdx)
	if err != nil {
		return err
	}

	prefix := "idx:" + key.Namespace() + ":"

	// Walk the set of touched index definitions from both sides at once.
	// NOTE: the callback parameters k, ov, nv shadow the outer key; here k is
	// the serialized qIndex, and ov/nv indicate presence on the old/new side.
	gkvCollide(oldIdx.GetCollection("idx"), newIdx.GetCollection("idx"), func(k, ov, nv []byte) {
		ks := prefix + string(k)
		idxColl.Set(k, []byte{})

		coll := store.GetCollection(ks)
		if coll == nil {
			coll = store.SetCollection(ks, nil)
		}
		oldColl := oldIdx.GetCollection(ks)
		newColl := newIdx.GetCollection(ks)

		switch {
		case ov == nil && nv != nil: // all additions
			newColl.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool {
				coll.Set(i.Key, i.Val)
				return true
			})
		case ov != nil && nv == nil: // all deletions
			oldColl.VisitItemsAscend(nil, false, func(i *gkvlite.Item) bool {
				coll.Delete(i.Key)
				return true
			})
		case ov != nil && nv != nil: // merge
			gkvCollide(oldColl, newColl, func(k, ov, nv []byte) {
				if nv == nil {
					coll.Delete(k)
				} else {
					coll.Set(k, nv)
				}
			})
		default:
			// gkvCollide only reports keys present on at least one side.
			panic("impossible")
		}
		// TODO(riannucci): remove entries from idxColl and remove index collections
		// when there are no index entries for that index any more.
	})

	return nil
}
504 | |
505 func (pl *propertyList) MarshalBinary() ([]byte, error) { | |
506 cols, err := pl.collate() | |
507 if err != nil || len(cols) == 0 { | |
508 return nil, err | |
509 } | |
510 | |
511 pieces := make([][]byte, 0, len(*pl)*2+1) | |
512 for _, pv := range cols { | |
513 // TODO(riannucci): estimate buffer size better. | |
514 buf := bytes.NewBuffer(make([]byte, 0, cmpbin.MaxIntLen64+len(pv
.name))) | |
515 writeString(buf, pv.name) | |
516 err := pv.WriteBinary(buf) | |
517 if err != nil { | |
518 return nil, err | |
519 } | |
520 pieces = append(pieces, buf.Bytes()) | |
521 } | |
522 return bytes.Join(pieces, nil), nil | |
523 } | |
524 | |
525 func (pl *propertyList) UnmarshalBinary(data []byte) error { | |
526 buf := bytes.NewBuffer(data) | |
527 for buf.Len() > 0 { | |
528 name, err := readString(buf) | |
529 if err != nil { | |
530 return err | |
531 } | |
532 | |
533 pv := &pvals{name: name} | |
534 err = pv.ReadBinary(buf) | |
535 if err != nil { | |
536 return err | |
537 } | |
538 pl.addCollated(pv) | |
539 } | |
540 | |
541 return nil | |
542 } | |
543 | |
544 func toPL(src interface{}) (ret *propertyList, err error) { | |
545 propchan := make(chan datastore.Property) | |
546 ret = &propertyList{} | |
547 go func() { err = datastore.SaveStruct(src, propchan) }() | |
548 err2 := ret.Load(propchan) | |
549 if err != nil { | |
550 return | |
551 } | |
552 return ret, err2 | |
553 } | |
554 | |
555 func fromPL(props *propertyList, dst interface{}) (err error) { | |
556 propchan := make(chan datastore.Property) | |
557 go func() { err = props.Save(propchan) }() | |
558 err2 := datastore.LoadStruct(dst, propchan) | |
559 if err != nil { | |
560 return err | |
561 } | |
562 return err2 | |
563 } | |
564 | |
// propValType tags the dynamic type of a typData's value. It occupies only
// the low 7 bits of the serialized type byte; see the NOTE below.
type propValType byte

// byteSliceType is the reflect.Type of []byte, kept for reflective
// comparisons against property value types.
var byteSliceType = reflect.TypeOf([]byte(nil))

// These constants are in the order described by
// https://cloud.google.com/appengine/docs/go/datastore/entities#Go_Value_type_ordering
// with a slight divergence for the Int/Time split.
// NOTE: this enum can only occupy 7 bits, because we use the high bit to encode
// indexed/non-indexed. See typData.WriteBinary.
const (
	pvNull propValType = iota
	pvInt

	// NOTE: this is a slight divergence; times and integers actually sort
	// together (apparently?) in datastore. This is probably insane, and I don't
	// want to add the complexity of field 'meaning' as a sparate concept from the
	// field's 'type' (which is what datastore seems to do, judging from the
	// protobufs). So if you're here because you implemented an app which relies
	// on time.Time and int64 sorting together, then this is why your app acts
	// differently in production. My advice is to NOT DO THAT. If you really want
	// this (and you probably don't), you should take care of the time.Time <->
	// int64 conversion in your app and just use a property type of int64.
	pvTime

	// NOTE: this is also a slight divergence, but not a semantic one. IIUC, in
	// datastore 'bool' is actually the type and the value is either 0 or
	// 1 (taking another byte to store). Since we have plenty of space in this
	// type byte, I just merge the value into the type for booleans. If this
	// becomes problematic, consider changing this to just pvBool, and then
	// encoding a 0 or 1 as a byte in the relevant marshalling routines.
	pvBoolFalse
	pvBoolTrue
	pvBytes // []byte or datastore.ByteString
	pvStr   // string or string noindex
	pvFloat
	pvGeoPoint

	// These two are problematic, because they force us to bind to the appengine
	// SDK code. If we can drop support for these and turn them into hard errors,
	// that could let us decouple from the various appengine SDKs. Maybe.
	pvKey     // TODO(riannucci): remove support for this (use a string)
	pvBlobKey // TODO(riannucci): remove support for this (use a string)

	// pvUNKNOWN is the sentinel for unsupported value types; it must stay last.
	pvUNKNOWN
)
610 | |
611 func (p *pvals) ReadBinary(buf *bytes.Buffer) error { | |
612 n, _, err := cmpbin.ReadUint(buf) | |
613 if err != nil { | |
614 return err | |
615 } | |
616 | |
617 p.vals = make([]*typData, n) | |
618 for i := range p.vals { | |
619 p.vals[i] = &typData{} | |
620 err := p.vals[i].ReadBinary(buf, withNS, "") | |
621 if err != nil { | |
622 return err | |
623 } | |
624 } | |
625 | |
626 return nil | |
627 } | |
628 | |
629 func (p *pvals) WriteBinary(buf *bytes.Buffer) error { | |
630 cmpbin.WriteUint(buf, uint64(len(p.vals))) | |
631 for _, v := range p.vals { | |
632 if err := v.WriteBinary(buf, withNS); err != nil { | |
633 return err | |
634 } | |
635 } | |
636 return nil | |
637 } | |
OLD | NEW |