Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(264)

Side by Side Diff: common/gcloud/gcps/subscriber/subscriber_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package subscriber 5 package subscriber
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "sort" 9 "sort"
10 "sync" 10 "sync"
11 "sync/atomic"
11 "testing" 12 "testing"
12 "time" 13 "time"
13 14
14 "github.com/luci/luci-go/common/clock" 15 "github.com/luci/luci-go/common/clock"
15 "github.com/luci/luci-go/common/clock/testclock" 16 "github.com/luci/luci-go/common/clock/testclock"
16 "github.com/luci/luci-go/common/errors" 17 "github.com/luci/luci-go/common/errors"
17 "github.com/luci/luci-go/common/gcloud/gcps" 18 "github.com/luci/luci-go/common/gcloud/gcps"
18 "golang.org/x/net/context" 19 "golang.org/x/net/context"
19 "google.golang.org/cloud/pubsub" 20 "google.golang.org/cloud/pubsub"
20 21
22 . "github.com/luci/luci-go/common/testing/assertions"
21 . "github.com/smartystreets/goconvey/convey" 23 . "github.com/smartystreets/goconvey/convey"
22 ) 24 )
23 25
24 type testPubSub struct { 26 type event struct {
25 » c context.Context 27 » msg *pubsub.Message
26 » errCB func() error 28 » err error
27 » sub gcps.Subscription
28 » msgC chan *pubsub.Message
29 } 29 }
30 30
31 func (ps *testPubSub) Pull(s gcps.Subscription, amount int) ([]*pubsub.Message, error) { 31 type testSource struct {
32 » if ps.errCB != nil { 32 » sub gcps.Subscription
33 » » if err := ps.errCB(); err != nil { 33 » eventC chan event
34 » » » return nil, err
35 » » }
36 » }
37 » if ps.sub != s {
38 » » return nil, fmt.Errorf("unknown subscription %q", s)
39 » }
40 » if amount <= 0 {
41 » » return nil, fmt.Errorf("invalid amount: %d", amount)
42 » }
43
44 » select {
45 » case <-ps.c.Done():
46 » » return nil, ps.c.Err()
47
48 » case msg := <-ps.msgC:
49 » » return []*pubsub.Message{msg}, nil
50 » }
51 } 34 }
52 35
53 func (ps *testPubSub) send(s ...string) { 36 func (s *testSource) Pull(c context.Context) ([]*pubsub.Message, error) {
54 » for _, v := range s { 37 » select {
55 » » ps.msgC <- &pubsub.Message{ 38 » case <-c.Done():
56 » » » ID: v, 39 » » return nil, c.Err()
57 » » » Data: []byte(v), 40
41 » case e := <-s.eventC:
42 » » switch {
43 » » case e.err != nil:
44 » » » return nil, e.err
45
46 » » case e.msg != nil:
47 » » » return []*pubsub.Message{e.msg}, nil
48
49 » » default:
50 » » » return nil, nil
58 } 51 }
59 } 52 }
60 } 53 }
61 54
55 func (s *testSource) message(id ...string) {
56 for _, v := range id {
57 if v != "" {
58 s.eventC <- event{msg: &pubsub.Message{
59 ID: v,
60 AckID: v,
61 Data: []byte(v),
62 }}
63 } else {
64 s.eventC <- event{}
65 }
66 }
67 }
68
69 func (s *testSource) error(err error) {
70 s.eventC <- event{err: err}
71 }
72
73 type testACK struct {
74 sync.Mutex
75
76 acks map[string]struct{}
77 }
78
79 func (a *testACK) Ack(id string) {
80 a.Lock()
81 defer a.Unlock()
82
83 if a.acks == nil {
84 a.acks = make(map[string]struct{})
85 }
86 a.acks[id] = struct{}{}
87 }
88
89 func (a *testACK) getACKs() []string {
90 a.Lock()
91 defer a.Unlock()
92 return dumpStringSet(a.acks)
93 }
94
95 func dumpStringSet(s map[string]struct{}) []string {
96 v := make([]string, 0, len(s))
97 for a := range s {
98 v = append(v, a)
99 }
100 sort.Strings(v)
101 return v
102 }
103
62 func TestSubscriber(t *testing.T) { 104 func TestSubscriber(t *testing.T) {
63 t.Parallel() 105 t.Parallel()
64 106
65 Convey(`A Subscriber configuration using a testing Pub/Sub`, t, func() { 107 Convey(`A Subscriber configuration using a testing Pub/Sub`, t, func() {
66 c := context.Background() 108 c := context.Background()
67 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) 109 c, tc := testclock.UseTime(c, testclock.TestTimeLocal)
68 110
69 c, cancelFunc := context.WithCancel(c) 111 c, cancelFunc := context.WithCancel(c)
70 defer cancelFunc() 112 defer cancelFunc()
71 113
72 » » ps := &testPubSub{ 114 » » src := &testSource{
73 » » » c: c, 115 » » » eventC: make(chan event),
74 » » » sub: gcps.Subscription("testsub"),
75 » » » msgC: make(chan *pubsub.Message),
76 } 116 }
117 ack := &testACK{}
77 s := Subscriber{ 118 s := Subscriber{
78 » » » PubSub: ps, 119 » » » S: src,
79 » » » Subscription: ps.sub, 120 » » » A: ack,
121 » » » Workers: 8,
80 } 122 }
81 123
82 » » var received []string 124 » » var seenMu sync.Mutex
83 » » var receivedMu sync.Mutex 125 » » seen := map[string]struct{}{}
84 » » cb := func(msg *pubsub.Message) { 126 » » blacklist := map[string]struct{}{}
85 » » » receivedMu.Lock() 127 » » runWith := func(cb func()) {
86 » » » defer receivedMu.Unlock()
87 » » » received = append(received, msg.ID)
88 » » }
89
90 » » // If a subscriber goroutine is sleeping, advance to wake it.
91 » » tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
92 » » » tc.Add(d)
93 » » })
94
95 » » Convey(`A Subscriber can pull messages.`, func() {
96 doneC := make(chan struct{}) 128 doneC := make(chan struct{})
97 go func() { 129 go func() {
98 defer close(doneC) 130 defer close(doneC)
99 » » » » s.Run(c, cb) 131 » » » » s.Run(c, func(msg *pubsub.Message) bool {
132 » » » » » seenMu.Lock()
133 » » » » » defer seenMu.Unlock()
134 » » » » » seen[msg.ID] = struct{}{}
135
136 » » » » » _, ok := blacklist[msg.ID]
137 » » » » » return !ok
138 » » » » })
100 }() 139 }()
101 140
102 » » » var msgs []string 141 » » » cb()
103 » » » for i := 0; i < 1024; i++ {
104 » » » » msgs = append(msgs, fmt.Sprintf("%08x", i))
105 » » » }
106 » » » ps.send(msgs...)
107
108 cancelFunc() 142 cancelFunc()
109 <-doneC 143 <-doneC
144 }
110 145
111 » » » sort.Strings(msgs) 146 » » Convey(`A Subscriber can pull and ACK messages.`, func() {
112 » » » sort.Strings(received) 147 » » » var msgs []string
113 » » » So(received, ShouldResemble, msgs) 148 » » » runWith(func() {
149 » » » » for i := 0; i < 1024; i++ {
150 » » » » » v := fmt.Sprintf("%08x", i)
151 » » » » » msgs = append(msgs, v)
152 » » » » » src.message(v)
153 » » » » }
154 » » » })
155
156 » » » So(dumpStringSet(seen), ShouldResembleV, msgs)
157 » » » So(ack.getACKs(), ShouldResembleV, msgs)
114 }) 158 })
115 159
116 » » Convey(`A Subscriber will continue retrying if there is a Pub/Su b error.`, func() { 160 » » Convey(`A Subscriber that encounters an empty message set will s leep and try again.`, func() {
117 » » » var errMu sync.Mutex 161 » » » var count int32
118 » » » count := 0 162 » » » tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
119 » » » ps.errCB = func() error { 163 » » » » if atomic.AddInt32(&count, 1) > 1 {
120 » » » » errMu.Lock() 164 » » » » » panic("should not have this many callbac ks")
121 » » » » defer errMu.Unlock() 165 » » » » }
166 » » » » tc.Add(d)
167 » » » })
122 168
123 » » » » if count < 1024 { 169 » » » runWith(func() {
124 » » » » » count++ 170 » » » » src.message("a", "b", "", "c", "d")
125 » » » » » return errors.WrapTransient(errors.New(" test error")) 171 » » » })
172
173 » » » So(dumpStringSet(seen), ShouldResembleV, []string{"a", " b", "c", "d"})
174 » » » So(ack.getACKs(), ShouldResembleV, []string{"a", "b", "c ", "d"})
175 » » })
176
177 » » Convey(`A Subscriber that encounters a Source error will sleep a nd try again.`, func() {
178 » » » var count int32
179 » » » tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
180 » » » » if atomic.AddInt32(&count, 1) > 1 {
181 » » » » » panic("should not have this many callbac ks")
126 } 182 }
127 » » » » return nil 183 » » » » tc.Add(d)
128 » » » } 184 » » » })
129 185
130 » » » doneC := make(chan struct{}) 186 » » » runWith(func() {
131 » » » go func() { 187 » » » » src.message("a", "b")
132 » » » » defer close(doneC) 188 » » » » src.error(errors.New("test error"))
133 » » » » s.Run(c, cb) 189 » » » » src.message("c", "d")
134 » » » }() 190 » » » })
135 191
136 » » » var msgs []string 192 » » » So(dumpStringSet(seen), ShouldResembleV, []string{"a", " b", "c", "d"})
137 » » » for i := 0; i < 1024; i++ { 193 » » » So(ack.getACKs(), ShouldResembleV, []string{"a", "b", "c ", "d"})
138 » » » » msgs = append(msgs, fmt.Sprintf("%08x", i))
139 » » » }
140 » » » ps.send(msgs...)
141
142 » » » cancelFunc()
143 » » » <-doneC
144
145 » » » sort.Strings(msgs)
146 » » » sort.Strings(received)
147 » » » So(received, ShouldResemble, msgs)
148 }) 194 })
149 }) 195 })
150 } 196 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698