Index: go/src/infra/gae/libs/wrapper/memory/gkvlite_utils.go |
diff --git a/go/src/infra/gae/libs/wrapper/memory/gkvlite_utils.go b/go/src/infra/gae/libs/wrapper/memory/gkvlite_utils.go |
index ea8fb200d9abf1b649d20f440b7e7452feed4834..ec431905f9430758aa11c335f9bddb04e96fa535 100644 |
--- a/go/src/infra/gae/libs/wrapper/memory/gkvlite_utils.go |
+++ b/go/src/infra/gae/libs/wrapper/memory/gkvlite_utils.go |
@@ -5,9 +5,61 @@ |
package memory |
import ( |
+ "bytes" |
+ "sync" |
+ |
"github.com/luci/gkvlite" |
) |
+func gkvCollide(o, n *memCollection, f func(k, ov, nv []byte)) { |
+ oldItems, newItems := make(chan *gkvlite.Item), make(chan *gkvlite.Item) |
+ wg := &sync.WaitGroup{} |
+ wg.Add(2) |
M-A Ruel
2015/05/31 23:03:15
Please do it right above the go statements, e.g. m
iannucci
2015/05/31 23:31:33
Done.
|
+ |
+ walker := func(c *memCollection, ch chan<- *gkvlite.Item) { |
+ defer close(ch) |
+ defer wg.Done() |
+ if c != nil { |
+ c.VisitItemsAscend(nil, true, func(i *gkvlite.Item) bool { |
+ ch <- i |
+ return true |
+ }) |
+ } |
+ } |
+ |
+ go walker(o, oldItems) |
+ go walker(n, newItems) |
+ |
+ l, r := <-oldItems, <-newItems |
+ for { |
+ if l == nil && r == nil { |
+ break |
+ } |
+ |
+ if l == nil { |
+ f(r.Key, nil, r.Val) |
+ r = <-newItems |
+ } else if r == nil { |
+ f(l.Key, l.Val, nil) |
+ l = <-oldItems |
+ } else { |
+ switch bytes.Compare(l.Key, r.Key) { |
+ case -1: // l < r |
+ f(l.Key, l.Val, nil) |
+ l = <-oldItems |
+ case 0: // l == r |
+ f(l.Key, l.Val, r.Val) |
+ l, r = <-oldItems, <-newItems |
+ case 1: // l > r |
+ f(r.Key, nil, r.Val) |
+ r = <-newItems |
+ } |
+ } |
+ |
M-A Ruel
2015/05/31 23:03:16
remove
iannucci
2015/05/31 23:31:33
Done.
|
+ } |
+ wg.Wait() |
M-A Ruel
2015/05/31 23:03:15
In theory if you synchronize with channels you don
iannucci
2015/05/31 23:31:33
yeah, but doesn't the WaitGroup use channels anywa
|
+} |
+ |
// memStore is a gkvlite.Store which will panic for anything which might |
// otherwise return an error. |
// |
@@ -40,6 +92,14 @@ func (ms *memStore) SetCollection(name string, cmp gkvlite.KeyCompare) *memColle |
return (*memCollection)((*gkvlite.Store)(ms).SetCollection(name, cmp)) |
} |
+func (ms *memStore) RemoveCollection(name string) { |
+ (*gkvlite.Store)(ms).RemoveCollection(name) |
+} |
+ |
+func (ms *memStore) GetCollectionNames() []string { |
+ return (*gkvlite.Store)(ms).GetCollectionNames() |
+} |
+ |
// memCollection is a gkvlite.Collection which will panic for anything which |
// might otherwise return an error. |
// |
@@ -56,6 +116,14 @@ func (mc *memCollection) Get(k []byte) []byte { |
return ret |
} |
+func (mc *memCollection) MinItem(withValue bool) *gkvlite.Item { |
+ ret, err := (*gkvlite.Collection)(mc).MinItem(withValue) |
+ if err != nil { |
+ panic(err) |
+ } |
+ return ret |
+} |
+ |
func (mc *memCollection) Set(k, v []byte) { |
if err := (*gkvlite.Collection)(mc).Set(k, v); err != nil { |
panic(err) |
@@ -76,12 +144,6 @@ func (mc *memCollection) VisitItemsAscend(target []byte, withValue bool, visitor |
} |
} |
-func (mc *memCollection) VisitItemsDescend(target []byte, withValue bool, visitor gkvlite.ItemVisitor) { |
- if err := (*gkvlite.Collection)(mc).VisitItemsDescend(target, withValue, visitor); err != nil { |
- panic(err) |
- } |
-} |
- |
func (mc *memCollection) GetTotals() (numItems, numBytes uint64) { |
numItems, numBytes, err := (*gkvlite.Collection)(mc).GetTotals() |
if err != nil { |