| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package subscriber | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "sort" | |
| 10 "sync" | |
| 11 "sync/atomic" | |
| 12 "testing" | |
| 13 "time" | |
| 14 | |
| 15 "github.com/luci/luci-go/common/clock" | |
| 16 "github.com/luci/luci-go/common/clock/testclock" | |
| 17 "github.com/luci/luci-go/common/errors" | |
| 18 "github.com/luci/luci-go/common/gcloud/pubsub" | |
| 19 "golang.org/x/net/context" | |
| 20 | |
| 21 . "github.com/smartystreets/goconvey/convey" | |
| 22 ) | |
| 23 | |
| 24 type event struct { | |
| 25 msg *pubsub.Message | |
| 26 err error | |
| 27 } | |
| 28 | |
| 29 type testSource struct { | |
| 30 sub pubsub.Subscription | |
| 31 eventC chan event | |
| 32 } | |
| 33 | |
| 34 func (s *testSource) Pull(c context.Context, batch int) ([]*pubsub.Message, erro
r) { | |
| 35 var e event | |
| 36 | |
| 37 select { | |
| 38 case <-c.Done(): | |
| 39 // Enforce determinism, preferring events. | |
| 40 select { | |
| 41 case e = <-s.eventC: | |
| 42 break | |
| 43 default: | |
| 44 return nil, c.Err() | |
| 45 } | |
| 46 | |
| 47 case e = <-s.eventC: | |
| 48 break | |
| 49 } | |
| 50 | |
| 51 switch { | |
| 52 case e.err != nil: | |
| 53 return nil, e.err | |
| 54 | |
| 55 case e.msg != nil: | |
| 56 return []*pubsub.Message{e.msg}, nil | |
| 57 | |
| 58 default: | |
| 59 return nil, nil | |
| 60 } | |
| 61 } | |
| 62 | |
| 63 func (s *testSource) message(id ...string) { | |
| 64 for _, v := range id { | |
| 65 if v != "" { | |
| 66 s.eventC <- event{msg: &pubsub.Message{ | |
| 67 ID: v, | |
| 68 AckID: v, | |
| 69 Data: []byte(v), | |
| 70 }} | |
| 71 } else { | |
| 72 s.eventC <- event{} | |
| 73 } | |
| 74 } | |
| 75 } | |
| 76 | |
| 77 func (s *testSource) error(err error) { | |
| 78 s.eventC <- event{err: err} | |
| 79 } | |
| 80 | |
| 81 type testACK struct { | |
| 82 sync.Mutex | |
| 83 | |
| 84 acks map[string]struct{} | |
| 85 } | |
| 86 | |
| 87 func (a *testACK) Ack(id string) { | |
| 88 a.Lock() | |
| 89 defer a.Unlock() | |
| 90 | |
| 91 if a.acks == nil { | |
| 92 a.acks = make(map[string]struct{}) | |
| 93 } | |
| 94 a.acks[id] = struct{}{} | |
| 95 } | |
| 96 | |
| 97 func (a *testACK) getACKs() []string { | |
| 98 a.Lock() | |
| 99 defer a.Unlock() | |
| 100 return dumpStringSet(a.acks) | |
| 101 } | |
| 102 | |
| 103 func dumpStringSet(s map[string]struct{}) []string { | |
| 104 v := make([]string, 0, len(s)) | |
| 105 for a := range s { | |
| 106 v = append(v, a) | |
| 107 } | |
| 108 sort.Strings(v) | |
| 109 return v | |
| 110 } | |
| 111 | |
| 112 func TestSubscriber(t *testing.T) { | |
| 113 t.Parallel() | |
| 114 | |
| 115 Convey(`A Subscriber configuration using a testing Pub/Sub`, t, func() { | |
| 116 c := context.Background() | |
| 117 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) | |
| 118 | |
| 119 c, cancelFunc := context.WithCancel(c) | |
| 120 defer cancelFunc() | |
| 121 | |
| 122 src := &testSource{ | |
| 123 eventC: make(chan event), | |
| 124 } | |
| 125 ack := &testACK{} | |
| 126 s := Subscriber{ | |
| 127 S: src, | |
| 128 A: ack, | |
| 129 Workers: 8, | |
| 130 } | |
| 131 | |
| 132 // If not nil, processing goroutines will block on reads from th
is | |
| 133 // channel, one per message. | |
| 134 var pullC chan string | |
| 135 | |
| 136 var seenMu sync.Mutex | |
| 137 seen := map[string]struct{}{} | |
| 138 blacklist := map[string]struct{}{} | |
| 139 runWith := func(cb func()) { | |
| 140 doneC := make(chan struct{}) | |
| 141 go func() { | |
| 142 defer close(doneC) | |
| 143 s.Run(c, func(msg *pubsub.Message) bool { | |
| 144 if pullC != nil { | |
| 145 pullC <- msg.ID | |
| 146 } | |
| 147 | |
| 148 seenMu.Lock() | |
| 149 defer seenMu.Unlock() | |
| 150 seen[msg.ID] = struct{}{} | |
| 151 | |
| 152 _, ok := blacklist[msg.ID] | |
| 153 return !ok | |
| 154 }) | |
| 155 }() | |
| 156 | |
| 157 cb() | |
| 158 cancelFunc() | |
| 159 <-doneC | |
| 160 } | |
| 161 | |
| 162 Convey(`A Subscriber can pull and ACK messages.`, func() { | |
| 163 var msgs []string | |
| 164 pullC = make(chan string) | |
| 165 runWith(func() { | |
| 166 for i := 0; i < 1024; i++ { | |
| 167 v := fmt.Sprintf("%08x", i) | |
| 168 msgs = append(msgs, v) | |
| 169 src.message(v) | |
| 170 | |
| 171 <-pullC | |
| 172 } | |
| 173 }) | |
| 174 | |
| 175 So(dumpStringSet(seen), ShouldResemble, msgs) | |
| 176 So(ack.getACKs(), ShouldResemble, msgs) | |
| 177 }) | |
| 178 | |
| 179 Convey(`A Subscriber that encounters an empty message set will s
leep and try again.`, func() { | |
| 180 var count int32 | |
| 181 tc.SetTimerCallback(func(d time.Duration, t clock.Timer)
{ | |
| 182 if atomic.AddInt32(&count, 1) > 1 { | |
| 183 panic("should not have this many callbac
ks") | |
| 184 } | |
| 185 tc.Add(d) | |
| 186 }) | |
| 187 | |
| 188 runWith(func() { | |
| 189 src.message("a", "b", "", "c", "d") | |
| 190 }) | |
| 191 | |
| 192 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b
", "c", "d"}) | |
| 193 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c"
, "d"}) | |
| 194 }) | |
| 195 | |
| 196 Convey(`A Subscriber that encounters a Source error will sleep a
nd try again.`, func() { | |
| 197 var count int32 | |
| 198 tc.SetTimerCallback(func(d time.Duration, t clock.Timer)
{ | |
| 199 if atomic.AddInt32(&count, 1) > 1 { | |
| 200 panic("should not have this many callbac
ks") | |
| 201 } | |
| 202 tc.Add(d) | |
| 203 }) | |
| 204 | |
| 205 runWith(func() { | |
| 206 src.message("a", "b") | |
| 207 src.error(errors.New("test error")) | |
| 208 src.message("c", "d") | |
| 209 }) | |
| 210 | |
| 211 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b
", "c", "d"}) | |
| 212 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c"
, "d"}) | |
| 213 }) | |
| 214 }) | |
| 215 } | |
| OLD | NEW |