| Index: tests/lib_strong/async/stream_controller_test.dart
|
| diff --git a/tests/lib_strong/async/stream_controller_test.dart b/tests/lib_strong/async/stream_controller_test.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..5b019320843378c7cb6cc2ba66f7d3a595bf4847
|
| --- /dev/null
|
| +++ b/tests/lib_strong/async/stream_controller_test.dart
|
| @@ -0,0 +1,959 @@
|
| +// Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +// Test the basic StreamController and StreamController.singleSubscription.
|
| +library stream_controller_test;
|
| +
|
| +import "package:expect/expect.dart";
|
| +import "package:async_helper/async_helper.dart";
|
| +import 'dart:async';
|
| +import 'event_helper.dart';
|
| +
|
| +const MS = const Duration(milliseconds: 1);
|
| +
|
| +fail(e) { Expect.fail("Unexepected error: $e"); }
|
| +
|
| +void testMultiController() {
|
| + // Test normal flow.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + Events expectedEvents = new Events()
|
| + ..add(42)
|
| + ..add("dibs")
|
| + ..error("error!")
|
| + ..error("error too!")
|
| + ..close();
|
| + CaptureEvents actualEvents =
|
| + new Events.capture(c.stream.asBroadcastStream());
|
| + expectedEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test automatic unsubscription on error.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var expectedEvents = new Events()..add(42)..error("error");
|
| + var actualEvents = new Events.capture(c.stream.asBroadcastStream(),
|
| + cancelOnError: true);
|
| + Events sentEvents =
|
| + new Events()..add(42)..error("error")..add("Are you there?");
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test manual unsubscription.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var expectedEvents = new Events()..add(42)..error("error")..add(37);
|
| + var actualEvents = new Events.capture(c.stream.asBroadcastStream(),
|
| + cancelOnError: false);
|
| + expectedEvents.replay(c);
|
| + actualEvents.subscription.cancel();
|
| + c.add("Are you there"); // Not sent to actualEvents.
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test filter.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var expectedEvents = new Events()
|
| + ..add("a string")..add("another string")..close();
|
| + var sentEvents = new Events()
|
| + ..add("a string")..add(42)..add("another string")..close();
|
| + var actualEvents = new Events.capture(c.stream
|
| + .asBroadcastStream()
|
| + .where((v) => v is String));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test map.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var expectedEvents = new Events()..add("abab")..error("error")..close();
|
| + var sentEvents = new Events()..add("ab")..error("error")..close();
|
| + var actualEvents = new Events.capture(c.stream
|
| + .asBroadcastStream()
|
| + .map((v) => "$v$v"));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test handleError.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var expectedEvents = new Events()..add("ab")..error("[foo]");
|
| + var sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
|
| + var actualEvents = new Events.capture(c.stream
|
| + .asBroadcastStream()
|
| + .handleError((error) {
|
| + if (error is String) {
|
| + // TODO(floitsch): this test originally changed the stacktrace.
|
| + throw "[${error}]";
|
| + }
|
| + }), cancelOnError: true);
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // reduce is tested asynchronously and therefore not in this file.
|
| +
|
| + // Test expand
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var sentEvents = new Events()..add(3)..add(2)..add(4)..close();
|
| + var expectedEvents = new Events()..add(1)..add(2)..add(3)
|
| + ..add(1)..add(2)
|
| + ..add(1)..add(2)..add(3)..add(4)
|
| + ..close();
|
| + var actualEvents =
|
| + new Events.capture(c.stream.asBroadcastStream().expand((v) {
|
| + var l = [];
|
| + for (int i = 0; i < v; i++) l.add(i + 1);
|
| + return l;
|
| + }));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test transform.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var sentEvents = new Events()..add("a")..error(42)..add("b")..close();
|
| + var expectedEvents =
|
| + new Events()..error("a")..add(42)..error("b")..add("foo")..close();
|
| + var actualEvents =
|
| + new Events.capture(c.stream.asBroadcastStream().transform(
|
| + new StreamTransformer.fromHandlers(
|
| + handleData: (v, s) { s.addError(v); },
|
| + handleError: (e, st, s) { s.add(e); },
|
| + handleDone: (s) {
|
| + s.add("foo");
|
| + s.close();
|
| + })));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test multiple filters.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var sentEvents = new Events()..add(42)
|
| + ..add("snugglefluffy")
|
| + ..add(7)
|
| + ..add("42")
|
| + ..error("not FormatException") // Unsubscribes.
|
| + ..close();
|
| + var expectedEvents = new Events()..add(42)..error("not FormatException");
|
| + var actualEvents = new Events.capture(
|
| + c.stream.asBroadcastStream().where((v) => v is String)
|
| + .map((v) => int.parse(v))
|
| + .handleError((error) {
|
| + if (error is! FormatException) throw error;
|
| + })
|
| + .where((v) => v > 10),
|
| + cancelOnError: true);
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test subscription changes while firing.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var sink = c.sink;
|
| + var stream = c.stream.asBroadcastStream();
|
| + var counter = 0;
|
| + var subscription = stream.listen(null);
|
| + subscription.onData((data) {
|
| + counter += data;
|
| + subscription.cancel();
|
| + stream.listen((data) {
|
| + counter += 10 * data;
|
| + });
|
| + var subscription2 = stream.listen(null);
|
| + subscription2.onData((data) {
|
| + counter += 100 * data;
|
| + if (data == 4) subscription2.cancel();
|
| + });
|
| + });
|
| + sink.add(1); // seen by stream 1
|
| + sink.add(2); // seen by stream 10 and 100
|
| + sink.add(3); // -"-
|
| + sink.add(4); // -"-
|
| + sink.add(5); // seen by stream 10
|
| + Expect.equals(1 + 20 + 200 + 30 + 300 + 40 + 400 + 50, counter);
|
| + }
|
| +}
|
| +
|
| +testSingleController() {
|
| + // Test normal flow.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + Events expectedEvents = new Events()
|
| + ..add(42)
|
| + ..add("dibs")
|
| + ..error("error!")
|
| + ..error("error too!")
|
| + ..close();
|
| + CaptureEvents actualEvents = new Events.capture(c.stream);
|
| + expectedEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test automatic unsubscription on error.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var expectedEvents = new Events()..add(42)..error("error");
|
| + var actualEvents = new Events.capture(c.stream, cancelOnError: true);
|
| + Events sentEvents =
|
| + new Events()..add(42)..error("error")..add("Are you there?");
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test manual unsubscription.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var expectedEvents = new Events()..add(42)..error("error")..add(37);
|
| + var actualEvents = new Events.capture(c.stream, cancelOnError: false);
|
| + expectedEvents.replay(c);
|
| + actualEvents.subscription.cancel();
|
| + c.add("Are you there"); // Not sent to actualEvents.
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test filter.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var expectedEvents = new Events()
|
| + ..add("a string")..add("another string")..close();
|
| + var sentEvents = new Events()
|
| + ..add("a string")..add(42)..add("another string")..close();
|
| + var actualEvents = new Events.capture(c.stream.where((v) => v is String));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test map.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var expectedEvents = new Events()..add("abab")..error("error")..close();
|
| + var sentEvents = new Events()..add("ab")..error("error")..close();
|
| + var actualEvents = new Events.capture(c.stream.map((v) => "$v$v"));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test handleError.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var expectedEvents = new Events()..add("ab")..error("[foo]");
|
| + var sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
|
| + var actualEvents = new Events.capture(c.stream.handleError((error) {
|
| + if (error is String) {
|
| + // TODO(floitsch): this error originally changed the stack trace.
|
| + throw "[${error}]";
|
| + }
|
| + }), cancelOnError: true);
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // reduce is tested asynchronously and therefore not in this file.
|
| +
|
| + // Test expand
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var sentEvents = new Events()..add(3)..add(2)..add(4)..close();
|
| + var expectedEvents = new Events()..add(1)..add(2)..add(3)
|
| + ..add(1)..add(2)
|
| + ..add(1)..add(2)..add(3)..add(4)
|
| + ..close();
|
| + var actualEvents = new Events.capture(c.stream.expand((v) {
|
| + var l = [];
|
| + for (int i = 0; i < v; i++) l.add(i + 1);
|
| + return l;
|
| + }));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // test contains.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + // Error after match is not important.
|
| + var sentEvents = new Events()..add("a")..add("x")..error("FAIL")..close();
|
| + Future<bool> contains = c.stream.contains("x");
|
| + contains.then((var c) {
|
| + Expect.isTrue(c);
|
| + });
|
| + sentEvents.replay(c);
|
| + }
|
| +
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + // Not matching is ok.
|
| + var sentEvents = new Events()..add("a")..add("x")..add("b")..close();
|
| + Future<bool> contains = c.stream.contains("y");
|
| + contains.then((var c) {
|
| + Expect.isFalse(c);
|
| + });
|
| + sentEvents.replay(c);
|
| + }
|
| +
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + // Error before match makes future err.
|
| + var sentEvents = new Events()..add("a")..error("FAIL")..add("b")..close();
|
| + Future<bool> contains = c.stream.contains("b");
|
| + contains.then((var c) {
|
| + Expect.fail("no value expected");
|
| + }).catchError((error) {
|
| + Expect.equals("FAIL", error);
|
| + });
|
| + sentEvents.replay(c);
|
| + }
|
| +
|
| + // Test transform.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var sentEvents = new Events()..add("a")..error(42)..add("b")..close();
|
| + var expectedEvents =
|
| + new Events()..error("a")..add(42)..error("b")..add("foo")..close();
|
| + var actualEvents = new Events.capture(c.stream.transform(
|
| + new StreamTransformer.fromHandlers(
|
| + handleData: (v, s) { s.addError(v); },
|
| + handleError: (e, st, s) { s.add(e); },
|
| + handleDone: (s) {
|
| + s.add("foo");
|
| + s.close();
|
| + })));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test multiple filters.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var sentEvents = new Events()..add(42)
|
| + ..add("snugglefluffy")
|
| + ..add(7)
|
| + ..add("42")
|
| + ..error("not FormatException") // Unsubscribes.
|
| + ..close();
|
| + var expectedEvents = new Events()..add(42)..error("not FormatException");
|
| + var actualEvents = new Events.capture(
|
| + c.stream.where((v) => v is String)
|
| + .map((v) => int.parse(v))
|
| + .handleError((error) {
|
| + if (error is! FormatException) throw error;
|
| + })
|
| + .where((v) => v > 10),
|
| + cancelOnError: true);
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| + }
|
| +
|
| + // Test that only one subscription is allowed.
|
| + {
|
| + var c = new StreamController(sync: true);
|
| + var sink = c.sink;
|
| + var stream = c.stream;
|
| + var counter = 0;
|
| + var subscription = stream.listen((data) { counter += data; });
|
| + Expect.throws(() => stream.listen(null), (e) => e is StateError);
|
| + sink.add(1);
|
| + Expect.equals(1, counter);
|
| + c.close();
|
| + }
|
| +}
|
| +
|
| +testExtraMethods() {
|
| + Events sentEvents = new Events()..add(1)..add(2)..add(3)..close();
|
| +
|
| + var c = new StreamController(sync: true);
|
| + Events expectedEvents = new Events()..add(3)..close();
|
| + Events actualEvents = new Events.capture(c.stream.skip(2));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| +
|
| + c = new StreamController(sync: true);
|
| + expectedEvents = new Events()..close();
|
| + actualEvents = new Events.capture(c.stream.skip(3));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| +
|
| + c = new StreamController(sync: true);
|
| + expectedEvents = new Events()..close();
|
| + actualEvents = new Events.capture(c.stream.skip(7));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| +
|
| + c = new StreamController(sync: true);
|
| + expectedEvents = sentEvents;
|
| + actualEvents = new Events.capture(c.stream.skip(0));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| +
|
| +
|
| + c = new StreamController(sync: true);
|
| + expectedEvents = new Events()..add(3)..close();
|
| + actualEvents = new Events.capture(c.stream.skipWhile((x) => x <= 2));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| +
|
| +
|
| + c = new StreamController(sync: true);
|
| + expectedEvents = new Events()..add(2)..add(3)..close();
|
| + actualEvents = new Events.capture(c.stream.skipWhile((x) => x <= 1));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| +
|
| +
|
| + c = new StreamController(sync: true);
|
| + expectedEvents = new Events()..add(1)..add(2)..add(3)..close();
|
| + actualEvents = new Events.capture(c.stream.skipWhile((x) => false));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| +
|
| +
|
| + c = new StreamController(sync: true);
|
| + expectedEvents = new Events()..add(1)..add(2)..close();
|
| + actualEvents = new Events.capture(c.stream.take(2));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| +
|
| +
|
| + c = new StreamController(sync: true);
|
| + expectedEvents = new Events()..add(1)..add(2)..close();
|
| + actualEvents = new Events.capture(c.stream.takeWhile((x) => x <= 2));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| +
|
| +
|
| + c = new StreamController(sync: true);
|
| + sentEvents = new Events()
|
| + ..add(1)..add(1)..add(2)..add(1)..add(2)..add(2)..add(2)..close();
|
| + expectedEvents = new Events()
|
| + ..add(1)..add(2)..add(1)..add(2)..close();
|
| + actualEvents = new Events.capture(c.stream.distinct());
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| +
|
| +
|
| + c = new StreamController(sync: true);
|
| + sentEvents = new Events()
|
| + ..add(5)..add(6)..add(4)..add(6)..add(8)..add(3)..add(4)..add(1)..close();
|
| + expectedEvents = new Events()
|
| + ..add(5)..add(4)..add(3)..add(1)..close();
|
| + // Use 'distinct' as a filter with access to the previously emitted event.
|
| + actualEvents = new Events.capture(c.stream.distinct((a, b) => a < b));
|
| + sentEvents.replay(c);
|
| + Expect.listEquals(expectedEvents.events, actualEvents.events);
|
| +}
|
| +
|
| +void testClosed() {
|
| + StreamController c = new StreamController(sync: true);
|
| + Expect.isFalse(c.isClosed);
|
| + c.add(42);
|
| + Expect.isFalse(c.isClosed);
|
| + c.addError("bad");
|
| + Expect.isFalse(c.isClosed);
|
| + c.close();
|
| + Expect.isTrue(c.isClosed);
|
| +}
|
| +
|
| +void testCloseFuture() {
|
| + asyncStart();
|
| + asyncStart();
|
| + var c = new StreamController();
|
| + var f = c.close();
|
| + Expect.isTrue(c.isClosed);
|
| + bool doneSeen = false;
|
| + f.then((_) {
|
| + Expect.isTrue(doneSeen);
|
| + asyncEnd();
|
| + });
|
| + // Only listen after a while.
|
| + new Timer(MS * 250, () {
|
| + c.stream.listen(null, onDone: () {
|
| + asyncEnd();
|
| + doneSeen = true;
|
| + });
|
| + });
|
| +}
|
| +
|
| +void testCloseFuture2() {
|
| + asyncStart();
|
| + asyncStart();
|
| + var c = new StreamController.broadcast();
|
| + var f = c.close();
|
| + Expect.isTrue(c.isClosed);
|
| + bool doneSeen = false;
|
| + f.then((_) {
|
| + // Done future on broadcast stream can happen
|
| + // before a listener is added.
|
| + Expect.isFalse(doneSeen);
|
| + asyncEnd();
|
| + });
|
| + // Only listen after a while.
|
| + new Timer(MS * 250, () {
|
| + c.stream.listen(null, onDone: () {
|
| + doneSeen = true;
|
| + asyncEnd();
|
| + });
|
| + });
|
| +}
|
| +
|
| +void testCloseFuture3() {
|
| + asyncStart();
|
| + var c = new StreamController.broadcast();
|
| + c..add(1)..add(2)..add(3)..add(4);
|
| + c.stream.listen(null).cancel();
|
| + var f = c.close();
|
| + Expect.isTrue(c.isClosed);
|
| + f.then((_) {
|
| + asyncEnd();
|
| + });
|
| +}
|
| +
|
| +void testStreamEquals() {
|
| + StreamController c;
|
| + c = new StreamController(sync: false);
|
| + Expect.equals(c.stream, c.stream);
|
| + c = new StreamController(sync: true);
|
| + Expect.equals(c.stream, c.stream);
|
| + c = new StreamController(sync: false, onListen: () {});
|
| + Expect.equals(c.stream, c.stream);
|
| + c = new StreamController(sync: true, onListen: () {});
|
| + Expect.equals(c.stream, c.stream);
|
| + c = new StreamController.broadcast(sync: false);
|
| + Expect.equals(c.stream, c.stream);
|
| + c = new StreamController.broadcast(sync: true);
|
| + Expect.equals(c.stream, c.stream);
|
| + c = new StreamController.broadcast(sync: false, onListen: () {});
|
| + Expect.equals(c.stream, c.stream);
|
| + c = new StreamController.broadcast(sync: true, onListen: () {});
|
| + Expect.equals(c.stream, c.stream);
|
| +}
|
| +
|
| +void testCancelThrow() {
|
| + asyncStart();
|
| + asyncStart();
|
| + asyncStart();
|
| + StreamController c = new StreamController(onCancel: () {
|
| + asyncEnd();
|
| + throw "ERROR";
|
| + });
|
| + c.add(1);
|
| + c.add(2);
|
| + c.add(3);
|
| + Future done = c.close();
|
| + StreamSubscription sub;
|
| + sub = c.stream.listen((v) {
|
| + Expect.equals(1, v);
|
| + Future f = sub.cancel();
|
| + f.catchError((e) {
|
| + // Must complete with error from onCancel.
|
| + Expect.equals("ERROR", e);
|
| + asyncEnd();
|
| + });
|
| + });
|
| + done.catchError(fail).whenComplete(asyncEnd); // Must complete without error.
|
| +}
|
| +
|
| +void testCancelThrow2() {
|
| + asyncStart();
|
| + asyncStart();
|
| + asyncStart();
|
| + asyncStart();
|
| + asyncStart();
|
| + StreamController c2 = new StreamController(onCancel: () {
|
| + asyncEnd();
|
| + throw "ERROR";
|
| + });
|
| + c2.add(1);
|
| + c2.add(2);
|
| + Future done2 = c2.close();
|
| + done2.catchError(fail).whenComplete(asyncEnd); // Should not get error;
|
| +
|
| + StreamController c = new StreamController();
|
| + var sub;
|
| + sub = c.stream.listen((v) {
|
| + Expect.equals(1, v);
|
| + Future f = sub.cancel();
|
| + f.catchError((e) {
|
| + // Error from addStream stream's cancel must go only here.
|
| + asyncEnd();
|
| + Expect.equals("ERROR", e);
|
| + });
|
| + });
|
| + var addDone = c.addStream(c2.stream);
|
| + addDone.catchError(fail).whenComplete(asyncEnd); // Should not get error.
|
| + var done = c.done;
|
| + done.catchError(fail).whenComplete(asyncEnd); // Should not get error.
|
| +}
|
| +
|
| +void testCancelThrow3() {
|
| + asyncStart();
|
| + asyncStart();
|
| + asyncStart();
|
| + asyncStart();
|
| + asyncStart();
|
| + asyncStart();
|
| + StreamController c2 = new StreamController(onCancel: () {
|
| + asyncEnd();
|
| + throw "ERROR2";
|
| + });
|
| + c2.add(1);
|
| + c2.add(2);
|
| + var done2 = c2.close();
|
| + done2.catchError(fail).whenComplete(asyncEnd); // Should not get error;
|
| +
|
| + StreamController c = new StreamController(onCancel: () {
|
| + asyncEnd();
|
| + throw "ERROR1";
|
| + });
|
| + var sub;
|
| + sub = c.stream.listen((v) {
|
| + Expect.equals(1, v);
|
| + Future f = sub.cancel();
|
| + f.catchError((e) {
|
| + // Only the last error ends up here.
|
| + Expect.equals("ERROR1", e);
|
| + asyncEnd();
|
| + });
|
| + });
|
| + var addDone = c.addStream(c2.stream);
|
| + addDone.catchError(fail).whenComplete(asyncEnd); // Error must not go here.
|
| + c.done.catchError(fail).whenComplete(asyncEnd); // Error must not go here.
|
| +}
|
| +
|
| +void testBroadcastListenAfterClose() {
|
| + asyncStart();
|
| + StreamController c = new StreamController.broadcast();
|
| + var f = c.close();
|
| + f.then((_) {
|
| + // Listening after close is allowed. The listener gets a done event.
|
| + c.stream.listen(null, onDone: asyncEnd);
|
| + });
|
| +}
|
| +
|
| +void testBroadcastListenAfterClosePaused() {
|
| + asyncStart();
|
| + StreamController c = new StreamController.broadcast();
|
| + var f = c.close();
|
| + f.then((_) {
|
| + // Listening after close is allowed. The listener gets a done event.
|
| + var sub = c.stream.listen(null, onDone: () {
|
| + Expect.fail("wrong done");
|
| + });
|
| + sub.pause();
|
| + sub.pause();
|
| + new Timer(MS * 100, () {
|
| + sub.asFuture().whenComplete(() { Expect.fail("Bad complete"); });
|
| + sub.resume();
|
| + new Timer(MS * 100, () {
|
| + sub.onDone(asyncEnd);
|
| + sub.resume();
|
| + });
|
| + });
|
| + });
|
| +}
|
| +
|
| +void testAsBroadcastListenAfterClose() {
|
| + asyncStart();
|
| + asyncStart();
|
| + StreamController c = new StreamController();
|
| + Stream s = c.stream.asBroadcastStream();
|
| + s.listen(null, onDone: asyncEnd);
|
| + var f = c.close();
|
| + f.then((_) {
|
| + // Listening after close is allowed. The listener gets a done event.
|
| + s.listen(null, onDone: asyncEnd);
|
| + });
|
| +}
|
| +
|
| +void testAsBroadcastListenAfterClosePaused() {
|
| + asyncStart();
|
| + asyncStart();
|
| + StreamController c = new StreamController();
|
| + Stream s = c.stream.asBroadcastStream();
|
| + s.listen(null, onDone: asyncEnd);
|
| + var f = c.close();
|
| + f.then((_) {
|
| + // Listening after close is allowed. The listener gets a done event.
|
| + var sub = s.listen(null, onDone: () {
|
| + Expect.fail("wrong done");
|
| + });
|
| + sub.pause();
|
| + sub.pause();
|
| + new Timer(MS * 100, () {
|
| + sub.asFuture().whenComplete(() { Expect.fail("Bad complete"); });
|
| + sub.resume();
|
| + new Timer(MS * 100, () {
|
| + sub.onDone(asyncEnd);
|
| + sub.resume();
|
| + });
|
| + });
|
| + });
|
| +}
|
| +
|
| +void testEventInListen() {
|
| + asyncStart();
|
| + // Regression test for http://dartbug.com/19722
|
| + var c;
|
| + void send() {
|
| + c.add(1);
|
| + }
|
| + int i = 1;
|
| + c = new StreamController.broadcast(onListen: send, sync: true);
|
| + c.stream.listen((v) {
|
| + Expect.equals(i++, v);
|
| + }, onDone: asyncEnd);
|
| + c.add(2);
|
| + c.close();
|
| +}
|
| +
|
| +void testSyncControllerNotReentrant() {
|
| + Stream emptyStream = (new StreamController.broadcast()..close()).stream;
|
| + asyncStart();
|
| + for (int listenerCount = 1; listenerCount <= 2; listenerCount++) {
|
| + StreamController c = new StreamController.broadcast(sync: true);
|
| + for (int i = 0; i < listenerCount; i++) {
|
| + asyncStart();
|
| + asyncStart();
|
| + c.stream.listen((v) {
|
| + Expect.equals(42, v);
|
| + Expect.throws(() {
|
| + c.add(37);
|
| + });
|
| + Expect.throws(() {
|
| + c.addError(37);
|
| + });
|
| + Expect.throws(() {
|
| + c.addStream(emptyStream);
|
| + });
|
| + Expect.throws(() {
|
| + c.close();
|
| + });
|
| + asyncEnd();
|
| + }, onError: (e, s) {
|
| + Expect.equals(87, e);
|
| + Expect.throws(() {
|
| + c.add(37);
|
| + });
|
| + Expect.throws(() {
|
| + c.addError(37);
|
| + });
|
| + Expect.throws(() {
|
| + c.addStream(emptyStream);
|
| + });
|
| + Expect.throws(() {
|
| + c.close();
|
| + });
|
| + asyncEnd();
|
| + });
|
| + }
|
| + c.add(42);
|
| + c.addError(87);
|
| + }
|
| + asyncEnd();
|
| +}
|
| +
|
| +void testSettingCallbacks() {
|
| + const int initial = 0;
|
| + const int running = 1;
|
| + const int paused = 2;
|
| + const int canceled = 3;
|
| +
|
| + var controller = new StreamController();
|
| + var stream = controller.stream;
|
| + var state = initial;
|
| +
|
| + var onListen = () { state = running; };
|
| + var onPause = () { state = paused; };
|
| + var onResume = () { state = running; };
|
| + var onCancel = () { state = canceled; };
|
| +
|
| + Expect.isNull(controller.onListen);
|
| + Expect.isNull(controller.onPause);
|
| + Expect.isNull(controller.onResume);
|
| + Expect.isNull(controller.onCancel);
|
| +
|
| + controller..onListen = onListen
|
| + ..onPause = onPause
|
| + ..onResume = onResume
|
| + ..onCancel = onCancel;
|
| +
|
| + Expect.equals(onListen, controller.onListen);
|
| + Expect.equals(onPause, controller.onPause);
|
| + Expect.equals(onResume, controller.onResume);
|
| + Expect.equals(onCancel, controller.onCancel);
|
| +
|
| + Expect.equals(initial, state);
|
| + var sub = stream.listen(null);
|
| + Expect.equals(running, state);
|
| + sub.pause();
|
| + Expect.equals(paused, state);
|
| + Expect.isTrue(controller.isPaused);
|
| + sub.resume();
|
| + Expect.equals(running, state);
|
| + Expect.isFalse(controller.isPaused);
|
| +
|
| + var onListen2 = () { state = -running; };
|
| + var onPause2 = () { state = -paused; };
|
| + var onResume2 = () { state = -running; };
|
| + var onCancel2 = () { state = -canceled; };
|
| + // Changing them later does make a difference.
|
| + controller..onListen = onListen2
|
| + ..onPause = onPause2
|
| + ..onResume = onResume2
|
| + ..onCancel = onCancel2;
|
| +
|
| + Expect.equals(onListen2, controller.onListen);
|
| + Expect.equals(onPause2, controller.onPause);
|
| + Expect.equals(onResume2, controller.onResume);
|
| + Expect.equals(onCancel2, controller.onCancel);
|
| +
|
| + Expect.equals(running, state);
|
| + sub.pause();
|
| + Expect.equals(-paused, state);
|
| + Expect.isTrue(controller.isPaused);
|
| + sub.resume();
|
| + Expect.equals(-running, state);
|
| + Expect.isFalse(controller.isPaused);
|
| + sub.cancel();
|
| + Expect.equals(-canceled, state);
|
| +}
|
| +
|
| +void testSettingNullCallbacks() {
|
| + failCallback() => fail("Callback should not be called");
|
| + var controller = new StreamController(onListen: failCallback,
|
| + onPause : failCallback,
|
| + onResume: failCallback,
|
| + onCancel: failCallback);
|
| +
|
| + var stream = controller.stream;
|
| +
|
| + Expect.isFalse(controller.hasListener);
|
| + Expect.isTrue(controller.isPaused);
|
| +
|
| + Expect.isNotNull(controller.onListen);
|
| + controller.onListen = null;
|
| + Expect.isNull(controller.onListen);
|
| +
|
| + var sub = stream.listen(null);
|
| +
|
| + Expect.isTrue(controller.hasListener);
|
| + Expect.isFalse(controller.isPaused);
|
| +
|
| + Expect.isNotNull(controller.onPause);
|
| + controller.onPause = null;
|
| + Expect.isNull(controller.onPause);
|
| +
|
| + sub.pause();
|
| +
|
| + Expect.isTrue(controller.hasListener);
|
| + Expect.isTrue(controller.isPaused);
|
| +
|
| + Expect.isNotNull(controller.onResume);
|
| + controller.onResume = null;
|
| + Expect.isNull(controller.onResume);
|
| +
|
| + sub.resume();
|
| +
|
| + Expect.isTrue(controller.hasListener);
|
| + Expect.isFalse(controller.isPaused);
|
| +
|
| + Expect.isNotNull(controller.onCancel);
|
| + controller.onCancel = null;
|
| + Expect.isNull(controller.onCancel);
|
| +
|
| + sub.cancel();
|
| +
|
| + Expect.isFalse(controller.hasListener);
|
| + Expect.isFalse(controller.isPaused);
|
| +}
|
| +
|
| +void testBroadcastSettingCallbacks() {
|
| + const int initial = 0;
|
| + const int running = 1;
|
| + const int canceled = 2;
|
| +
|
| + var controller = new StreamController.broadcast();
|
| + var stream = controller.stream;
|
| + var state = initial;
|
| +
|
| + Expect.throws(() { controller.onPause = () {}; },
|
| + (e) => e is UnsupportedError);
|
| + Expect.throws(() { controller.onResume = () {}; },
|
| + (e) => e is UnsupportedError);
|
| +
|
| + controller..onListen = () { state = running; }
|
| + ..onCancel = () { state = canceled; };
|
| +
|
| + Expect.equals(initial, state);
|
| + var sub = stream.listen(null);
|
| + Expect.equals(running, state);
|
| + sub.cancel();
|
| + Expect.equals(canceled, state);
|
| +
|
| + // Changing them later does make a difference.
|
| + controller..onListen = () { state = -running; }
|
| + ..onCancel = () { state = -canceled; };
|
| +
|
| + var sub2 = stream.listen(null);
|
| + Expect.equals(-running, state);
|
| + sub2.cancel();
|
| + Expect.equals(-canceled, state);
|
| +}
|
| +
|
| +void testBroadcastSettingNullCallbacks() {
|
| + failCallback() => fail("Callback should not be called");
|
| + var controller = new StreamController.broadcast(onListen: failCallback,
|
| + onCancel: failCallback);
|
| +
|
| + var stream = controller.stream;
|
| +
|
| + Expect.isFalse(controller.hasListener);
|
| +
|
| + controller.onListen = null;
|
| +
|
| + var sub = stream.listen(null);
|
| +
|
| + Expect.isTrue(controller.hasListener);
|
| +
|
| + controller.onCancel = null;
|
| +
|
| + sub.cancel();
|
| +
|
| + Expect.isFalse(controller.hasListener);
|
| +}
|
| +
|
| +main() {
|
| + asyncStart();
|
| + testMultiController();
|
| + testSingleController();
|
| + testExtraMethods();
|
| + testClosed();
|
| + testCloseFuture();
|
| + testCloseFuture2();
|
| + testCloseFuture3();
|
| + testStreamEquals();
|
| + testCancelThrow();
|
| + testCancelThrow2();
|
| + testCancelThrow3();
|
| + testBroadcastListenAfterClose();
|
| + testBroadcastListenAfterClosePaused();
|
| + testAsBroadcastListenAfterClose();
|
| + testAsBroadcastListenAfterClosePaused();
|
| + testEventInListen();
|
| + testSyncControllerNotReentrant();
|
| + testSettingCallbacks();
|
| + testSettingNullCallbacks();
|
| + testBroadcastSettingCallbacks();
|
| + testBroadcastSettingNullCallbacks();
|
| + asyncEnd();
|
| +}
|
|
|