Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(54)

Unified Diff: common/gcloud/pubsub/subscriber/subscriber_test.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « common/gcloud/pubsub/subscriber/subscriber.go ('k') | common/proto/logdog/logpb/butler.proto » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: common/gcloud/pubsub/subscriber/subscriber_test.go
diff --git a/common/gcloud/pubsub/subscriber/subscriber_test.go b/common/gcloud/pubsub/subscriber/subscriber_test.go
index 5ae1c7fa492745a0b40a0c0099fd6a4aef68a56f..0eb773ad6a6fee9989ff5ac637dc4865a343ee0f 100644
--- a/common/gcloud/pubsub/subscriber/subscriber_test.go
+++ b/common/gcloud/pubsub/subscriber/subscriber_test.go
@@ -31,22 +31,32 @@ type testSource struct {
eventC chan event
}
-func (s *testSource) Pull(c context.Context) ([]*pubsub.Message, error) {
+func (s *testSource) Pull(c context.Context, batch int) ([]*pubsub.Message, error) {
+ var e event
+
select {
case <-c.Done():
- return nil, c.Err()
+ // Enforce determinism, preferring events.
+ select {
+ case e = <-s.eventC:
+ break
+ default:
+ return nil, c.Err()
+ }
- case e := <-s.eventC:
- switch {
- case e.err != nil:
- return nil, e.err
+ case e = <-s.eventC:
+ break
+ }
- case e.msg != nil:
- return []*pubsub.Message{e.msg}, nil
+ switch {
+ case e.err != nil:
+ return nil, e.err
- default:
- return nil, nil
- }
+ case e.msg != nil:
+ return []*pubsub.Message{e.msg}, nil
+
+ default:
+ return nil, nil
}
}
@@ -114,11 +124,15 @@ func TestSubscriber(t *testing.T) {
}
ack := &testACK{}
s := Subscriber{
- S: src,
- A: ack,
- PullWorkers: 8,
+ S: src,
+ A: ack,
+ Workers: 8,
}
+ // If not nil, processing goroutines will block on reads from this
+ // channel, one per message.
+ var pullC chan string
+
var seenMu sync.Mutex
seen := map[string]struct{}{}
blacklist := map[string]struct{}{}
@@ -127,6 +141,10 @@ func TestSubscriber(t *testing.T) {
go func() {
defer close(doneC)
s.Run(c, func(msg *pubsub.Message) bool {
+ if pullC != nil {
+ pullC <- msg.ID
+ }
+
seenMu.Lock()
defer seenMu.Unlock()
seen[msg.ID] = struct{}{}
@@ -143,11 +161,14 @@ func TestSubscriber(t *testing.T) {
Convey(`A Subscriber can pull and ACK messages.`, func() {
var msgs []string
+ pullC = make(chan string)
runWith(func() {
for i := 0; i < 1024; i++ {
v := fmt.Sprintf("%08x", i)
msgs = append(msgs, v)
src.message(v)
+
+ <-pullC
}
})
« no previous file with comments | « common/gcloud/pubsub/subscriber/subscriber.go ('k') | common/proto/logdog/logpb/butler.proto » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698