| Index: common/gcloud/gcps/subscriber/subscriber_test.go
|
| diff --git a/common/gcloud/gcps/subscriber/subscriber_test.go b/common/gcloud/gcps/subscriber/subscriber_test.go
|
| index 59d589a6fb3de69fbcc575dbd16e187844a50136..82270f7a71b67dcdfdc1a1a6f2660ba856c39fbe 100644
|
| --- a/common/gcloud/gcps/subscriber/subscriber_test.go
|
| +++ b/common/gcloud/gcps/subscriber/subscriber_test.go
|
| @@ -8,6 +8,7 @@ import (
|
| "fmt"
|
| "sort"
|
| "sync"
|
| + "sync/atomic"
|
| "testing"
|
| "time"
|
|
|
| @@ -18,45 +19,86 @@ import (
|
| "golang.org/x/net/context"
|
| "google.golang.org/cloud/pubsub"
|
|
|
| + . "github.com/luci/luci-go/common/testing/assertions"
|
| . "github.com/smartystreets/goconvey/convey"
|
| )
|
|
|
| -type testPubSub struct {
|
| - c context.Context
|
| - errCB func() error
|
| - sub gcps.Subscription
|
| - msgC chan *pubsub.Message
|
| +type event struct {
|
| + msg *pubsub.Message
|
| + err error
|
| }
|
|
|
| -func (ps *testPubSub) Pull(s gcps.Subscription, amount int) ([]*pubsub.Message, error) {
|
| - if ps.errCB != nil {
|
| - if err := ps.errCB(); err != nil {
|
| - return nil, err
|
| +type testSource struct {
|
| + sub gcps.Subscription
|
| + eventC chan event
|
| +}
|
| +
|
| +func (s *testSource) Pull(c context.Context) ([]*pubsub.Message, error) {
|
| + select {
|
| + case <-c.Done():
|
| + return nil, c.Err()
|
| +
|
| + case e := <-s.eventC:
|
| + switch {
|
| + case e.err != nil:
|
| + return nil, e.err
|
| +
|
| + case e.msg != nil:
|
| + return []*pubsub.Message{e.msg}, nil
|
| +
|
| + default:
|
| + return nil, nil
|
| }
|
| }
|
| - if ps.sub != s {
|
| - return nil, fmt.Errorf("unknown subscription %q", s)
|
| - }
|
| - if amount <= 0 {
|
| - return nil, fmt.Errorf("invalid amount: %d", amount)
|
| +}
|
| +
|
| +func (s *testSource) message(id ...string) {
|
| + for _, v := range id {
|
| + if v != "" {
|
| + s.eventC <- event{msg: &pubsub.Message{
|
| + ID: v,
|
| + AckID: v,
|
| + Data: []byte(v),
|
| + }}
|
| + } else {
|
| + s.eventC <- event{}
|
| + }
|
| }
|
| +}
|
|
|
| - select {
|
| - case <-ps.c.Done():
|
| - return nil, ps.c.Err()
|
| +func (s *testSource) error(err error) {
|
| + s.eventC <- event{err: err}
|
| +}
|
| +
|
| +type testACK struct {
|
| + sync.Mutex
|
| +
|
| + acks map[string]struct{}
|
| +}
|
|
|
| - case msg := <-ps.msgC:
|
| - return []*pubsub.Message{msg}, nil
|
| +func (a *testACK) Ack(id string) {
|
| + a.Lock()
|
| + defer a.Unlock()
|
| +
|
| + if a.acks == nil {
|
| + a.acks = make(map[string]struct{})
|
| }
|
| + a.acks[id] = struct{}{}
|
| }
|
|
|
| -func (ps *testPubSub) send(s ...string) {
|
| - for _, v := range s {
|
| - ps.msgC <- &pubsub.Message{
|
| - ID: v,
|
| - Data: []byte(v),
|
| - }
|
| +func (a *testACK) getACKs() []string {
|
| + a.Lock()
|
| + defer a.Unlock()
|
| + return dumpStringSet(a.acks)
|
| +}
|
| +
|
| +func dumpStringSet(s map[string]struct{}) []string {
|
| + v := make([]string, 0, len(s))
|
| + for a := range s {
|
| + v = append(v, a)
|
| }
|
| + sort.Strings(v)
|
| + return v
|
| }
|
|
|
| func TestSubscriber(t *testing.T) {
|
| @@ -69,82 +111,86 @@ func TestSubscriber(t *testing.T) {
|
| c, cancelFunc := context.WithCancel(c)
|
| defer cancelFunc()
|
|
|
| - ps := &testPubSub{
|
| - c: c,
|
| - sub: gcps.Subscription("testsub"),
|
| - msgC: make(chan *pubsub.Message),
|
| + src := &testSource{
|
| + eventC: make(chan event),
|
| }
|
| + ack := &testACK{}
|
| s := Subscriber{
|
| - PubSub: ps,
|
| - Subscription: ps.sub,
|
| + S: src,
|
| + A: ack,
|
| + Workers: 8,
|
| }
|
|
|
| - var received []string
|
| - var receivedMu sync.Mutex
|
| - cb := func(msg *pubsub.Message) {
|
| - receivedMu.Lock()
|
| - defer receivedMu.Unlock()
|
| - received = append(received, msg.ID)
|
| - }
|
| -
|
| - // If a subscriber goroutine is sleeping, advance to wake it.
|
| - tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
|
| - tc.Add(d)
|
| - })
|
| -
|
| - Convey(`A Subscriber can pull messages.`, func() {
|
| + var seenMu sync.Mutex
|
| + seen := map[string]struct{}{}
|
| + blacklist := map[string]struct{}{}
|
| + runWith := func(cb func()) {
|
| doneC := make(chan struct{})
|
| go func() {
|
| defer close(doneC)
|
| - s.Run(c, cb)
|
| + s.Run(c, func(msg *pubsub.Message) bool {
|
| + seenMu.Lock()
|
| + defer seenMu.Unlock()
|
| + seen[msg.ID] = struct{}{}
|
| +
|
| + _, ok := blacklist[msg.ID]
|
| + return !ok
|
| + })
|
| }()
|
|
|
| - var msgs []string
|
| - for i := 0; i < 1024; i++ {
|
| - msgs = append(msgs, fmt.Sprintf("%08x", i))
|
| - }
|
| - ps.send(msgs...)
|
| -
|
| + cb()
|
| cancelFunc()
|
| <-doneC
|
| + }
|
|
|
| - sort.Strings(msgs)
|
| - sort.Strings(received)
|
| - So(received, ShouldResemble, msgs)
|
| - })
|
| + Convey(`A Subscriber can pull and ACK messages.`, func() {
|
| + var msgs []string
|
| + runWith(func() {
|
| + for i := 0; i < 1024; i++ {
|
| + v := fmt.Sprintf("%08x", i)
|
| + msgs = append(msgs, v)
|
| + src.message(v)
|
| + }
|
| + })
|
|
|
| - Convey(`A Subscriber will continue retrying if there is a Pub/Sub error.`, func() {
|
| - var errMu sync.Mutex
|
| - count := 0
|
| - ps.errCB = func() error {
|
| - errMu.Lock()
|
| - defer errMu.Unlock()
|
| + So(dumpStringSet(seen), ShouldResembleV, msgs)
|
| + So(ack.getACKs(), ShouldResembleV, msgs)
|
| + })
|
|
|
| - if count < 1024 {
|
| - count++
|
| - return errors.WrapTransient(errors.New("test error"))
|
| + Convey(`A Subscriber that encounters an empty message set will sleep and try again.`, func() {
|
| + var count int32
|
| + tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
|
| + if atomic.AddInt32(&count, 1) > 1 {
|
| + panic("should not have this many callbacks")
|
| }
|
| - return nil
|
| - }
|
| + tc.Add(d)
|
| + })
|
|
|
| - doneC := make(chan struct{})
|
| - go func() {
|
| - defer close(doneC)
|
| - s.Run(c, cb)
|
| - }()
|
| + runWith(func() {
|
| + src.message("a", "b", "", "c", "d")
|
| + })
|
|
|
| - var msgs []string
|
| - for i := 0; i < 1024; i++ {
|
| - msgs = append(msgs, fmt.Sprintf("%08x", i))
|
| - }
|
| - ps.send(msgs...)
|
| + So(dumpStringSet(seen), ShouldResembleV, []string{"a", "b", "c", "d"})
|
| + So(ack.getACKs(), ShouldResembleV, []string{"a", "b", "c", "d"})
|
| + })
|
|
|
| - cancelFunc()
|
| - <-doneC
|
| + Convey(`A Subscriber that encounters a Source error will sleep and try again.`, func() {
|
| + var count int32
|
| + tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
|
| + if atomic.AddInt32(&count, 1) > 1 {
|
| + panic("should not have this many callbacks")
|
| + }
|
| + tc.Add(d)
|
| + })
|
| +
|
| + runWith(func() {
|
| + src.message("a", "b")
|
| + src.error(errors.New("test error"))
|
| + src.message("c", "d")
|
| + })
|
|
|
| - sort.Strings(msgs)
|
| - sort.Strings(received)
|
| - So(received, ShouldResemble, msgs)
|
| + So(dumpStringSet(seen), ShouldResembleV, []string{"a", "b", "c", "d"})
|
| + So(ack.getACKs(), ShouldResembleV, []string{"a", "b", "c", "d"})
|
| })
|
| })
|
| }
|
|
|