| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package subscriber | 5 package subscriber |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "sort" | 9 "sort" |
| 10 "sync" | 10 "sync" |
| (...skipping 13 matching lines...) Expand all Loading... |
| 24 type event struct { | 24 type event struct { |
| 25 msg *pubsub.Message | 25 msg *pubsub.Message |
| 26 err error | 26 err error |
| 27 } | 27 } |
| 28 | 28 |
| 29 type testSource struct { | 29 type testSource struct { |
| 30 sub pubsub.Subscription | 30 sub pubsub.Subscription |
| 31 eventC chan event | 31 eventC chan event |
| 32 } | 32 } |
| 33 | 33 |
| 34 func (s *testSource) Pull(c context.Context) ([]*pubsub.Message, error) { | 34 func (s *testSource) Pull(c context.Context, batch int) ([]*pubsub.Message, erro
r) { |
| 35 select { | 35 select { |
| 36 case <-c.Done(): | 36 case <-c.Done(): |
| 37 return nil, c.Err() | 37 return nil, c.Err() |
| 38 | 38 |
| 39 case e := <-s.eventC: | 39 case e := <-s.eventC: |
| 40 switch { | 40 switch { |
| 41 case e.err != nil: | 41 case e.err != nil: |
| 42 return nil, e.err | 42 return nil, e.err |
| 43 | 43 |
| 44 case e.msg != nil: | 44 case e.msg != nil: |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 107 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) | 107 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) |
| 108 | 108 |
| 109 c, cancelFunc := context.WithCancel(c) | 109 c, cancelFunc := context.WithCancel(c) |
| 110 defer cancelFunc() | 110 defer cancelFunc() |
| 111 | 111 |
| 112 src := &testSource{ | 112 src := &testSource{ |
| 113 eventC: make(chan event), | 113 eventC: make(chan event), |
| 114 } | 114 } |
| 115 ack := &testACK{} | 115 ack := &testACK{} |
| 116 s := Subscriber{ | 116 s := Subscriber{ |
| 117 » » » S: src, | 117 » » » S: src, |
| 118 » » » A: ack, | 118 » » » A: ack, |
| 119 » » » PullWorkers: 8, | 119 » » » Workers: 8, |
| 120 } | 120 } |
| 121 | 121 |
| 122 var seenMu sync.Mutex | 122 var seenMu sync.Mutex |
| 123 seen := map[string]struct{}{} | 123 seen := map[string]struct{}{} |
| 124 blacklist := map[string]struct{}{} | 124 blacklist := map[string]struct{}{} |
| 125 runWith := func(cb func()) { | 125 runWith := func(cb func()) { |
| 126 doneC := make(chan struct{}) | 126 doneC := make(chan struct{}) |
| 127 go func() { | 127 go func() { |
| 128 defer close(doneC) | 128 defer close(doneC) |
| 129 s.Run(c, func(msg *pubsub.Message) bool { | 129 s.Run(c, func(msg *pubsub.Message) bool { |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 185 src.message("a", "b") | 185 src.message("a", "b") |
| 186 src.error(errors.New("test error")) | 186 src.error(errors.New("test error")) |
| 187 src.message("c", "d") | 187 src.message("c", "d") |
| 188 }) | 188 }) |
| 189 | 189 |
| 190 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b
", "c", "d"}) | 190 So(dumpStringSet(seen), ShouldResemble, []string{"a", "b
", "c", "d"}) |
| 191 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c"
, "d"}) | 191 So(ack.getACKs(), ShouldResemble, []string{"a", "b", "c"
, "d"}) |
| 192 }) | 192 }) |
| 193 }) | 193 }) |
| 194 } | 194 } |
| OLD | NEW |