Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Unified Diff: common/gcloud/pubsub/ackbuffer/ackbuffer_test.go

Issue 1679023005: Add Context cancellation to clock. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Actually upload the patch. Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « common/gcloud/pubsub/ackbuffer/ackbuffer.go ('k') | common/gcloud/pubsub/subscriber/subscriber.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: common/gcloud/pubsub/ackbuffer/ackbuffer_test.go
diff --git a/common/gcloud/pubsub/ackbuffer/ackbuffer_test.go b/common/gcloud/pubsub/ackbuffer/ackbuffer_test.go
index f5aea5fdbb4dc37cfe107dfa1e19131160090a30..631b9749b089a91a19e1c2d46a73f9a188c7eef4 100644
--- a/common/gcloud/pubsub/ackbuffer/ackbuffer_test.go
+++ b/common/gcloud/pubsub/ackbuffer/ackbuffer_test.go
@@ -5,15 +5,12 @@
package ackbuffer
import (
- "sort"
+ "fmt"
"sync"
"testing"
- "time"
- "github.com/luci/luci-go/common/clock"
"github.com/luci/luci-go/common/clock/testclock"
"github.com/luci/luci-go/common/errors"
- "github.com/luci/luci-go/common/stringset"
"golang.org/x/net/context"
. "github.com/smartystreets/goconvey/convey"
@@ -23,11 +20,16 @@ type testACK struct {
sync.Mutex
err error
- acks stringset.Set
+ ackC chan []string
+ acks []string
batchSize int
}
func (ps *testACK) Ack(c context.Context, acks ...string) error {
+ if ps.ackC != nil {
+ ps.ackC <- acks
+ }
+
ps.Lock()
defer ps.Unlock()
@@ -35,11 +37,8 @@ func (ps *testACK) Ack(c context.Context, acks ...string) error {
return ps.err
}
- if ps.acks == nil {
- ps.acks = stringset.New(0)
- }
for _, ack := range acks {
- ps.acks.Add(ack)
+ ps.acks = append(ps.acks, ack)
}
return nil
}
@@ -56,12 +55,8 @@ func (ps *testACK) ackIDs() []string {
ps.Lock()
defer ps.Unlock()
- v := make([]string, 0, ps.acks.Len())
- ps.acks.Iter(func(s string) bool {
- v = append(v, s)
- return true
- })
- sort.Strings(v)
+ v := make([]string, len(ps.acks))
+ copy(v, ps.acks)
return v
}
@@ -102,37 +97,61 @@ func TestAckBuffer(t *testing.T) {
}
defer closeOnce()
- Convey(`Can send ACKs.`, func() {
+ Convey(`Will buffer ACKs until enough are sent.`, func() {
+ ps.ackC = make(chan []string)
+ acks := make([]string, ps.AckBatchSize())
+
+ // Fill up the entire batch, which will cause an automatic dump.
+ for i := range acks {
+ acks[i] = fmt.Sprintf("%d", i)
+ ab.Ack(acks[i])
+ }
+ <-ps.ackC
+
+ So(ps.ackIDs(), ShouldResemble, acks)
+ So(discarded, ShouldBeNil)
+ })
+
+ Convey(`Will buffer ACKs and send if time has expired.`, func() {
+ ps.ackC = make(chan []string)
+ ab.ackReceivedC = make(chan string)
+
acks := []string{"foo", "bar", "baz"}
for _, v := range acks {
ab.Ack(v)
+
+ // Acknoweldge that all ACKs have been received before we advance
+ // our timer. This will ensure that the timer triggers AFTER the ACKs
+ // are buffered.
+ <-ab.ackReceivedC
}
tc.Add(DefaultMaxBufferTime)
- closeOnce()
- sort.Strings(acks)
+ <-ps.ackC
So(ps.ackIDs(), ShouldResemble, acks)
So(discarded, ShouldBeNil)
})
- Convey(`Will retry on transient Pub/Sub error`, func() {
- tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
- tc.Add(d)
- })
+ Convey(`Will flush any remaining ACKs on close.`, func() {
+ acks := []string{"foo", "bar", "baz"}
+ for _, v := range acks {
+ ab.Ack(v)
+ }
+ closeOnce()
+
+ So(ps.ackIDs(), ShouldResemble, acks)
+ So(discarded, ShouldBeNil)
+ })
+ Convey(`Will discard the ACK if it could not be sent`, func() {
ps.err = errors.WrapTransient(errors.New("test error"))
acks := []string{"foo", "bar", "baz"}
for _, v := range acks {
ab.Ack(v)
}
+ closeOnce()
- Convey(`And eventually discard the ACK.`, func() {
- closeOnce()
-
- sort.Strings(acks)
- sort.Strings(discarded)
- So(discarded, ShouldResemble, acks)
- })
+ So(discarded, ShouldResemble, acks)
})
})
})
« no previous file with comments | « common/gcloud/pubsub/ackbuffer/ackbuffer.go ('k') | common/gcloud/pubsub/subscriber/subscriber.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698