Index: common/gcloud/pubsub/ackbuffer/ackbuffer_test.go |
diff --git a/common/gcloud/pubsub/ackbuffer/ackbuffer_test.go b/common/gcloud/pubsub/ackbuffer/ackbuffer_test.go |
index f5aea5fdbb4dc37cfe107dfa1e19131160090a30..631b9749b089a91a19e1c2d46a73f9a188c7eef4 100644 |
--- a/common/gcloud/pubsub/ackbuffer/ackbuffer_test.go |
+++ b/common/gcloud/pubsub/ackbuffer/ackbuffer_test.go |
@@ -5,15 +5,12 @@ |
package ackbuffer |
import ( |
- "sort" |
+ "fmt" |
"sync" |
"testing" |
- "time" |
- "github.com/luci/luci-go/common/clock" |
"github.com/luci/luci-go/common/clock/testclock" |
"github.com/luci/luci-go/common/errors" |
- "github.com/luci/luci-go/common/stringset" |
"golang.org/x/net/context" |
. "github.com/smartystreets/goconvey/convey" |
@@ -23,11 +20,16 @@ type testACK struct { |
sync.Mutex |
err error |
- acks stringset.Set |
+ ackC chan []string |
+ acks []string |
batchSize int |
} |
func (ps *testACK) Ack(c context.Context, acks ...string) error { |
+ if ps.ackC != nil { |
+ ps.ackC <- acks |
+ } |
+ |
ps.Lock() |
defer ps.Unlock() |
@@ -35,11 +37,8 @@ func (ps *testACK) Ack(c context.Context, acks ...string) error { |
return ps.err |
} |
- if ps.acks == nil { |
- ps.acks = stringset.New(0) |
- } |
for _, ack := range acks { |
- ps.acks.Add(ack) |
+ ps.acks = append(ps.acks, ack) |
} |
return nil |
} |
@@ -56,12 +55,8 @@ func (ps *testACK) ackIDs() []string { |
ps.Lock() |
defer ps.Unlock() |
- v := make([]string, 0, ps.acks.Len()) |
- ps.acks.Iter(func(s string) bool { |
- v = append(v, s) |
- return true |
- }) |
- sort.Strings(v) |
+ v := make([]string, len(ps.acks)) |
+ copy(v, ps.acks) |
return v |
} |
@@ -102,37 +97,61 @@ func TestAckBuffer(t *testing.T) { |
} |
defer closeOnce() |
- Convey(`Can send ACKs.`, func() { |
+ Convey(`Will buffer ACKs until enough are sent.`, func() { |
+ ps.ackC = make(chan []string) |
+ acks := make([]string, ps.AckBatchSize()) |
+ |
+ // Fill up the entire batch, which will cause an automatic dump. |
+ for i := range acks { |
+ acks[i] = fmt.Sprintf("%d", i) |
+ ab.Ack(acks[i]) |
+ } |
+ <-ps.ackC |
+ |
+ So(ps.ackIDs(), ShouldResemble, acks) |
+ So(discarded, ShouldBeNil) |
+ }) |
+ |
+ Convey(`Will buffer ACKs and send if time has expired.`, func() { |
+ ps.ackC = make(chan []string) |
+ ab.ackReceivedC = make(chan string) |
+ |
acks := []string{"foo", "bar", "baz"} |
for _, v := range acks { |
ab.Ack(v) |
+ |
+ // Acknoweldge that all ACKs have been received before we advance |
+ // our timer. This will ensure that the timer triggers AFTER the ACKs |
+ // are buffered. |
+ <-ab.ackReceivedC |
} |
tc.Add(DefaultMaxBufferTime) |
- closeOnce() |
- sort.Strings(acks) |
+ <-ps.ackC |
So(ps.ackIDs(), ShouldResemble, acks) |
So(discarded, ShouldBeNil) |
}) |
- Convey(`Will retry on transient Pub/Sub error`, func() { |
- tc.SetTimerCallback(func(d time.Duration, t clock.Timer) { |
- tc.Add(d) |
- }) |
+ Convey(`Will flush any remaining ACKs on close.`, func() { |
+ acks := []string{"foo", "bar", "baz"} |
+ for _, v := range acks { |
+ ab.Ack(v) |
+ } |
+ closeOnce() |
+ |
+ So(ps.ackIDs(), ShouldResemble, acks) |
+ So(discarded, ShouldBeNil) |
+ }) |
+ Convey(`Will discard the ACK if it could not be sent`, func() { |
ps.err = errors.WrapTransient(errors.New("test error")) |
acks := []string{"foo", "bar", "baz"} |
for _, v := range acks { |
ab.Ack(v) |
} |
+ closeOnce() |
- Convey(`And eventually discard the ACK.`, func() { |
- closeOnce() |
- |
- sort.Strings(acks) |
- sort.Strings(discarded) |
- So(discarded, ShouldResemble, acks) |
- }) |
+ So(discarded, ShouldResemble, acks) |
}) |
}) |
}) |