Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(54)

Side by Side Diff: common/gcloud/pubsub/subscriber/subscriber_test.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Use "Topic" instead of "NewTopic" ... don't want to create :) Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/gcloud/pubsub/subscriber/subscriber.go ('k') | common/tsmon/iface.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package subscriber
6
7 import (
8 "fmt"
9 "sort"
10 "sync"
11 "sync/atomic"
12 "testing"
13 "time"
14
15 "github.com/luci/luci-go/common/clock"
16 "github.com/luci/luci-go/common/clock/testclock"
17 "github.com/luci/luci-go/common/errors"
18 "github.com/luci/luci-go/common/gcloud/pubsub"
19 "golang.org/x/net/context"
20
21 . "github.com/smartystreets/goconvey/convey"
22 )
23
24 type event struct {
25 msg *pubsub.Message
26 err error
27 }
28
29 type testSource struct {
30 sub pubsub.Subscription
31 eventC chan event
32 }
33
34 func (s *testSource) Pull(c context.Context, batch int) ([]*pubsub.Message, erro r) {
35 var e event
36
37 select {
38 case <-c.Done():
39 // Enforce determinism, preferring events.
40 select {
41 case e = <-s.eventC:
42 break
43 default:
44 return nil, c.Err()
45 }
46
47 case e = <-s.eventC:
48 break
49 }
50
51 switch {
52 case e.err != nil:
53 return nil, e.err
54
55 case e.msg != nil:
56 return []*pubsub.Message{e.msg}, nil
57
58 default:
59 return nil, nil
60 }
61 }
62
63 func (s *testSource) message(id ...string) {
64 for _, v := range id {
65 if v != "" {
66 s.eventC <- event{msg: &pubsub.Message{
67 ID: v,
68 AckID: v,
69 Data: []byte(v),
70 }}
71 } else {
72 s.eventC <- event{}
73 }
74 }
75 }
76
77 func (s *testSource) error(err error) {
78 s.eventC <- event{err: err}
79 }
80
81 type testACK struct {
82 sync.Mutex
83
84 acks map[string]struct{}
85 }
86
87 func (a *testACK) Ack(id string) {
88 a.Lock()
89 defer a.Unlock()
90
91 if a.acks == nil {
92 a.acks = make(map[string]struct{})
93 }
94 a.acks[id] = struct{}{}
95 }
96
97 func (a *testACK) getACKs() []string {
98 a.Lock()
99 defer a.Unlock()
100 return dumpStringSet(a.acks)
101 }
102
103 func dumpStringSet(s map[string]struct{}) []string {
104 v := make([]string, 0, len(s))
105 for a := range s {
106 v = append(v, a)
107 }
108 sort.Strings(v)
109 return v
110 }
111
112 func TestSubscriber(t *testing.T) {
113 t.Parallel()
114
115 Convey(`A Subscriber configuration using a testing Pub/Sub`, t, func() {
116 c := context.Background()
117 c, tc := testclock.UseTime(c, testclock.TestTimeLocal)
118
119 c, cancelFunc := context.WithCancel(c)
120 defer cancelFunc()
121
122 src := &testSource{
123 eventC: make(chan event),
124 }
125 ack := &testACK{}
126 s := Subscriber{
127 S: src,
128 A: ack,
129 Workers: 8,
130 }
131
132 // If not nil, processing goroutines will block on reads from th is
133 // channel, one per message.
134 var pullC chan string
135
136 var seenMu sync.Mutex
137 seen := map[string]struct{}{}
138 blacklist := map[string]struct{}{}
139 runWith := func(cb func()) {
140 doneC := make(chan struct{})
141 go func() {
142 defer close(doneC)
143 s.Run(c, func(msg *pubsub.Message) bool {
144 if pullC != nil {
145 pullC <- msg.ID
146 }
147
148 seenMu.Lock()
149 defer seenMu.Unlock()
150 seen[msg.ID] = struct{}{}
151
152 _, ok := blacklist[msg.ID]
153 return !ok
154 })
155 }()
156
157 cb()
158 cancelFunc()
159 <-doneC
160 }
161
162 Convey(`A Subscriber can pull and ACK messages.`, func() {
163 var msgs []string
164 pullC = make(chan string)
165 runWith(func() {
166 for i := 0; i < 1024; i++ {
167 v := fmt.Sprintf("%08x", i)
168 msgs = append(msgs, v)
169 src.message(v)
170
171 <-pullC
172 }
173 })
174
175 So(dumpStringSet(seen), ShouldResemble, msgs)
176 So(ack.getACKs(), ShouldResemble, msgs)
177 })
178
179 Convey(`A Subscriber that encounters an empty message set will s leep and try again.`, func() {
180 var count int32
181 tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
182 if atomic.AddInt32(&count, 1) > 1 {
183 panic("should not have this many callbac ks")
184 }
185 tc.Add(d)
186 })
187
188 runWith(func() {
189 src.message("a", "b", "", "c", "d")
190 })
191
192 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b ", "c", "d"})
193 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c" , "d"})
194 })
195
196 Convey(`A Subscriber that encounters a Source error will sleep a nd try again.`, func() {
197 var count int32
198 tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
199 if atomic.AddInt32(&count, 1) > 1 {
200 panic("should not have this many callbac ks")
201 }
202 tc.Add(d)
203 })
204
205 runWith(func() {
206 src.message("a", "b")
207 src.error(errors.New("test error"))
208 src.message("c", "d")
209 })
210
211 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b ", "c", "d"})
212 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c" , "d"})
213 })
214 })
215 }
OLDNEW
« no previous file with comments | « common/gcloud/pubsub/subscriber/subscriber.go ('k') | common/tsmon/iface.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698