Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(433)

Side by Side Diff: common/gcloud/pubsub/subscriber/subscriber_test.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « common/gcloud/pubsub/subscriber/subscriber.go ('k') | common/proto/logdog/logpb/butler.proto » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package subscriber 5 package subscriber
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "sort" 9 "sort"
10 "sync" 10 "sync"
(...skipping 13 matching lines...) Expand all
24 type event struct { 24 type event struct {
25 msg *pubsub.Message 25 msg *pubsub.Message
26 err error 26 err error
27 } 27 }
28 28
29 type testSource struct { 29 type testSource struct {
30 sub pubsub.Subscription 30 sub pubsub.Subscription
31 eventC chan event 31 eventC chan event
32 } 32 }
33 33
34 func (s *testSource) Pull(c context.Context) ([]*pubsub.Message, error) { 34 func (s *testSource) Pull(c context.Context, batch int) ([]*pubsub.Message, erro r) {
35 » var e event
36
35 select { 37 select {
36 case <-c.Done(): 38 case <-c.Done():
37 » » return nil, c.Err() 39 » » // Enforce determinism, preferring events.
40 » » select {
41 » » case e = <-s.eventC:
42 » » » break
43 » » default:
44 » » » return nil, c.Err()
45 » » }
38 46
39 » case e := <-s.eventC: 47 » case e = <-s.eventC:
40 » » switch { 48 » » break
41 » » case e.err != nil: 49 » }
42 » » » return nil, e.err
43 50
44 » » case e.msg != nil: 51 » switch {
45 » » » return []*pubsub.Message{e.msg}, nil 52 » case e.err != nil:
53 » » return nil, e.err
46 54
47 » » default: 55 » case e.msg != nil:
48 » » » return nil, nil 56 » » return []*pubsub.Message{e.msg}, nil
49 » » } 57
58 » default:
59 » » return nil, nil
50 } 60 }
51 } 61 }
52 62
53 func (s *testSource) message(id ...string) { 63 func (s *testSource) message(id ...string) {
54 for _, v := range id { 64 for _, v := range id {
55 if v != "" { 65 if v != "" {
56 s.eventC <- event{msg: &pubsub.Message{ 66 s.eventC <- event{msg: &pubsub.Message{
57 ID: v, 67 ID: v,
58 AckID: v, 68 AckID: v,
59 Data: []byte(v), 69 Data: []byte(v),
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
107 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) 117 c, tc := testclock.UseTime(c, testclock.TestTimeLocal)
108 118
109 c, cancelFunc := context.WithCancel(c) 119 c, cancelFunc := context.WithCancel(c)
110 defer cancelFunc() 120 defer cancelFunc()
111 121
112 src := &testSource{ 122 src := &testSource{
113 eventC: make(chan event), 123 eventC: make(chan event),
114 } 124 }
115 ack := &testACK{} 125 ack := &testACK{}
116 s := Subscriber{ 126 s := Subscriber{
117 » » » S: src, 127 » » » S: src,
118 » » » A: ack, 128 » » » A: ack,
119 » » » PullWorkers: 8, 129 » » » Workers: 8,
120 } 130 }
121 131
132 // If not nil, processing goroutines will block on reads from th is
133 // channel, one per message.
134 var pullC chan string
135
122 var seenMu sync.Mutex 136 var seenMu sync.Mutex
123 seen := map[string]struct{}{} 137 seen := map[string]struct{}{}
124 blacklist := map[string]struct{}{} 138 blacklist := map[string]struct{}{}
125 runWith := func(cb func()) { 139 runWith := func(cb func()) {
126 doneC := make(chan struct{}) 140 doneC := make(chan struct{})
127 go func() { 141 go func() {
128 defer close(doneC) 142 defer close(doneC)
129 s.Run(c, func(msg *pubsub.Message) bool { 143 s.Run(c, func(msg *pubsub.Message) bool {
144 if pullC != nil {
145 pullC <- msg.ID
146 }
147
130 seenMu.Lock() 148 seenMu.Lock()
131 defer seenMu.Unlock() 149 defer seenMu.Unlock()
132 seen[msg.ID] = struct{}{} 150 seen[msg.ID] = struct{}{}
133 151
134 _, ok := blacklist[msg.ID] 152 _, ok := blacklist[msg.ID]
135 return !ok 153 return !ok
136 }) 154 })
137 }() 155 }()
138 156
139 cb() 157 cb()
140 cancelFunc() 158 cancelFunc()
141 <-doneC 159 <-doneC
142 } 160 }
143 161
144 Convey(`A Subscriber can pull and ACK messages.`, func() { 162 Convey(`A Subscriber can pull and ACK messages.`, func() {
145 var msgs []string 163 var msgs []string
164 pullC = make(chan string)
146 runWith(func() { 165 runWith(func() {
147 for i := 0; i < 1024; i++ { 166 for i := 0; i < 1024; i++ {
148 v := fmt.Sprintf("%08x", i) 167 v := fmt.Sprintf("%08x", i)
149 msgs = append(msgs, v) 168 msgs = append(msgs, v)
150 src.message(v) 169 src.message(v)
170
171 <-pullC
151 } 172 }
152 }) 173 })
153 174
154 So(dumpStringSet(seen), ShouldResemble, msgs) 175 So(dumpStringSet(seen), ShouldResemble, msgs)
155 So(ack.getACKs(), ShouldResemble, msgs) 176 So(ack.getACKs(), ShouldResemble, msgs)
156 }) 177 })
157 178
158 Convey(`A Subscriber that encounters an empty message set will s leep and try again.`, func() { 179 Convey(`A Subscriber that encounters an empty message set will s leep and try again.`, func() {
159 var count int32 180 var count int32
160 tc.SetTimerCallback(func(d time.Duration, t clock.Timer) { 181 tc.SetTimerCallback(func(d time.Duration, t clock.Timer) {
(...skipping 24 matching lines...) Expand all
185 src.message("a", "b") 206 src.message("a", "b")
186 src.error(errors.New("test error")) 207 src.error(errors.New("test error"))
187 src.message("c", "d") 208 src.message("c", "d")
188 }) 209 })
189 210
190 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b ", "c", "d"}) 211 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b ", "c", "d"})
191 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c" , "d"}) 212 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c" , "d"})
192 }) 213 })
193 }) 214 })
194 } 215 }
OLDNEW
« no previous file with comments | « common/gcloud/pubsub/subscriber/subscriber.go ('k') | common/proto/logdog/logpb/butler.proto » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698