| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package subscriber | 5 package subscriber |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "sort" | 9 "sort" |
| 10 "sync" | 10 "sync" |
| (...skipping 13 matching lines...) Expand all Loading... |
| 24 type event struct { | 24 type event struct { |
| 25 msg *pubsub.Message | 25 msg *pubsub.Message |
| 26 err error | 26 err error |
| 27 } | 27 } |
| 28 | 28 |
| 29 type testSource struct { | 29 type testSource struct { |
| 30 sub pubsub.Subscription | 30 sub pubsub.Subscription |
| 31 eventC chan event | 31 eventC chan event |
| 32 } | 32 } |
| 33 | 33 |
| 34 func (s *testSource) Pull(c context.Context) ([]*pubsub.Message, error) { | 34 func (s *testSource) Pull(c context.Context, batch int) ([]*pubsub.Message, erro
r) { |
| 35 » var e event |
| 36 |
| 35 select { | 37 select { |
| 36 case <-c.Done(): | 38 case <-c.Done(): |
| 37 » » return nil, c.Err() | 39 » » // Enforce determinism, preferring events. |
| 40 » » select { |
| 41 » » case e = <-s.eventC: |
| 42 » » » break |
| 43 » » default: |
| 44 » » » return nil, c.Err() |
| 45 » » } |
| 38 | 46 |
| 39 » case e := <-s.eventC: | 47 » case e = <-s.eventC: |
| 40 » » switch { | 48 » » break |
| 41 » » case e.err != nil: | 49 » } |
| 42 » » » return nil, e.err | |
| 43 | 50 |
| 44 » » case e.msg != nil: | 51 » switch { |
| 45 » » » return []*pubsub.Message{e.msg}, nil | 52 » case e.err != nil: |
| 53 » » return nil, e.err |
| 46 | 54 |
| 47 » » default: | 55 » case e.msg != nil: |
| 48 » » » return nil, nil | 56 » » return []*pubsub.Message{e.msg}, nil |
| 49 » » } | 57 |
| 58 » default: |
| 59 » » return nil, nil |
| 50 } | 60 } |
| 51 } | 61 } |
| 52 | 62 |
| 53 func (s *testSource) message(id ...string) { | 63 func (s *testSource) message(id ...string) { |
| 54 for _, v := range id { | 64 for _, v := range id { |
| 55 if v != "" { | 65 if v != "" { |
| 56 s.eventC <- event{msg: &pubsub.Message{ | 66 s.eventC <- event{msg: &pubsub.Message{ |
| 57 ID: v, | 67 ID: v, |
| 58 AckID: v, | 68 AckID: v, |
| 59 Data: []byte(v), | 69 Data: []byte(v), |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 107 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) | 117 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) |
| 108 | 118 |
| 109 c, cancelFunc := context.WithCancel(c) | 119 c, cancelFunc := context.WithCancel(c) |
| 110 defer cancelFunc() | 120 defer cancelFunc() |
| 111 | 121 |
| 112 src := &testSource{ | 122 src := &testSource{ |
| 113 eventC: make(chan event), | 123 eventC: make(chan event), |
| 114 } | 124 } |
| 115 ack := &testACK{} | 125 ack := &testACK{} |
| 116 s := Subscriber{ | 126 s := Subscriber{ |
| 117 » » » S: src, | 127 » » » S: src, |
| 118 » » » A: ack, | 128 » » » A: ack, |
| 119 » » » PullWorkers: 8, | 129 » » » Workers: 8, |
| 120 } | 130 } |
| 121 | 131 |
| 132 // If not nil, processing goroutines will block on reads from th
is |
| 133 // channel, one per message. |
| 134 var pullC chan string |
| 135 |
| 122 var seenMu sync.Mutex | 136 var seenMu sync.Mutex |
| 123 seen := map[string]struct{}{} | 137 seen := map[string]struct{}{} |
| 124 blacklist := map[string]struct{}{} | 138 blacklist := map[string]struct{}{} |
| 125 runWith := func(cb func()) { | 139 runWith := func(cb func()) { |
| 126 doneC := make(chan struct{}) | 140 doneC := make(chan struct{}) |
| 127 go func() { | 141 go func() { |
| 128 defer close(doneC) | 142 defer close(doneC) |
| 129 s.Run(c, func(msg *pubsub.Message) bool { | 143 s.Run(c, func(msg *pubsub.Message) bool { |
| 144 if pullC != nil { |
| 145 pullC <- msg.ID |
| 146 } |
| 147 |
| 130 seenMu.Lock() | 148 seenMu.Lock() |
| 131 defer seenMu.Unlock() | 149 defer seenMu.Unlock() |
| 132 seen[msg.ID] = struct{}{} | 150 seen[msg.ID] = struct{}{} |
| 133 | 151 |
| 134 _, ok := blacklist[msg.ID] | 152 _, ok := blacklist[msg.ID] |
| 135 return !ok | 153 return !ok |
| 136 }) | 154 }) |
| 137 }() | 155 }() |
| 138 | 156 |
| 139 cb() | 157 cb() |
| 140 cancelFunc() | 158 cancelFunc() |
| 141 <-doneC | 159 <-doneC |
| 142 } | 160 } |
| 143 | 161 |
| 144 Convey(`A Subscriber can pull and ACK messages.`, func() { | 162 Convey(`A Subscriber can pull and ACK messages.`, func() { |
| 145 var msgs []string | 163 var msgs []string |
| 164 pullC = make(chan string) |
| 146 runWith(func() { | 165 runWith(func() { |
| 147 for i := 0; i < 1024; i++ { | 166 for i := 0; i < 1024; i++ { |
| 148 v := fmt.Sprintf("%08x", i) | 167 v := fmt.Sprintf("%08x", i) |
| 149 msgs = append(msgs, v) | 168 msgs = append(msgs, v) |
| 150 src.message(v) | 169 src.message(v) |
| 170 |
| 171 <-pullC |
| 151 } | 172 } |
| 152 }) | 173 }) |
| 153 | 174 |
| 154 So(dumpStringSet(seen), ShouldResemble, msgs) | 175 So(dumpStringSet(seen), ShouldResemble, msgs) |
| 155 So(ack.getACKs(), ShouldResemble, msgs) | 176 So(ack.getACKs(), ShouldResemble, msgs) |
| 156 }) | 177 }) |
| 157 | 178 |
| 158 Convey(`A Subscriber that encounters an empty message set will s
leep and try again.`, func() { | 179 Convey(`A Subscriber that encounters an empty message set will s
leep and try again.`, func() { |
| 159 var count int32 | 180 var count int32 |
| 160 tc.SetTimerCallback(func(d time.Duration, t clock.Timer)
{ | 181 tc.SetTimerCallback(func(d time.Duration, t clock.Timer)
{ |
| (...skipping 24 matching lines...) Expand all Loading... |
| 185 src.message("a", "b") | 206 src.message("a", "b") |
| 186 src.error(errors.New("test error")) | 207 src.error(errors.New("test error")) |
| 187 src.message("c", "d") | 208 src.message("c", "d") |
| 188 }) | 209 }) |
| 189 | 210 |
| 190 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b
", "c", "d"}) | 211 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b
", "c", "d"}) |
| 191 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c"
, "d"}) | 212 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c"
, "d"}) |
| 192 }) | 213 }) |
| 193 }) | 214 }) |
| 194 } | 215 } |
| OLD | NEW |