Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(784)

Side by Side Diff: common/gcloud/pubsub/subscriber/subscriber_test.go

Issue 1838303002: Use native Pub/Sub library primitives. (Closed) Base URL: https://github.com/luci/luci-go@logdog-go1.6
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package subscriber
6
7 import (
8 "fmt"
9 "sort"
10 "sync"
11 "sync/atomic"
12 "testing"
13 "time"
14
15 "github.com/luci/luci-go/common/clock"
16 "github.com/luci/luci-go/common/clock/testclock"
17 "github.com/luci/luci-go/common/errors"
18 "github.com/luci/luci-go/common/gcloud/pubsub"
19 "golang.org/x/net/context"
20
21 . "github.com/smartystreets/goconvey/convey"
22 )
23
24 type event struct {
25 msg *pubsub.Message
26 err error
27 }
28
29 type testSource struct {
30 sub pubsub.Subscription
31 eventC chan event
32 }
33
34 func (s *testSource) Pull(c context.Context) ([]*pubsub.Message, error) {
35 select {
36 case <-c.Done():
37 return nil, c.Err()
38
39 case e := <-s.eventC:
40 switch {
41 case e.err != nil:
42 return nil, e.err
43
44 case e.msg != nil:
45 return []*pubsub.Message{e.msg}, nil
46
47 default:
48 return nil, nil
49 }
50 }
51 }
52
53 func (s *testSource) message(id ...string) {
54 for _, v := range id {
55 if v != "" {
56 s.eventC <- event{msg: &pubsub.Message{
57 ID: v,
58 AckID: v,
59 Data: []byte(v),
60 }}
61 } else {
62 s.eventC <- event{}
63 }
64 }
65 }
66
67 func (s *testSource) error(err error) {
68 s.eventC <- event{err: err}
69 }
70
71 type testACK struct {
72 sync.Mutex
73
74 acks map[string]struct{}
75 }
76
77 func (a *testACK) Ack(id string) {
78 a.Lock()
79 defer a.Unlock()
80
81 if a.acks == nil {
82 a.acks = make(map[string]struct{})
83 }
84 a.acks[id] = struct{}{}
85 }
86
87 func (a *testACK) getACKs() []string {
88 a.Lock()
89 defer a.Unlock()
90 return dumpStringSet(a.acks)
91 }
92
93 func dumpStringSet(s map[string]struct{}) []string {
94 v := make([]string, 0, len(s))
95 for a := range s {
96 v = append(v, a)
97 }
98 sort.Strings(v)
99 return v
100 }
101
102 func TestSubscriber(t *testing.T) {
103 t.Parallel()
104
105 Convey(`A Subscriber configuration using a testing Pub/Sub`, t, func() {
106 c := context.Background()
107 c, tc := testclock.UseTime(c, testclock.TestTimeLocal)
108
109 c, cancelFunc := context.WithCancel(c)
110 defer cancelFunc()
111
112 src := &testSource{
113 eventC: make(chan event),
114 }
115 ack := &testACK{}
116 s := Subscriber{
117 S: src,
118 A: ack,
119 PullWorkers: 8,
120 }
121
122 var seenMu sync.Mutex
123 seen := map[string]struct{}{}
124 blacklist := map[string]struct{}{}
125 runWith := func(cb func()) {
126 doneC := make(chan struct{})
127 go func() {
128 defer close(doneC)
129 s.Run(c, func(msg *pubsub.Message) bool {
130 seenMu.Lock()
131 defer seenMu.Unlock()
132 seen[msg.ID] = struct{}{}
133
134 _, ok := blacklist[msg.ID]
135 return !ok
136 })
137 }()
138
139 cb()
140 cancelFunc()
141 <-doneC
142 }
143
144 Convey(`A Subscriber can pull and ACK messages.`, func() {
145 var msgs []string
146 runWith(func() {
147 for i := 0; i < 1024; i++ {
148 v := fmt.Sprintf("%08x", i)
149 msgs = append(msgs, v)
150 src.message(v)
151 }
152 })
153
154 So(dumpStringSet(seen), ShouldResemble, msgs)
155 So(ack.getACKs(), ShouldResemble, msgs)
156 })
157
158 Convey(`A Subscriber that encounters an empty message set will s leep and try again.`, func() {
159 var count int32
160 tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
161 if atomic.AddInt32(&count, 1) > 1 {
162 panic("should not have this many callbac ks")
163 }
164 tc.Add(d)
165 })
166
167 runWith(func() {
168 src.message("a", "b", "", "c", "d")
169 })
170
171 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b ", "c", "d"})
172 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c" , "d"})
173 })
174
175 Convey(`A Subscriber that encounters a Source error will sleep a nd try again.`, func() {
176 var count int32
177 tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
178 if atomic.AddInt32(&count, 1) > 1 {
179 panic("should not have this many callbac ks")
180 }
181 tc.Add(d)
182 })
183
184 runWith(func() {
185 src.message("a", "b")
186 src.error(errors.New("test error"))
187 src.message("c", "d")
188 })
189
190 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b ", "c", "d"})
191 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c" , "d"})
192 })
193 })
194 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698