| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package ackbuffer | 5 package ackbuffer |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | |
| 9 "sort" | 8 "sort" |
| 10 "sync" | 9 "sync" |
| 11 "testing" | 10 "testing" |
| 12 "time" | 11 "time" |
| 13 | 12 |
| 14 "github.com/luci/luci-go/common/clock" | 13 "github.com/luci/luci-go/common/clock" |
| 15 "github.com/luci/luci-go/common/clock/testclock" | 14 "github.com/luci/luci-go/common/clock/testclock" |
| 16 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
| 17 "github.com/luci/luci-go/common/gcloud/gcps" | |
| 18 "github.com/luci/luci-go/common/stringset" | 16 "github.com/luci/luci-go/common/stringset" |
| 19 "golang.org/x/net/context" | 17 "golang.org/x/net/context" |
| 20 | 18 |
| 21 . "github.com/smartystreets/goconvey/convey" | 19 . "github.com/smartystreets/goconvey/convey" |
| 22 ) | 20 ) |
| 23 | 21 |
| 24 type testPubSub struct { | 22 type testACK struct { |
| 25 sync.Mutex | 23 sync.Mutex |
| 26 | 24 |
| 27 » err error | 25 » err error |
| 28 » sub gcps.Subscription | 26 » acks stringset.Set |
| 29 » acks stringset.Set | 27 » batchSize int |
| 30 } | 28 } |
| 31 | 29 |
| 32 func (ps *testPubSub) Ack(s gcps.Subscription, acks ...string) error { | 30 func (ps *testACK) Ack(c context.Context, acks ...string) error { |
| 33 ps.Lock() | 31 ps.Lock() |
| 34 defer ps.Unlock() | 32 defer ps.Unlock() |
| 35 | 33 |
| 36 if ps.err != nil { | 34 if ps.err != nil { |
| 37 return ps.err | 35 return ps.err |
| 38 } | 36 } |
| 39 | 37 |
| 40 if s != ps.sub { | |
| 41 return fmt.Errorf("unknown subscription %q", s) | |
| 42 } | |
| 43 | |
| 44 if ps.acks == nil { | 38 if ps.acks == nil { |
| 45 ps.acks = stringset.New(0) | 39 ps.acks = stringset.New(0) |
| 46 } | 40 } |
| 47 for _, ack := range acks { | 41 for _, ack := range acks { |
| 48 ps.acks.Add(ack) | 42 ps.acks.Add(ack) |
| 49 } | 43 } |
| 50 return nil | 44 return nil |
| 51 } | 45 } |
| 52 | 46 |
| 53 func (ps *testPubSub) ackIDs() []string { | 47 func (ps *testACK) AckBatchSize() int { |
| 48 » size := ps.batchSize |
| 49 » if size <= 0 { |
| 50 » » size = 4 |
| 51 » } |
| 52 » return size |
| 53 } |
| 54 |
| 55 func (ps *testACK) ackIDs() []string { |
| 54 ps.Lock() | 56 ps.Lock() |
| 55 defer ps.Unlock() | 57 defer ps.Unlock() |
| 56 | 58 |
| 57 v := make([]string, 0, ps.acks.Len()) | 59 v := make([]string, 0, ps.acks.Len()) |
| 58 ps.acks.Iter(func(s string) bool { | 60 ps.acks.Iter(func(s string) bool { |
| 59 v = append(v, s) | 61 v = append(v, s) |
| 60 return true | 62 return true |
| 61 }) | 63 }) |
| 62 sort.Strings(v) | 64 sort.Strings(v) |
| 63 return v | 65 return v |
| 64 } | 66 } |
| 65 | 67 |
| 66 func TestAckBuffer(t *testing.T) { | 68 func TestAckBuffer(t *testing.T) { |
| 67 t.Parallel() | 69 t.Parallel() |
| 68 | 70 |
| 69 Convey(`An AckBuffer configuration using a testing Pub/Sub`, t, func() { | 71 Convey(`An AckBuffer configuration using a testing Pub/Sub`, t, func() { |
| 70 c := context.Background() | 72 c := context.Background() |
| 71 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) | 73 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) |
| 72 » » ps := &testPubSub{ | 74 » » ps := &testACK{} |
| 73 » » » sub: gcps.Subscription("testsub"), | |
| 74 » » } | |
| 75 | 75 |
| 76 var discarded []string | 76 var discarded []string |
| 77 var discardedMu sync.Mutex | 77 var discardedMu sync.Mutex |
| 78 | 78 |
| 79 cfg := Config{ | 79 cfg := Config{ |
| 80 » » » PubSub: ps, | 80 » » » Ack: ps, |
| 81 » » » Subscription: ps.sub, | |
| 82 DiscardCallback: func(acks []string) { | 81 DiscardCallback: func(acks []string) { |
| 83 discardedMu.Lock() | 82 discardedMu.Lock() |
| 84 defer discardedMu.Unlock() | 83 defer discardedMu.Unlock() |
| 85 | 84 |
| 86 discarded = append(discarded, acks...) | 85 discarded = append(discarded, acks...) |
| 87 }, | 86 }, |
| 88 } | 87 } |
| 89 | 88 |
| 90 Convey(`Can instantiate an AckBuffer`, func() { | 89 Convey(`Can instantiate an AckBuffer`, func() { |
| 91 ab := New(c, cfg) | 90 ab := New(c, cfg) |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 131 closeOnce() | 130 closeOnce() |
| 132 | 131 |
| 133 sort.Strings(acks) | 132 sort.Strings(acks) |
| 134 sort.Strings(discarded) | 133 sort.Strings(discarded) |
| 135 So(discarded, ShouldResemble, acks) | 134 So(discarded, ShouldResemble, acks) |
| 136 }) | 135 }) |
| 137 }) | 136 }) |
| 138 }) | 137 }) |
| 139 }) | 138 }) |
| 140 } | 139 } |
| OLD | NEW |