OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "bytes" | 8 "bytes" |
9 "runtime" | 9 "runtime" |
10 "sync" | |
11 | 10 |
12 "github.com/luci/gae/service/datastore" | 11 "github.com/luci/gae/service/datastore" |
13 "github.com/luci/gkvlite" | 12 "github.com/luci/gkvlite" |
14 ) | 13 ) |
15 | 14 |
16 func gkvCollide(o, n memCollection, f func(k, ov, nv []byte)) { | 15 func gkvCollide(o, n memCollection, f func(k, ov, nv []byte)) { |
17 if o != nil && !o.IsReadOnly() { | 16 if o != nil && !o.IsReadOnly() { |
18 panic("old collection is r/w") | 17 panic("old collection is r/w") |
19 } | 18 } |
20 if n != nil && !n.IsReadOnly() { | 19 if n != nil && !n.IsReadOnly() { |
21 panic("new collection is r/w") | 20 panic("new collection is r/w") |
22 } | 21 } |
23 | 22 |
24 // TODO(riannucci): reimplement in terms of *iterator. | 23 // TODO(riannucci): reimplement in terms of *iterator. |
25 oldItems, newItems := make(chan *gkvlite.Item), make(chan *gkvlite.Item) | 24 oldItems, newItems := make(chan *gkvlite.Item), make(chan *gkvlite.Item) |
26 » walker := func(c memCollection, ch chan<- *gkvlite.Item, wg *sync.WaitGr
oup) { | 25 » walker := func(c memCollection, ch chan<- *gkvlite.Item) { |
27 defer close(ch) | 26 defer close(ch) |
28 defer wg.Done() | |
29 if c != nil { | 27 if c != nil { |
30 c.VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool
{ | 28 c.VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool
{ |
31 ch <- i | 29 ch <- i |
32 return true | 30 return true |
33 }) | 31 }) |
34 } | 32 } |
35 } | 33 } |
36 | 34 |
37 » wg := &sync.WaitGroup{} | 35 » go walker(o, oldItems) |
38 » wg.Add(2) | 36 » go walker(n, newItems) |
39 » go walker(o, oldItems, wg) | |
40 » go walker(n, newItems, wg) | |
41 | 37 |
42 l, r := <-oldItems, <-newItems | 38 l, r := <-oldItems, <-newItems |
43 for { | 39 for { |
44 » » if l == nil && r == nil { | 40 » » switch { |
45 » » » break | 41 » » case l == nil && r == nil: |
46 » » } | 42 » » » return |
47 | 43 |
48 » » if l == nil { | 44 » » case l == nil: |
49 f(r.Key, nil, r.Val) | 45 f(r.Key, nil, r.Val) |
50 r = <-newItems | 46 r = <-newItems |
51 » » } else if r == nil { | 47 |
| 48 » » case r == nil: |
52 f(l.Key, l.Val, nil) | 49 f(l.Key, l.Val, nil) |
53 l = <-oldItems | 50 l = <-oldItems |
54 » » } else { | 51 |
| 52 » » default: |
55 switch bytes.Compare(l.Key, r.Key) { | 53 switch bytes.Compare(l.Key, r.Key) { |
56 case -1: // l < r | 54 case -1: // l < r |
57 f(l.Key, l.Val, nil) | 55 f(l.Key, l.Val, nil) |
58 l = <-oldItems | 56 l = <-oldItems |
59 case 0: // l == r | 57 case 0: // l == r |
60 f(l.Key, l.Val, r.Val) | 58 f(l.Key, l.Val, r.Val) |
61 l, r = <-oldItems, <-newItems | 59 l, r = <-oldItems, <-newItems |
62 case 1: // l > r | 60 case 1: // l > r |
63 f(r.Key, nil, r.Val) | 61 f(r.Key, nil, r.Val) |
64 r = <-newItems | 62 r = <-newItems |
65 } | 63 } |
66 } | 64 } |
67 } | 65 } |
68 wg.Wait() | |
69 } | 66 } |
70 | 67 |
71 // memStore is a gkvlite.Store which will panic for anything which might | 68 // memStore is a gkvlite.Store which will panic for anything which might |
72 // otherwise return an error. | 69 // otherwise return an error. |
73 // | 70 // |
74 // This is reasonable for in-memory Store objects, since the only errors that | 71 // This is reasonable for in-memory Store objects, since the only errors that |
75 // should occur happen with file IO on the underlying file (which of course | 72 // should occur happen with file IO on the underlying file (which of course |
76 // doesn't exist). | 73 // doesn't exist). |
77 type memStore interface { | 74 type memStore interface { |
78 datastore.TestingSnapshot | 75 datastore.TestingSnapshot |
(...skipping 113 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
192 } | 189 } |
193 err := mc.c.VisitItemsAscend(target, withValue, visitor) | 190 err := mc.c.VisitItemsAscend(target, withValue, visitor) |
194 memoryCorruption(err) | 191 memoryCorruption(err) |
195 } | 192 } |
196 | 193 |
197 func (mc *memCollectionImpl) GetTotals() (numItems, numBytes uint64) { | 194 func (mc *memCollectionImpl) GetTotals() (numItems, numBytes uint64) { |
198 numItems, numBytes, err := mc.c.GetTotals() | 195 numItems, numBytes, err := mc.c.GetTotals() |
199 memoryCorruption(err) | 196 memoryCorruption(err) |
200 return | 197 return |
201 } | 198 } |
OLD | NEW |