Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(220)

Unified Diff: common/gcloud/gcps/subscriber/subscriber_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: common/gcloud/gcps/subscriber/subscriber_test.go
diff --git a/common/gcloud/gcps/subscriber/subscriber_test.go b/common/gcloud/gcps/subscriber/subscriber_test.go
index 59d589a6fb3de69fbcc575dbd16e187844a50136..82270f7a71b67dcdfdc1a1a6f2660ba856c39fbe 100644
--- a/common/gcloud/gcps/subscriber/subscriber_test.go
+++ b/common/gcloud/gcps/subscriber/subscriber_test.go
@@ -8,6 +8,7 @@ import (
"fmt"
"sort"
"sync"
+ "sync/atomic"
"testing"
"time"
@@ -18,45 +19,86 @@ import (
"golang.org/x/net/context"
"google.golang.org/cloud/pubsub"
+ . "github.com/luci/luci-go/common/testing/assertions"
. "github.com/smartystreets/goconvey/convey"
)
-type testPubSub struct {
- c context.Context
- errCB func() error
- sub gcps.Subscription
- msgC chan *pubsub.Message
+type event struct {
+ msg *pubsub.Message
+ err error
}
-func (ps *testPubSub) Pull(s gcps.Subscription, amount int) ([]*pubsub.Message, error) {
- if ps.errCB != nil {
- if err := ps.errCB(); err != nil {
- return nil, err
+type testSource struct {
+ sub gcps.Subscription
+ eventC chan event
+}
+
+func (s *testSource) Pull(c context.Context) ([]*pubsub.Message, error) {
+ select {
+ case <-c.Done():
+ return nil, c.Err()
+
+ case e := <-s.eventC:
+ switch {
+ case e.err != nil:
+ return nil, e.err
+
+ case e.msg != nil:
+ return []*pubsub.Message{e.msg}, nil
+
+ default:
+ return nil, nil
}
}
- if ps.sub != s {
- return nil, fmt.Errorf("unknown subscription %q", s)
- }
- if amount <= 0 {
- return nil, fmt.Errorf("invalid amount: %d", amount)
+}
+
+func (s *testSource) message(id ...string) {
+ for _, v := range id {
+ if v != "" {
+ s.eventC <- event{msg: &pubsub.Message{
+ ID: v,
+ AckID: v,
+ Data: []byte(v),
+ }}
+ } else {
+ s.eventC <- event{}
+ }
}
+}
- select {
- case <-ps.c.Done():
- return nil, ps.c.Err()
+func (s *testSource) error(err error) {
+ s.eventC <- event{err: err}
+}
+
+type testACK struct {
+ sync.Mutex
+
+ acks map[string]struct{}
+}
- case msg := <-ps.msgC:
- return []*pubsub.Message{msg}, nil
+func (a *testACK) Ack(id string) {
+ a.Lock()
+ defer a.Unlock()
+
+ if a.acks == nil {
+ a.acks = make(map[string]struct{})
}
+ a.acks[id] = struct{}{}
}
-func (ps *testPubSub) send(s ...string) {
- for _, v := range s {
- ps.msgC <- &pubsub.Message{
- ID: v,
- Data: []byte(v),
- }
+func (a *testACK) getACKs() []string {
+ a.Lock()
+ defer a.Unlock()
+ return dumpStringSet(a.acks)
+}
+
+func dumpStringSet(s map[string]struct{}) []string {
+ v := make([]string, 0, len(s))
+ for a := range s {
+ v = append(v, a)
}
+ sort.Strings(v)
+ return v
}
func TestSubscriber(t *testing.T) {
@@ -69,82 +111,86 @@ func TestSubscriber(t *testing.T) {
c, cancelFunc := context.WithCancel(c)
defer cancelFunc()
- ps := &testPubSub{
- c: c,
- sub: gcps.Subscription("testsub"),
- msgC: make(chan *pubsub.Message),
+ src := &testSource{
+ eventC: make(chan event),
}
+ ack := &testACK{}
s := Subscriber{
- PubSub: ps,
- Subscription: ps.sub,
+ S: src,
+ A: ack,
+ Workers: 8,
}
- var received []string
- var receivedMu sync.Mutex
- cb := func(msg *pubsub.Message) {
- receivedMu.Lock()
- defer receivedMu.Unlock()
- received = append(received, msg.ID)
- }
-
- // If a subscriber goroutine is sleeping, advance to wake it.
- tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
- tc.Add(d)
- })
-
- Convey(`A Subscriber can pull messages.`, func() {
+ var seenMu sync.Mutex
+ seen := map[string]struct{}{}
+ blacklist := map[string]struct{}{}
+ runWith := func(cb func()) {
doneC := make(chan struct{})
go func() {
defer close(doneC)
- s.Run(c, cb)
+ s.Run(c, func(msg *pubsub.Message) bool {
+ seenMu.Lock()
+ defer seenMu.Unlock()
+ seen[msg.ID] = struct{}{}
+
+ _, ok := blacklist[msg.ID]
+ return !ok
+ })
}()
- var msgs []string
- for i := 0; i < 1024; i++ {
- msgs = append(msgs, fmt.Sprintf("%08x", i))
- }
- ps.send(msgs...)
-
+ cb()
cancelFunc()
<-doneC
+ }
- sort.Strings(msgs)
- sort.Strings(received)
- So(received, ShouldResemble, msgs)
- })
+ Convey(`A Subscriber can pull and ACK messages.`, func() {
+ var msgs []string
+ runWith(func() {
+ for i := 0; i < 1024; i++ {
+ v := fmt.Sprintf("%08x", i)
+ msgs = append(msgs, v)
+ src.message(v)
+ }
+ })
- Convey(`A Subscriber will continue retrying if there is a Pub/Sub error.`, func() {
- var errMu sync.Mutex
- count := 0
- ps.errCB = func() error {
- errMu.Lock()
- defer errMu.Unlock()
+ So(dumpStringSet(seen), ShouldResembleV, msgs)
+ So(ack.getACKs(), ShouldResembleV, msgs)
+ })
- if count < 1024 {
- count++
- return errors.WrapTransient(errors.New("test error"))
+ Convey(`A Subscriber that encounters an empty message set will sleep and try again.`, func() {
+ var count int32
+ tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
+ if atomic.AddInt32(&count, 1) > 1 {
+ panic("should not have this many callbacks")
}
- return nil
- }
+ tc.Add(d)
+ })
- doneC := make(chan struct{})
- go func() {
- defer close(doneC)
- s.Run(c, cb)
- }()
+ runWith(func() {
+ src.message("a", "b", "", "c", "d")
+ })
- var msgs []string
- for i := 0; i < 1024; i++ {
- msgs = append(msgs, fmt.Sprintf("%08x", i))
- }
- ps.send(msgs...)
+ So(dumpStringSet(seen), ShouldResembleV, []string{"a", "b", "c", "d"})
+ So(ack.getACKs(), ShouldResembleV, []string{"a", "b", "c", "d"})
+ })
- cancelFunc()
- <-doneC
+ Convey(`A Subscriber that encounters a Source error will sleep and try again.`, func() {
+ var count int32
+ tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
+ if atomic.AddInt32(&count, 1) > 1 {
+ panic("should not have this many callbacks")
+ }
+ tc.Add(d)
+ })
+
+ runWith(func() {
+ src.message("a", "b")
+ src.error(errors.New("test error"))
+ src.message("c", "d")
+ })
- sort.Strings(msgs)
- sort.Strings(received)
- So(received, ShouldResemble, msgs)
+ So(dumpStringSet(seen), ShouldResembleV, []string{"a", "b", "c", "d"})
+ So(ack.getACKs(), ShouldResembleV, []string{"a", "b", "c", "d"})
})
})
}

Powered by Google App Engine
This is Rietveld 408576698