| Index: common/gcloud/pubsub/subscriber/subscriber_test.go
|
| diff --git a/common/gcloud/pubsub/subscriber/subscriber_test.go b/common/gcloud/pubsub/subscriber/subscriber_test.go
|
| index 5ae1c7fa492745a0b40a0c0099fd6a4aef68a56f..0eb773ad6a6fee9989ff5ac637dc4865a343ee0f 100644
|
| --- a/common/gcloud/pubsub/subscriber/subscriber_test.go
|
| +++ b/common/gcloud/pubsub/subscriber/subscriber_test.go
|
| @@ -31,22 +31,32 @@ type testSource struct {
|
| eventC chan event
|
| }
|
|
|
| -func (s *testSource) Pull(c context.Context) ([]*pubsub.Message, error) {
|
| +func (s *testSource) Pull(c context.Context, batch int) ([]*pubsub.Message, error) {
|
| + var e event
|
| +
|
| select {
|
| case <-c.Done():
|
| - return nil, c.Err()
|
| + // Enforce determinism, preferring events.
|
| + select {
|
| + case e = <-s.eventC:
|
| + break
|
| + default:
|
| + return nil, c.Err()
|
| + }
|
|
|
| - case e := <-s.eventC:
|
| - switch {
|
| - case e.err != nil:
|
| - return nil, e.err
|
| + case e = <-s.eventC:
|
| + break
|
| + }
|
|
|
| - case e.msg != nil:
|
| - return []*pubsub.Message{e.msg}, nil
|
| + switch {
|
| + case e.err != nil:
|
| + return nil, e.err
|
|
|
| - default:
|
| - return nil, nil
|
| - }
|
| + case e.msg != nil:
|
| + return []*pubsub.Message{e.msg}, nil
|
| +
|
| + default:
|
| + return nil, nil
|
| }
|
| }
|
|
|
| @@ -114,11 +124,15 @@ func TestSubscriber(t *testing.T) {
|
| }
|
| ack := &testACK{}
|
| s := Subscriber{
|
| - S: src,
|
| - A: ack,
|
| - PullWorkers: 8,
|
| + S: src,
|
| + A: ack,
|
| + Workers: 8,
|
| }
|
|
|
| + // If not nil, processing goroutines will block on reads from this
|
| + // channel, one per message.
|
| + var pullC chan string
|
| +
|
| var seenMu sync.Mutex
|
| seen := map[string]struct{}{}
|
| blacklist := map[string]struct{}{}
|
| @@ -127,6 +141,10 @@ func TestSubscriber(t *testing.T) {
|
| go func() {
|
| defer close(doneC)
|
| s.Run(c, func(msg *pubsub.Message) bool {
|
| + if pullC != nil {
|
| + pullC <- msg.ID
|
| + }
|
| +
|
| seenMu.Lock()
|
| defer seenMu.Unlock()
|
| seen[msg.ID] = struct{}{}
|
| @@ -143,11 +161,14 @@ func TestSubscriber(t *testing.T) {
|
|
|
| Convey(`A Subscriber can pull and ACK messages.`, func() {
|
| var msgs []string
|
| + pullC = make(chan string)
|
| runWith(func() {
|
| for i := 0; i < 1024; i++ {
|
| v := fmt.Sprintf("%08x", i)
|
| msgs = append(msgs, v)
|
| src.message(v)
|
| +
|
| + <-pullC
|
| }
|
| })
|
|
|
|
|