| Index: common/gcloud/gcps/ackbuffer/ackbuffer_test.go
|
| diff --git a/common/gcloud/gcps/ackbuffer/ackbuffer_test.go b/common/gcloud/gcps/ackbuffer/ackbuffer_test.go
|
| index 25d02014bc705b93350ba3cbde105661a3f5c6ce..f5aea5fdbb4dc37cfe107dfa1e19131160090a30 100644
|
| --- a/common/gcloud/gcps/ackbuffer/ackbuffer_test.go
|
| +++ b/common/gcloud/gcps/ackbuffer/ackbuffer_test.go
|
| @@ -5,7 +5,6 @@
|
| package ackbuffer
|
|
|
| import (
|
| - "fmt"
|
| "sort"
|
| "sync"
|
| "testing"
|
| @@ -14,22 +13,21 @@ import (
|
| "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/clock/testclock"
|
| "github.com/luci/luci-go/common/errors"
|
| - "github.com/luci/luci-go/common/gcloud/gcps"
|
| "github.com/luci/luci-go/common/stringset"
|
| "golang.org/x/net/context"
|
|
|
| . "github.com/smartystreets/goconvey/convey"
|
| )
|
|
|
| -type testPubSub struct {
|
| +type testACK struct {
|
| sync.Mutex
|
|
|
| - err error
|
| - sub gcps.Subscription
|
| - acks stringset.Set
|
| + err error
|
| + acks stringset.Set
|
| + batchSize int
|
| }
|
|
|
| -func (ps *testPubSub) Ack(s gcps.Subscription, acks ...string) error {
|
| +func (ps *testACK) Ack(c context.Context, acks ...string) error {
|
| ps.Lock()
|
| defer ps.Unlock()
|
|
|
| @@ -37,10 +35,6 @@ func (ps *testPubSub) Ack(s gcps.Subscription, acks ...string) error {
|
| return ps.err
|
| }
|
|
|
| - if s != ps.sub {
|
| - return fmt.Errorf("unknown subscription %q", s)
|
| - }
|
| -
|
| if ps.acks == nil {
|
| ps.acks = stringset.New(0)
|
| }
|
| @@ -50,7 +44,15 @@ func (ps *testPubSub) Ack(s gcps.Subscription, acks ...string) error {
|
| return nil
|
| }
|
|
|
| -func (ps *testPubSub) ackIDs() []string {
|
| +func (ps *testACK) AckBatchSize() int {
|
| + size := ps.batchSize
|
| + if size <= 0 {
|
| + size = 4
|
| + }
|
| + return size
|
| +}
|
| +
|
| +func (ps *testACK) ackIDs() []string {
|
| ps.Lock()
|
| defer ps.Unlock()
|
|
|
| @@ -69,16 +71,13 @@ func TestAckBuffer(t *testing.T) {
|
| Convey(`An AckBuffer configuration using a testing Pub/Sub`, t, func() {
|
| c := context.Background()
|
| c, tc := testclock.UseTime(c, testclock.TestTimeLocal)
|
| - ps := &testPubSub{
|
| - sub: gcps.Subscription("testsub"),
|
| - }
|
| + ps := &testACK{}
|
|
|
| var discarded []string
|
| var discardedMu sync.Mutex
|
|
|
| cfg := Config{
|
| - PubSub: ps,
|
| - Subscription: ps.sub,
|
| + Ack: ps,
|
| DiscardCallback: func(acks []string) {
|
| discardedMu.Lock()
|
| defer discardedMu.Unlock()
|
|
|