Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(406)

Side by Side Diff: common/gcloud/pubsub/subscriber/subscriber_test.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package subscriber 5 package subscriber
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "sort" 9 "sort"
10 "sync" 10 "sync"
(...skipping 13 matching lines...) Expand all
24 type event struct { 24 type event struct {
25 msg *pubsub.Message 25 msg *pubsub.Message
26 err error 26 err error
27 } 27 }
28 28
29 type testSource struct { 29 type testSource struct {
30 sub pubsub.Subscription 30 sub pubsub.Subscription
31 eventC chan event 31 eventC chan event
32 } 32 }
33 33
34 func (s *testSource) Pull(c context.Context) ([]*pubsub.Message, error) { 34 func (s *testSource) Pull(c context.Context, batch int) ([]*pubsub.Message, erro r) {
35 select { 35 select {
36 case <-c.Done(): 36 case <-c.Done():
37 return nil, c.Err() 37 return nil, c.Err()
38 38
39 case e := <-s.eventC: 39 case e := <-s.eventC:
40 switch { 40 switch {
41 case e.err != nil: 41 case e.err != nil:
42 return nil, e.err 42 return nil, e.err
43 43
44 case e.msg != nil: 44 case e.msg != nil:
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
107 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) 107 c, tc := testclock.UseTime(c, testclock.TestTimeLocal)
108 108
109 c, cancelFunc := context.WithCancel(c) 109 c, cancelFunc := context.WithCancel(c)
110 defer cancelFunc() 110 defer cancelFunc()
111 111
112 src := &testSource{ 112 src := &testSource{
113 eventC: make(chan event), 113 eventC: make(chan event),
114 } 114 }
115 ack := &testACK{} 115 ack := &testACK{}
116 s := Subscriber{ 116 s := Subscriber{
117 » » » S: src, 117 » » » S: src,
118 » » » A: ack, 118 » » » A: ack,
119 » » » PullWorkers: 8, 119 » » » Workers: 8,
120 } 120 }
121 121
122 var seenMu sync.Mutex 122 var seenMu sync.Mutex
123 seen := map[string]struct{}{} 123 seen := map[string]struct{}{}
124 blacklist := map[string]struct{}{} 124 blacklist := map[string]struct{}{}
125 runWith := func(cb func()) { 125 runWith := func(cb func()) {
126 doneC := make(chan struct{}) 126 doneC := make(chan struct{})
127 go func() { 127 go func() {
128 defer close(doneC) 128 defer close(doneC)
129 s.Run(c, func(msg *pubsub.Message) bool { 129 s.Run(c, func(msg *pubsub.Message) bool {
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
185 src.message("a", "b") 185 src.message("a", "b")
186 src.error(errors.New("test error")) 186 src.error(errors.New("test error"))
187 src.message("c", "d") 187 src.message("c", "d")
188 }) 188 })
189 189
190 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b ", "c", "d"}) 190 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b ", "c", "d"})
191 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c" , "d"}) 191 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c" , "d"})
192 }) 192 })
193 }) 193 })
194 } 194 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698