Index: test/multi_channel_test.dart |
diff --git a/test/multi_channel_test.dart b/test/multi_channel_test.dart |
index cc9ed1d2692a27f947fb2133baff2726644c2014..5be25053573f625317611ab19f7f72016bdcde4c 100644 |
--- a/test/multi_channel_test.dart |
+++ b/test/multi_channel_test.dart |
@@ -10,17 +10,13 @@ import 'package:test/test.dart'; |
import 'utils.dart'; |
void main() { |
- var oneToTwo; |
- var twoToOne; |
+ var controller; |
var channel1; |
var channel2; |
setUp(() { |
- oneToTwo = new StreamController(); |
- twoToOne = new StreamController(); |
- channel1 = new MultiChannel( |
- new StreamChannel(twoToOne.stream, oneToTwo.sink)); |
- channel2 = new MultiChannel( |
- new StreamChannel(oneToTwo.stream, twoToOne.sink)); |
+ controller = new StreamChannelController(); |
+ channel1 = new MultiChannel(controller.local); |
+ channel2 = new MultiChannel(controller.foreign); |
}); |
group("the default virtual channel", () { |
@@ -66,16 +62,16 @@ void main() { |
test("closes the underlying channel when it closes without any other " |
"virtual channels", () { |
- expect(oneToTwo.done, completes); |
- expect(twoToOne.done, completes); |
+ expect(controller.local.sink.done, completes); |
+ expect(controller.foreign.sink.done, completes); |
channel1.sink.close(); |
}); |
test("doesn't close the underlying channel when it closes with other " |
"virtual channels", () { |
- oneToTwo.done.then(expectAsync((_) {}, count: 0)); |
- twoToOne.done.then(expectAsync((_) {}, count: 0)); |
+ controller.local.sink.done.then(expectAsync((_) {}, count: 0)); |
+ controller.foreign.sink.done.then(expectAsync((_) {}, count: 0)); |
// Establish another virtual connection which should keep the underlying |
// connection open. |
@@ -149,16 +145,16 @@ void main() { |
channel1.sink.close(); |
await channel2.stream.toList(); |
- expect(oneToTwo.done, completes); |
- expect(twoToOne.done, completes); |
+ expect(controller.local.sink.done, completes); |
+ expect(controller.foreign.sink.done, completes); |
virtual1.sink.close(); |
}); |
test("doesn't close the underlying channel when it closes with other " |
"virtual channels", () { |
- oneToTwo.done.then(expectAsync((_) {}, count: 0)); |
- twoToOne.done.then(expectAsync((_) {}, count: 0)); |
+ controller.local.sink.done.then(expectAsync((_) {}, count: 0)); |
+ controller.foreign.sink.done.then(expectAsync((_) {}, count: 0)); |
virtual1.sink.close(); |
@@ -246,16 +242,16 @@ void main() { |
channel2.sink.close(); |
await channel1.stream.toList(); |
- expect(oneToTwo.done, completes); |
- expect(twoToOne.done, completes); |
+ expect(controller.local.sink.done, completes); |
+ expect(controller.foreign.sink.done, completes); |
virtual2.sink.close(); |
}); |
test("doesn't close the underlying channel when it closes with other " |
"virtual channels", () { |
- oneToTwo.done.then(expectAsync((_) {}, count: 0)); |
- twoToOne.done.then(expectAsync((_) {}, count: 0)); |
+ controller.local.sink.done.then(expectAsync((_) {}, count: 0)); |
+ controller.foreign.sink.done.then(expectAsync((_) {}, count: 0)); |
virtual2.sink.close(); |
@@ -288,16 +284,24 @@ void main() { |
expect(virtual2.stream.toList(), completion(isEmpty)); |
expect(virtual2.sink.done, completes); |
- oneToTwo.close(); |
+ controller.local.sink.close(); |
}); |
- test("closes, no more virtual channels may be created", () { |
- expect(channel1.sink.done.then((_) => channel1.virtualChannel()), |
- throwsStateError); |
- expect(channel2.sink.done.then((_) => channel2.virtualChannel()), |
- throwsStateError); |
+ test("closes, more virtual channels are created closed", () async { |
+ channel2.sink.close(); |
+ virtual2.sink.close(); |
+ |
+ // Wait for the existing channels to emit done events. |
+ await channel1.stream.toList(); |
+ await virtual1.stream.toList(); |
+ |
+ var virtual = channel1.virtualChannel(); |
+ expect(virtual.stream.toList(), completion(isEmpty)); |
+ expect(virtual.sink.done, completes); |
- oneToTwo.close(); |
+ virtual = channel1.virtualChannel(); |
+ expect(virtual.stream.toList(), completion(isEmpty)); |
+ expect(virtual.sink.done, completes); |
}); |
test("emits an error, the error is sent only to the default channel", () { |
@@ -306,7 +310,134 @@ void main() { |
virtual1.stream.listen(expectAsync((_) {}, count: 0), |
onError: expectAsync((_) {}, count: 0)); |
- twoToOne.addError("oh no"); |
+ controller.foreign.sink.addError("oh no"); |
+ }); |
+ }); |
+ |
+ group("stream channel rules", () { |
+ group("for the main stream:", () { |
+ test("closing the sink causes the stream to close before it emits any more " |
+ "events", () { |
+ channel1.sink.add(1); |
+ channel1.sink.add(2); |
+ channel1.sink.add(3); |
+ |
+ channel2.stream.listen(expectAsync((message) { |
+ expect(message, equals(1)); |
+ channel2.sink.close(); |
+ }, count: 1)); |
+ }); |
+ |
+ test("after the stream closes, the sink ignores events", () async { |
+ channel1.sink.close(); |
+ |
+ // Wait for the done event to be delivered. |
+ await channel2.stream.toList(); |
+ channel2.sink.add(1); |
+ channel2.sink.add(2); |
+ channel2.sink.add(3); |
+ channel2.sink.close(); |
+ |
+ // None of our channel.sink additions should make it to the other endpoint. |
+ channel1.stream.listen(expectAsync((_) {}, count: 0)); |
+ await pumpEventQueue(); |
+ }); |
+ |
+ test("canceling the stream's subscription has no effect on the sink", |
+ () async { |
+ channel1.stream.listen(null).cancel(); |
+ await pumpEventQueue(); |
+ |
+ channel1.sink.add(1); |
+ channel1.sink.add(2); |
+ channel1.sink.add(3); |
+ channel1.sink.close(); |
+ expect(channel2.stream.toList(), completion(equals([1, 2, 3]))); |
+ }); |
+ |
+ test("canceling the stream's subscription doesn't stop a done event", |
+ () async { |
+ channel1.stream.listen(null).cancel(); |
+ await pumpEventQueue(); |
+ |
+ channel2.sink.close(); |
+ await pumpEventQueue(); |
+ |
+ channel1.sink.add(1); |
+ channel1.sink.add(2); |
+ channel1.sink.add(3); |
+ channel1.sink.close(); |
+ |
+ // The sink should be ignoring events because the channel closed. |
+ channel2.stream.listen(expectAsync((_) {}, count: 0)); |
+ await pumpEventQueue(); |
+ }); |
+ }); |
+ |
+ group("for a virtual channel:", () { |
+ var virtual1; |
+ var virtual2; |
+ setUp(() { |
+ virtual1 = channel1.virtualChannel(); |
+ virtual2 = channel2.virtualChannel(virtual1.id); |
+ }); |
+ |
+ test("closing the sink causes the stream to close before it emits any more " |
+ "events", () { |
+ virtual1.sink.add(1); |
+ virtual1.sink.add(2); |
+ virtual1.sink.add(3); |
+ |
+ virtual2.stream.listen(expectAsync((message) { |
+ expect(message, equals(1)); |
+ virtual2.sink.close(); |
+ }, count: 1)); |
+ }); |
+ |
+ test("after the stream closes, the sink ignores events", () async { |
+ virtual1.sink.close(); |
+ |
+ // Wait for the done event to be delivered. |
+ await virtual2.stream.toList(); |
+ virtual2.sink.add(1); |
+ virtual2.sink.add(2); |
+ virtual2.sink.add(3); |
+ virtual2.sink.close(); |
+ |
+ // None of our virtual.sink additions should make it to the other endpoint. |
+ virtual1.stream.listen(expectAsync((_) {}, count: 0)); |
+ await pumpEventQueue(); |
+ }); |
+ |
+ test("canceling the stream's subscription has no effect on the sink", |
+ () async { |
+ virtual1.stream.listen(null).cancel(); |
+ await pumpEventQueue(); |
+ |
+ virtual1.sink.add(1); |
+ virtual1.sink.add(2); |
+ virtual1.sink.add(3); |
+ virtual1.sink.close(); |
+ expect(virtual2.stream.toList(), completion(equals([1, 2, 3]))); |
+ }); |
+ |
+ test("canceling the stream's subscription doesn't stop a done event", |
+ () async { |
+ virtual1.stream.listen(null).cancel(); |
+ await pumpEventQueue(); |
+ |
+ virtual2.sink.close(); |
+ await pumpEventQueue(); |
+ |
+ virtual1.sink.add(1); |
+ virtual1.sink.add(2); |
+ virtual1.sink.add(3); |
+ virtual1.sink.close(); |
+ |
+ // The sink should be ignoring events because the stream closed. |
+ virtual2.stream.listen(expectAsync((_) {}, count: 0)); |
+ await pumpEventQueue(); |
+ }); |
}); |
}); |
} |