| Index: tests/lib/async/stream_controller_async_test.dart
|
| diff --git a/tests/lib/async/stream_controller_async_test.dart b/tests/lib/async/stream_controller_async_test.dart
|
| index 7c134055bc249134739f8f69a72e0a50b0bb5738..e97ab2c4e4f96e4a2ea4d44547114fdaebda830f 100644
|
| --- a/tests/lib/async/stream_controller_async_test.dart
|
| +++ b/tests/lib/async/stream_controller_async_test.dart
|
| @@ -464,7 +464,6 @@ void testBroadcastController() {
|
|
|
| test("broadcast-controller-individual-pause", () {
|
| StreamProtocolTest test = new StreamProtocolTest.broadcast();
|
| - test.trace = true;
|
| var sub1;
|
| test..expectListen()
|
| ..expectData(42)
|
| @@ -498,6 +497,114 @@ void testBroadcastController() {
|
| });
|
| }
|
|
|
| +void testSink({bool sync, bool broadcast, bool asBroadcast}) {
|
| + String type = "${sync?"S":"A"}${broadcast?"B":"S"}${asBroadcast?"aB":""}";
|
| + test("$type-controller-sink", () {
|
| + var done = expectAsync0((){});
|
| + var c = broadcast ? new StreamController.broadcast(sync: sync)
|
| + : new StreamController(sync: sync);
|
| + var expected = new Events()
|
| + ..add(42)..error("error")
|
| + ..add(1)..add(2)..add(3)..add(4)..add(5)
|
| + ..add(43)..close();
|
| + var actual = new Events.capture(asBroadcast ? c.stream.asBroadcastStream()
|
| + : c.stream);
|
| + var sink = c.sink;
|
| + sink.add(42);
|
| + sink.addError("error");
|
| + sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5]))
|
| + .then((_) {
|
| + sink.add(43);
|
| + return sink.close();
|
| + })
|
| + .then((_) {
|
| + Expect.listEquals(expected.events, actual.events);
|
| + done();
|
| + });
|
| + });
|
| +
|
| + test("$type-controller-sink-canceled", () {
|
| + var done = expectAsync0((){});
|
| + var c = broadcast ? new StreamController.broadcast(sync: sync)
|
| + : new StreamController(sync: sync);
|
| + var expected = new Events()
|
| + ..add(42)..error("error")
|
| + ..add(1)..add(2)..add(3);
|
| + var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
|
| + var actual = new Events();
|
| + var sub;
|
| + // Cancel subscription after receiving "3" event.
|
| + sub = stream.listen((v) {
|
| + if (v == 3) sub.cancel();
|
| + actual.add(v);
|
| + }, onError: actual.error);
|
| + var sink = c.sink;
|
| + sink.add(42);
|
| + sink.addError("error");
|
| + sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5]))
|
| + .then((_) {
|
| + Expect.listEquals(expected.events, actual.events);
|
| + // Close controller as well. It has no listener. If it is a broadcast
|
| + // stream, it will still be open, and we read the "done" future before
|
| + // closing. A normal stream is already done when its listener cancels.
|
| + Future doneFuture = sink.done;
|
| + sink.close();
|
| + return doneFuture;
|
| + })
|
| + .then((_) {
|
| + // No change in events.
|
| + Expect.listEquals(expected.events, actual.events);
|
| + done();
|
| + });
|
| + });
|
| +
|
| + test("$type-controller-sink-paused", () {
|
| + var done = expectAsync0((){});
|
| + var c = broadcast ? new StreamController.broadcast(sync: sync)
|
| + : new StreamController(sync: sync);
|
| + var expected = new Events()
|
| + ..add(42)..error("error")
|
| + ..add(1)..add(2)..add(3)
|
| + ..add(4)..add(5)..add(43)..close();
|
| + var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
|
| + var actual = new Events();
|
| + var sub;
|
| + sub = stream.listen(
|
| + (v) {
|
| + if (v == 3) {
|
| + sub.pause(new Future.delayed(const Duration(milliseconds: 15),
|
| + () => null));
|
| + }
|
| + actual.add(v);
|
| + },
|
| + onError: actual.error,
|
| + onDone: actual.close);
|
| + var sink = c.sink;
|
| + sink.add(42);
|
| + sink.addError("error");
|
| + sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5]))
|
| + .then((_) {
|
| + sink.add(43);
|
| + return sink.close();
|
| + })
|
| + .then((_) {
|
| + if (asBroadcast) {
|
| + // The done-future of the sink completes when it passes
|
| + // the done event to the asBroadcastStream controller, which is
|
| + // before the final listener gets the event.
|
| + // Wait for the pause to end before testing the events.
|
| + return new Future.delayed(const Duration(milliseconds: 50), () {
|
| + Expect.listEquals(expected.events, actual.events);
|
| + done();
|
| + });
|
| + } else {
|
| + Expect.listEquals(expected.events, actual.events);
|
| + done();
|
| + }
|
| + });
|
| + });
|
| +}
|
| +
|
| main() {
|
| testController();
|
| testSingleController();
|
| @@ -505,4 +612,10 @@ main() {
|
| testPause();
|
| testRethrow();
|
| testBroadcastController();
|
| + testSink(sync: true, broadcast: false, asBroadcast: false);
|
| + testSink(sync: true, broadcast: false, asBroadcast: true);
|
| + testSink(sync: true, broadcast: true, asBroadcast: false);
|
| + testSink(sync: false, broadcast: false, asBroadcast: false);
|
| + testSink(sync: false, broadcast: false, asBroadcast: true);
|
| + testSink(sync: false, broadcast: true, asBroadcast: false);
|
| }
|
|
|