| Index: common/gcloud/pubsub/ackbuffer/ackbuffer_test.go
|
| diff --git a/common/gcloud/pubsub/ackbuffer/ackbuffer_test.go b/common/gcloud/pubsub/ackbuffer/ackbuffer_test.go
|
| index f5aea5fdbb4dc37cfe107dfa1e19131160090a30..631b9749b089a91a19e1c2d46a73f9a188c7eef4 100644
|
| --- a/common/gcloud/pubsub/ackbuffer/ackbuffer_test.go
|
| +++ b/common/gcloud/pubsub/ackbuffer/ackbuffer_test.go
|
| @@ -5,15 +5,12 @@
|
| package ackbuffer
|
|
|
| import (
|
| - "sort"
|
| + "fmt"
|
| "sync"
|
| "testing"
|
| - "time"
|
|
|
| - "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/clock/testclock"
|
| "github.com/luci/luci-go/common/errors"
|
| - "github.com/luci/luci-go/common/stringset"
|
| "golang.org/x/net/context"
|
|
|
| . "github.com/smartystreets/goconvey/convey"
|
| @@ -23,11 +20,16 @@ type testACK struct {
|
| sync.Mutex
|
|
|
| err error
|
| - acks stringset.Set
|
| + ackC chan []string
|
| + acks []string
|
| batchSize int
|
| }
|
|
|
| func (ps *testACK) Ack(c context.Context, acks ...string) error {
|
| + if ps.ackC != nil {
|
| + ps.ackC <- acks
|
| + }
|
| +
|
| ps.Lock()
|
| defer ps.Unlock()
|
|
|
| @@ -35,11 +37,8 @@ func (ps *testACK) Ack(c context.Context, acks ...string) error {
|
| return ps.err
|
| }
|
|
|
| - if ps.acks == nil {
|
| - ps.acks = stringset.New(0)
|
| - }
|
| for _, ack := range acks {
|
| - ps.acks.Add(ack)
|
| + ps.acks = append(ps.acks, ack)
|
| }
|
| return nil
|
| }
|
| @@ -56,12 +55,8 @@ func (ps *testACK) ackIDs() []string {
|
| ps.Lock()
|
| defer ps.Unlock()
|
|
|
| - v := make([]string, 0, ps.acks.Len())
|
| - ps.acks.Iter(func(s string) bool {
|
| - v = append(v, s)
|
| - return true
|
| - })
|
| - sort.Strings(v)
|
| + v := make([]string, len(ps.acks))
|
| + copy(v, ps.acks)
|
| return v
|
| }
|
|
|
| @@ -102,37 +97,61 @@ func TestAckBuffer(t *testing.T) {
|
| }
|
| defer closeOnce()
|
|
|
| - Convey(`Can send ACKs.`, func() {
|
| + Convey(`Will buffer ACKs until enough are sent.`, func() {
|
| + ps.ackC = make(chan []string)
|
| + acks := make([]string, ps.AckBatchSize())
|
| +
|
| + // Fill up the entire batch, which will cause an automatic dump.
|
| + for i := range acks {
|
| + acks[i] = fmt.Sprintf("%d", i)
|
| + ab.Ack(acks[i])
|
| + }
|
| + <-ps.ackC
|
| +
|
| + So(ps.ackIDs(), ShouldResemble, acks)
|
| + So(discarded, ShouldBeNil)
|
| + })
|
| +
|
| + Convey(`Will buffer ACKs and send if time has expired.`, func() {
|
| + ps.ackC = make(chan []string)
|
| + ab.ackReceivedC = make(chan string)
|
| +
|
| acks := []string{"foo", "bar", "baz"}
|
| for _, v := range acks {
|
| ab.Ack(v)
|
| +
|
| + // Acknowledge that all ACKs have been received before we advance
|
| + // our timer. This will ensure that the timer triggers AFTER the ACKs
|
| + // are buffered.
|
| + <-ab.ackReceivedC
|
| }
|
| tc.Add(DefaultMaxBufferTime)
|
|
|
| - closeOnce()
|
| - sort.Strings(acks)
|
| + <-ps.ackC
|
| So(ps.ackIDs(), ShouldResemble, acks)
|
| So(discarded, ShouldBeNil)
|
| })
|
|
|
| - Convey(`Will retry on transient Pub/Sub error`, func() {
|
| - tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
|
| - tc.Add(d)
|
| - })
|
| + Convey(`Will flush any remaining ACKs on close.`, func() {
|
| + acks := []string{"foo", "bar", "baz"}
|
| + for _, v := range acks {
|
| + ab.Ack(v)
|
| + }
|
| + closeOnce()
|
| +
|
| + So(ps.ackIDs(), ShouldResemble, acks)
|
| + So(discarded, ShouldBeNil)
|
| + })
|
|
|
| + Convey(`Will discard the ACK if it could not be sent`, func() {
|
| ps.err = errors.WrapTransient(errors.New("test error"))
|
| acks := []string{"foo", "bar", "baz"}
|
| for _, v := range acks {
|
| ab.Ack(v)
|
| }
|
| + closeOnce()
|
|
|
| - Convey(`And eventually discard the ACK.`, func() {
|
| - closeOnce()
|
| -
|
| - sort.Strings(acks)
|
| - sort.Strings(discarded)
|
| - So(discarded, ShouldResemble, acks)
|
| - })
|
| + So(discarded, ShouldResemble, acks)
|
| })
|
| })
|
| })
|
|
|
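The updated test replaces the sorted-set bookkeeping with a channel-reporting fake: the Ack stub pushes each batch onto ackC, so the test can block until a flush has actually happened instead of sorting results or relying on an early close. Below is a minimal, standalone sketch of that synchronization pattern; the names (fakeAcker, ids) are illustrative and are not part of the luci-go package.

	package main

	import (
		"fmt"
		"sync"
	)

	// fakeAcker records every Ack batch and, when ackC is non-nil, also
	// reports the batch on a channel so a test can block until the flush
	// has actually happened.
	type fakeAcker struct {
		sync.Mutex
		ackC chan []string
		acks []string
	}

	func (f *fakeAcker) Ack(ids ...string) {
		if f.ackC != nil {
			f.ackC <- ids
		}
		f.Lock()
		defer f.Unlock()
		f.acks = append(f.acks, ids...)
	}

	func main() {
		f := &fakeAcker{ackC: make(chan []string)}

		// Simulate a buffer flushing a batch from another goroutine.
		go f.Ack("foo", "bar", "baz")

		// The test blocks here until the flush occurs, so no sorting or
		// sleeping is needed to avoid flakiness.
		batch := <-f.ackC
		fmt.Println(batch) // [foo bar baz]
	}

Blocking on the channel makes both the size-triggered and the timer-triggered flushes deterministic to observe, which is why the diff can also drop the sort.Strings calls and compare the received IDs in order.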