| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package subscriber | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "sort" | |
| 10 "sync" | |
| 11 "sync/atomic" | |
| 12 "testing" | |
| 13 "time" | |
| 14 | |
| 15 "github.com/luci/luci-go/common/clock" | |
| 16 "github.com/luci/luci-go/common/clock/testclock" | |
| 17 "github.com/luci/luci-go/common/errors" | |
| 18 "github.com/luci/luci-go/common/gcloud/pubsub" | |
| 19 "golang.org/x/net/context" | |
| 20 | |
| 21 . "github.com/smartystreets/goconvey/convey" | |
| 22 ) | |
| 23 | |
| 24 type event struct { | |
| 25 msg *pubsub.Message | |
| 26 err error | |
| 27 } | |
| 28 | |
| 29 type testSource struct { | |
| 30 sub pubsub.Subscription | |
| 31 eventC chan event | |
| 32 } | |
| 33 | |
| 34 func (s *testSource) Pull(c context.Context) ([]*pubsub.Message, error) { | |
| 35 select { | |
| 36 case <-c.Done(): | |
| 37 return nil, c.Err() | |
| 38 | |
| 39 case e := <-s.eventC: | |
| 40 switch { | |
| 41 case e.err != nil: | |
| 42 return nil, e.err | |
| 43 | |
| 44 case e.msg != nil: | |
| 45 return []*pubsub.Message{e.msg}, nil | |
| 46 | |
| 47 default: | |
| 48 return nil, nil | |
| 49 } | |
| 50 } | |
| 51 } | |
| 52 | |
| 53 func (s *testSource) message(id ...string) { | |
| 54 for _, v := range id { | |
| 55 if v != "" { | |
| 56 s.eventC <- event{msg: &pubsub.Message{ | |
| 57 ID: v, | |
| 58 AckID: v, | |
| 59 Data: []byte(v), | |
| 60 }} | |
| 61 } else { | |
| 62 s.eventC <- event{} | |
| 63 } | |
| 64 } | |
| 65 } | |
| 66 | |
| 67 func (s *testSource) error(err error) { | |
| 68 s.eventC <- event{err: err} | |
| 69 } | |
| 70 | |
| 71 type testACK struct { | |
| 72 sync.Mutex | |
| 73 | |
| 74 acks map[string]struct{} | |
| 75 } | |
| 76 | |
| 77 func (a *testACK) Ack(id string) { | |
| 78 a.Lock() | |
| 79 defer a.Unlock() | |
| 80 | |
| 81 if a.acks == nil { | |
| 82 a.acks = make(map[string]struct{}) | |
| 83 } | |
| 84 a.acks[id] = struct{}{} | |
| 85 } | |
| 86 | |
| 87 func (a *testACK) getACKs() []string { | |
| 88 a.Lock() | |
| 89 defer a.Unlock() | |
| 90 return dumpStringSet(a.acks) | |
| 91 } | |
| 92 | |
| 93 func dumpStringSet(s map[string]struct{}) []string { | |
| 94 v := make([]string, 0, len(s)) | |
| 95 for a := range s { | |
| 96 v = append(v, a) | |
| 97 } | |
| 98 sort.Strings(v) | |
| 99 return v | |
| 100 } | |
| 101 | |
| 102 func TestSubscriber(t *testing.T) { | |
| 103 t.Parallel() | |
| 104 | |
| 105 Convey(`A Subscriber configuration using a testing Pub/Sub`, t, func() { | |
| 106 c := context.Background() | |
| 107 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) | |
| 108 | |
| 109 c, cancelFunc := context.WithCancel(c) | |
| 110 defer cancelFunc() | |
| 111 | |
| 112 src := &testSource{ | |
| 113 eventC: make(chan event), | |
| 114 } | |
| 115 ack := &testACK{} | |
| 116 s := Subscriber{ | |
| 117 S: src, | |
| 118 A: ack, | |
| 119 PullWorkers: 8, | |
| 120 } | |
| 121 | |
| 122 var seenMu sync.Mutex | |
| 123 seen := map[string]struct{}{} | |
| 124 blacklist := map[string]struct{}{} | |
| 125 runWith := func(cb func()) { | |
| 126 doneC := make(chan struct{}) | |
| 127 go func() { | |
| 128 defer close(doneC) | |
| 129 s.Run(c, func(msg *pubsub.Message) bool { | |
| 130 seenMu.Lock() | |
| 131 defer seenMu.Unlock() | |
| 132 seen[msg.ID] = struct{}{} | |
| 133 | |
| 134 _, ok := blacklist[msg.ID] | |
| 135 return !ok | |
| 136 }) | |
| 137 }() | |
| 138 | |
| 139 cb() | |
| 140 cancelFunc() | |
| 141 <-doneC | |
| 142 } | |
| 143 | |
| 144 Convey(`A Subscriber can pull and ACK messages.`, func() { | |
| 145 var msgs []string | |
| 146 runWith(func() { | |
| 147 for i := 0; i < 1024; i++ { | |
| 148 v := fmt.Sprintf("%08x", i) | |
| 149 msgs = append(msgs, v) | |
| 150 src.message(v) | |
| 151 } | |
| 152 }) | |
| 153 | |
| 154 So(dumpStringSet(seen), ShouldResemble, msgs) | |
| 155 So(ack.getACKs(), ShouldResemble, msgs) | |
| 156 }) | |
| 157 | |
| 158 Convey(`A Subscriber that encounters an empty message set will s
leep and try again.`, func() { | |
| 159 var count int32 | |
| 160 tc.SetTimerCallback(func(d time.Duration, t clock.Timer)
{ | |
| 161 if atomic.AddInt32(&count, 1) > 1 { | |
| 162 panic("should not have this many callbac
ks") | |
| 163 } | |
| 164 tc.Add(d) | |
| 165 }) | |
| 166 | |
| 167 runWith(func() { | |
| 168 src.message("a", "b", "", "c", "d") | |
| 169 }) | |
| 170 | |
| 171 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b
", "c", "d"}) | |
| 172 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c"
, "d"}) | |
| 173 }) | |
| 174 | |
| 175 Convey(`A Subscriber that encounters a Source error will sleep a
nd try again.`, func() { | |
| 176 var count int32 | |
| 177 tc.SetTimerCallback(func(d time.Duration, t clock.Timer)
{ | |
| 178 if atomic.AddInt32(&count, 1) > 1 { | |
| 179 panic("should not have this many callbac
ks") | |
| 180 } | |
| 181 tc.Add(d) | |
| 182 }) | |
| 183 | |
| 184 runWith(func() { | |
| 185 src.message("a", "b") | |
| 186 src.error(errors.New("test error")) | |
| 187 src.message("c", "d") | |
| 188 }) | |
| 189 | |
| 190 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b
", "c", "d"}) | |
| 191 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c"
, "d"}) | |
| 192 }) | |
| 193 }) | |
| 194 } | |
| OLD | NEW |