| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package subscriber | 5 package subscriber |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "sort" | 9 "sort" |
| 10 "sync" | 10 "sync" |
| 11 "sync/atomic" |
| 11 "testing" | 12 "testing" |
| 12 "time" | 13 "time" |
| 13 | 14 |
| 14 "github.com/luci/luci-go/common/clock" | 15 "github.com/luci/luci-go/common/clock" |
| 15 "github.com/luci/luci-go/common/clock/testclock" | 16 "github.com/luci/luci-go/common/clock/testclock" |
| 16 "github.com/luci/luci-go/common/errors" | 17 "github.com/luci/luci-go/common/errors" |
| 17 "github.com/luci/luci-go/common/gcloud/gcps" | 18 "github.com/luci/luci-go/common/gcloud/gcps" |
| 18 "golang.org/x/net/context" | 19 "golang.org/x/net/context" |
| 19 "google.golang.org/cloud/pubsub" | 20 "google.golang.org/cloud/pubsub" |
| 20 | 21 |
| 22 . "github.com/luci/luci-go/common/testing/assertions" |
| 21 . "github.com/smartystreets/goconvey/convey" | 23 . "github.com/smartystreets/goconvey/convey" |
| 22 ) | 24 ) |
| 23 | 25 |
| 24 type testPubSub struct { | 26 type event struct { |
| 25 » c context.Context | 27 » msg *pubsub.Message |
| 26 » errCB func() error | 28 » err error |
| 27 » sub gcps.Subscription | |
| 28 » msgC chan *pubsub.Message | |
| 29 } | 29 } |
| 30 | 30 |
| 31 func (ps *testPubSub) Pull(s gcps.Subscription, amount int) ([]*pubsub.Message,
error) { | 31 type testSource struct { |
| 32 » if ps.errCB != nil { | 32 » sub gcps.Subscription |
| 33 » » if err := ps.errCB(); err != nil { | 33 » eventC chan event |
| 34 » » » return nil, err | |
| 35 » » } | |
| 36 » } | |
| 37 » if ps.sub != s { | |
| 38 » » return nil, fmt.Errorf("unknown subscription %q", s) | |
| 39 » } | |
| 40 » if amount <= 0 { | |
| 41 » » return nil, fmt.Errorf("invalid amount: %d", amount) | |
| 42 » } | |
| 43 | |
| 44 » select { | |
| 45 » case <-ps.c.Done(): | |
| 46 » » return nil, ps.c.Err() | |
| 47 | |
| 48 » case msg := <-ps.msgC: | |
| 49 » » return []*pubsub.Message{msg}, nil | |
| 50 » } | |
| 51 } | 34 } |
| 52 | 35 |
| 53 func (ps *testPubSub) send(s ...string) { | 36 func (s *testSource) Pull(c context.Context) ([]*pubsub.Message, error) { |
| 54 » for _, v := range s { | 37 » select { |
| 55 » » ps.msgC <- &pubsub.Message{ | 38 » case <-c.Done(): |
| 56 » » » ID: v, | 39 » » return nil, c.Err() |
| 57 » » » Data: []byte(v), | 40 |
| 41 » case e := <-s.eventC: |
| 42 » » switch { |
| 43 » » case e.err != nil: |
| 44 » » » return nil, e.err |
| 45 |
| 46 » » case e.msg != nil: |
| 47 » » » return []*pubsub.Message{e.msg}, nil |
| 48 |
| 49 » » default: |
| 50 » » » return nil, nil |
| 58 } | 51 } |
| 59 } | 52 } |
| 60 } | 53 } |
| 61 | 54 |
| 55 func (s *testSource) message(id ...string) { |
| 56 for _, v := range id { |
| 57 if v != "" { |
| 58 s.eventC <- event{msg: &pubsub.Message{ |
| 59 ID: v, |
| 60 AckID: v, |
| 61 Data: []byte(v), |
| 62 }} |
| 63 } else { |
| 64 s.eventC <- event{} |
| 65 } |
| 66 } |
| 67 } |
| 68 |
| 69 func (s *testSource) error(err error) { |
| 70 s.eventC <- event{err: err} |
| 71 } |
| 72 |
| 73 type testACK struct { |
| 74 sync.Mutex |
| 75 |
| 76 acks map[string]struct{} |
| 77 } |
| 78 |
| 79 func (a *testACK) Ack(id string) { |
| 80 a.Lock() |
| 81 defer a.Unlock() |
| 82 |
| 83 if a.acks == nil { |
| 84 a.acks = make(map[string]struct{}) |
| 85 } |
| 86 a.acks[id] = struct{}{} |
| 87 } |
| 88 |
| 89 func (a *testACK) getACKs() []string { |
| 90 a.Lock() |
| 91 defer a.Unlock() |
| 92 return dumpStringSet(a.acks) |
| 93 } |
| 94 |
| 95 func dumpStringSet(s map[string]struct{}) []string { |
| 96 v := make([]string, 0, len(s)) |
| 97 for a := range s { |
| 98 v = append(v, a) |
| 99 } |
| 100 sort.Strings(v) |
| 101 return v |
| 102 } |
| 103 |
| 62 func TestSubscriber(t *testing.T) { | 104 func TestSubscriber(t *testing.T) { |
| 63 t.Parallel() | 105 t.Parallel() |
| 64 | 106 |
| 65 Convey(`A Subscriber configuration using a testing Pub/Sub`, t, func() { | 107 Convey(`A Subscriber configuration using a testing Pub/Sub`, t, func() { |
| 66 c := context.Background() | 108 c := context.Background() |
| 67 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) | 109 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) |
| 68 | 110 |
| 69 c, cancelFunc := context.WithCancel(c) | 111 c, cancelFunc := context.WithCancel(c) |
| 70 defer cancelFunc() | 112 defer cancelFunc() |
| 71 | 113 |
| 72 » » ps := &testPubSub{ | 114 » » src := &testSource{ |
| 73 » » » c: c, | 115 » » » eventC: make(chan event), |
| 74 » » » sub: gcps.Subscription("testsub"), | |
| 75 » » » msgC: make(chan *pubsub.Message), | |
| 76 } | 116 } |
| 117 ack := &testACK{} |
| 77 s := Subscriber{ | 118 s := Subscriber{ |
| 78 » » » PubSub: ps, | 119 » » » S: src, |
| 79 » » » Subscription: ps.sub, | 120 » » » A: ack, |
| 121 » » » Workers: 8, |
| 80 } | 122 } |
| 81 | 123 |
| 82 » » var received []string | 124 » » var seenMu sync.Mutex |
| 83 » » var receivedMu sync.Mutex | 125 » » seen := map[string]struct{}{} |
| 84 » » cb := func(msg *pubsub.Message) { | 126 » » blacklist := map[string]struct{}{} |
| 85 » » » receivedMu.Lock() | 127 » » runWith := func(cb func()) { |
| 86 » » » defer receivedMu.Unlock() | |
| 87 » » » received = append(received, msg.ID) | |
| 88 » » } | |
| 89 | |
| 90 » » // If a subscriber goroutine is sleeping, advance to wake it. | |
| 91 » » tc.SetTimerCallback(func(d time.Duration, t clock.Timer) { | |
| 92 » » » tc.Add(d) | |
| 93 » » }) | |
| 94 | |
| 95 » » Convey(`A Subscriber can pull messages.`, func() { | |
| 96 doneC := make(chan struct{}) | 128 doneC := make(chan struct{}) |
| 97 go func() { | 129 go func() { |
| 98 defer close(doneC) | 130 defer close(doneC) |
| 99 » » » » s.Run(c, cb) | 131 » » » » s.Run(c, func(msg *pubsub.Message) bool { |
| 132 » » » » » seenMu.Lock() |
| 133 » » » » » defer seenMu.Unlock() |
| 134 » » » » » seen[msg.ID] = struct{}{} |
| 135 |
| 136 » » » » » _, ok := blacklist[msg.ID] |
| 137 » » » » » return !ok |
| 138 » » » » }) |
| 100 }() | 139 }() |
| 101 | 140 |
| 102 » » » var msgs []string | 141 » » » cb() |
| 103 » » » for i := 0; i < 1024; i++ { | |
| 104 » » » » msgs = append(msgs, fmt.Sprintf("%08x", i)) | |
| 105 » » » } | |
| 106 » » » ps.send(msgs...) | |
| 107 | |
| 108 cancelFunc() | 142 cancelFunc() |
| 109 <-doneC | 143 <-doneC |
| 144 } |
| 110 | 145 |
| 111 » » » sort.Strings(msgs) | 146 » » Convey(`A Subscriber can pull and ACK messages.`, func() { |
| 112 » » » sort.Strings(received) | 147 » » » var msgs []string |
| 113 » » » So(received, ShouldResemble, msgs) | 148 » » » runWith(func() { |
| 149 » » » » for i := 0; i < 1024; i++ { |
| 150 » » » » » v := fmt.Sprintf("%08x", i) |
| 151 » » » » » msgs = append(msgs, v) |
| 152 » » » » » src.message(v) |
| 153 » » » » } |
| 154 » » » }) |
| 155 |
| 156 » » » So(dumpStringSet(seen), ShouldResembleV, msgs) |
| 157 » » » So(ack.getACKs(), ShouldResembleV, msgs) |
| 114 }) | 158 }) |
| 115 | 159 |
| 116 » » Convey(`A Subscriber will continue retrying if there is a Pub/Su
b error.`, func() { | 160 » » Convey(`A Subscriber that encounters an empty message set will s
leep and try again.`, func() { |
| 117 » » » var errMu sync.Mutex | 161 » » » var count int32 |
| 118 » » » count := 0 | 162 » » » tc.SetTimerCallback(func(d time.Duration, t clock.Timer)
{ |
| 119 » » » ps.errCB = func() error { | 163 » » » » if atomic.AddInt32(&count, 1) > 1 { |
| 120 » » » » errMu.Lock() | 164 » » » » » panic("should not have this many callbac
ks") |
| 121 » » » » defer errMu.Unlock() | 165 » » » » } |
| 166 » » » » tc.Add(d) |
| 167 » » » }) |
| 122 | 168 |
| 123 » » » » if count < 1024 { | 169 » » » runWith(func() { |
| 124 » » » » » count++ | 170 » » » » src.message("a", "b", "", "c", "d") |
| 125 » » » » » return errors.WrapTransient(errors.New("
test error")) | 171 » » » }) |
| 172 |
| 173 » » » So(dumpStringSet(seen), ShouldResembleV, []string{"a", "
b", "c", "d"}) |
| 174 » » » So(ack.getACKs(), ShouldResembleV, []string{"a", "b", "c
", "d"}) |
| 175 » » }) |
| 176 |
| 177 » » Convey(`A Subscriber that encounters a Source error will sleep a
nd try again.`, func() { |
| 178 » » » var count int32 |
| 179 » » » tc.SetTimerCallback(func(d time.Duration, t clock.Timer)
{ |
| 180 » » » » if atomic.AddInt32(&count, 1) > 1 { |
| 181 » » » » » panic("should not have this many callbac
ks") |
| 126 } | 182 } |
| 127 » » » » return nil | 183 » » » » tc.Add(d) |
| 128 » » » } | 184 » » » }) |
| 129 | 185 |
| 130 » » » doneC := make(chan struct{}) | 186 » » » runWith(func() { |
| 131 » » » go func() { | 187 » » » » src.message("a", "b") |
| 132 » » » » defer close(doneC) | 188 » » » » src.error(errors.New("test error")) |
| 133 » » » » s.Run(c, cb) | 189 » » » » src.message("c", "d") |
| 134 » » » }() | 190 » » » }) |
| 135 | 191 |
| 136 » » » var msgs []string | 192 » » » So(dumpStringSet(seen), ShouldResembleV, []string{"a", "
b", "c", "d"}) |
| 137 » » » for i := 0; i < 1024; i++ { | 193 » » » So(ack.getACKs(), ShouldResembleV, []string{"a", "b", "c
", "d"}) |
| 138 » » » » msgs = append(msgs, fmt.Sprintf("%08x", i)) | |
| 139 » » » } | |
| 140 » » » ps.send(msgs...) | |
| 141 | |
| 142 » » » cancelFunc() | |
| 143 » » » <-doneC | |
| 144 | |
| 145 » » » sort.Strings(msgs) | |
| 146 » » » sort.Strings(received) | |
| 147 » » » So(received, ShouldResemble, msgs) | |
| 148 }) | 194 }) |
| 149 }) | 195 }) |
| 150 } | 196 } |
| OLD | NEW |