| Index: tests/lib_strong/async/stream_controller_async_test.dart
|
| diff --git a/tests/lib_strong/async/stream_controller_async_test.dart b/tests/lib_strong/async/stream_controller_async_test.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..3fed8a88cf890633e91db5a732220c8fadc21627
|
| --- /dev/null
|
| +++ b/tests/lib_strong/async/stream_controller_async_test.dart
|
| @@ -0,0 +1,771 @@
|
| +// Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +// Test the basic StreamController and StreamController.broadcast.
|
| +library stream_controller_async_test;
|
| +
|
| +import 'dart:async';
|
| +import "package:expect/expect.dart";
|
| +import 'package:unittest/unittest.dart';
|
| +import 'event_helper.dart';
|
| +import 'stream_state_helper.dart';
|
| +
|
| +void cancelSub(StreamSubscription sub) { sub.cancel(); }
|
| +
|
| +testController() {
|
| + // Test fold
|
| + test("StreamController.fold", () {
|
| + StreamController c = new StreamController();
|
| + Stream stream = c.stream.asBroadcastStream(onCancel: cancelSub);
|
| + stream.fold(0, (a,b) => a + b)
|
| + .then(expectAsync((int v) {
|
| + Expect.equals(42, v);
|
| + }));
|
| + c.add(10);
|
| + c.add(32);
|
| + c.close();
|
| + });
|
| +
|
| + test("StreamController.fold throws", () {
|
| + StreamController c = new StreamController();
|
| + Stream stream = c.stream.asBroadcastStream(onCancel: cancelSub);
|
| + stream.fold(0, (a,b) { throw "Fnyf!"; })
|
| + .catchError(expectAsync((error) { Expect.equals("Fnyf!", error); }));
|
| + c.add(42);
|
| + });
|
| +}
|
| +
|
| +testSingleController() {
|
| + test("Single-subscription StreamController.fold", () {
|
| + StreamController c = new StreamController();
|
| + Stream stream = c.stream;
|
| + stream.fold(0, (a,b) => a + b)
|
| + .then(expectAsync((int v) { Expect.equals(42, v); }));
|
| + c.add(10);
|
| + c.add(32);
|
| + c.close();
|
| + });
|
| +
|
| + test("Single-subscription StreamController.fold throws", () {
|
| + StreamController c = new StreamController();
|
| + Stream stream = c.stream;
|
| + stream.fold(0, (a,b) { throw "Fnyf!"; })
|
| + .catchError(expectAsync((e) { Expect.equals("Fnyf!", e); }));
|
| + c.add(42);
|
| + });
|
| +
|
| + test("Single-subscription StreamController events are buffered when"
|
| + " there is no subscriber",
|
| + () {
|
| + StreamController c = new StreamController();
|
| + EventSink sink = c.sink;
|
| + Stream stream = c.stream;
|
| + int counter = 0;
|
| + sink.add(1);
|
| + sink.add(2);
|
| + sink.close();
|
| + stream.listen(
|
| + (data) {
|
| + counter += data;
|
| + },
|
| + onDone: expectAsync(() {
|
| + Expect.equals(3, counter);
|
| + }));
|
| + });
|
| +}
|
| +
|
| +testExtraMethods() {
|
| + Events sentEvents = new Events()..add(7)..add(9)..add(13)..add(87)..close();
|
| +
|
| + test("forEach", () {
|
| + StreamController c = new StreamController();
|
| + Events actualEvents = new Events();
|
| + Future f = c.stream.forEach(actualEvents.add);
|
| + f.then(expectAsync((_) {
|
| + actualEvents.close();
|
| + Expect.listEquals(sentEvents.events, actualEvents.events);
|
| + }));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("forEachError", () {
|
| + Events sentEvents = new Events()..add(7)..error("bad")..add(87)..close();
|
| + StreamController c = new StreamController();
|
| + Events actualEvents = new Events();
|
| + Future f = c.stream.forEach(actualEvents.add);
|
| + f.catchError(expectAsync((error) {
|
| + Expect.equals("bad", error);
|
| + Expect.listEquals((new Events()..add(7)).events, actualEvents.events);
|
| + }));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("forEachError2", () {
|
| + Events sentEvents = new Events()..add(7)..add(9)..add(87)..close();
|
| + StreamController c = new StreamController();
|
| + Events actualEvents = new Events();
|
| + Future f = c.stream.forEach((x) {
|
| + if (x == 9) throw "bad";
|
| + actualEvents.add(x);
|
| + });
|
| + f.catchError(expectAsync((error) {
|
| + Expect.equals("bad", error);
|
| + Expect.listEquals((new Events()..add(7)).events, actualEvents.events);
|
| + }));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("firstWhere", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.firstWhere((x) => (x % 3) == 0);
|
| + f.then(expectAsync((v) { Expect.equals(9, v); }));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("firstWhere 2", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.firstWhere((x) => (x % 4) == 0);
|
| + f.catchError(expectAsync((e) {}));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("firstWhere 3", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.firstWhere((x) => (x % 4) == 0, defaultValue: () => 999);
|
| + f.then(expectAsync((v) { Expect.equals(999, v); }));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| +
|
| + test("lastWhere", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.lastWhere((x) => (x % 3) == 0);
|
| + f.then(expectAsync((v) { Expect.equals(87, v); }));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("lastWhere 2", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.lastWhere((x) => (x % 4) == 0);
|
| + f.catchError(expectAsync((e) {}));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("lastWhere 3", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.lastWhere((x) => (x % 4) == 0, defaultValue: () => 999);
|
| + f.then(expectAsync((v) { Expect.equals(999, v); }));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("singleWhere", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.singleWhere((x) => (x % 9) == 0);
|
| + f.then(expectAsync((v) { Expect.equals(9, v); }));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("singleWhere 2", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.singleWhere((x) => (x % 3) == 0); // Matches 9 and 87..
|
| + f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); }));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("first", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.first;
|
| + f.then(expectAsync((v) { Expect.equals(7, v);}));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("first empty", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.first;
|
| + f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); }));
|
| + Events emptyEvents = new Events()..close();
|
| + emptyEvents.replay(c);
|
| + });
|
| +
|
| + test("first error", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.first;
|
| + f.catchError(expectAsync((error) { Expect.equals("error", error); }));
|
| + Events errorEvents = new Events()..error("error")..close();
|
| + errorEvents.replay(c);
|
| + });
|
| +
|
| + test("first error 2", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.first;
|
| + f.catchError(expectAsync((error) { Expect.equals("error", error); }));
|
| + Events errorEvents = new Events()..error("error")..error("error2")..close();
|
| + errorEvents.replay(c);
|
| + });
|
| +
|
| + test("last", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.last;
|
| + f.then(expectAsync((v) { Expect.equals(87, v);}));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("last empty", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.last;
|
| + f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); }));
|
| + Events emptyEvents = new Events()..close();
|
| + emptyEvents.replay(c);
|
| + });
|
| +
|
| + test("last error", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.last;
|
| + f.catchError(expectAsync((error) { Expect.equals("error", error); }));
|
| + Events errorEvents = new Events()..error("error")..close();
|
| + errorEvents.replay(c);
|
| + });
|
| +
|
| + test("last error 2", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.last;
|
| + f.catchError(expectAsync((error) { Expect.equals("error", error); }));
|
| + Events errorEvents = new Events()..error("error")..error("error2")..close();
|
| + errorEvents.replay(c);
|
| + });
|
| +
|
| + test("elementAt", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.elementAt(2);
|
| + f.then(expectAsync((v) { Expect.equals(13, v);}));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("elementAt 2", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.elementAt(20);
|
| + f.catchError(expectAsync((error) { Expect.isTrue(error is RangeError); }));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("drain", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.drain();
|
| + f.then(expectAsync((v) { Expect.equals(null, v);}));
|
| + sentEvents.replay(c);
|
| + });
|
| +
|
| + test("drain error", () {
|
| + StreamController c = new StreamController();
|
| + Future f = c.stream.drain();
|
| + f.catchError(expectAsync((error) { Expect.equals("error", error); }));
|
| + Events errorEvents = new Events()..error("error")..error("error2")..close();
|
| + errorEvents.replay(c);
|
| + });
|
| +
|
| +}
|
| +
|
| +testPause() {
|
| + test("pause event-unpause", () {
|
| +
|
| + StreamProtocolTest test = new StreamProtocolTest();
|
| + Completer completer = new Completer();
|
| + test..expectListen()
|
| + ..expectData(42, () { test.pause(completer.future); })
|
| + ..expectPause(() {
|
| + completer.complete(null);
|
| + })
|
| + ..expectData(43)
|
| + ..expectData(44)
|
| + ..expectCancel()
|
| + ..expectDone(test.terminate);
|
| + test.listen();
|
| + test.add(42);
|
| + test.add(43);
|
| + test.add(44);
|
| + test.close();
|
| + });
|
| +
|
| + test("pause twice event-unpause", () {
|
| + StreamProtocolTest test = new StreamProtocolTest();
|
| + Completer completer = new Completer();
|
| + Completer completer2 = new Completer();
|
| + test..expectListen()
|
| + ..expectData(42, () {
|
| + test.pause(completer.future);
|
| + test.pause(completer2.future);
|
| + })
|
| + ..expectPause(() {
|
| + completer.future.then(completer2.complete);
|
| + completer.complete(null);
|
| + })
|
| + ..expectData(43)
|
| + ..expectData(44)
|
| + ..expectCancel()
|
| + ..expectDone(test.terminate);
|
| + test..listen()
|
| + ..add(42)
|
| + ..add(43)
|
| + ..add(44)
|
| + ..close();
|
| + });
|
| +
|
| + test("pause twice direct-unpause", () {
|
| + StreamProtocolTest test = new StreamProtocolTest();
|
| + test..expectListen()
|
| + ..expectData(42, () {
|
| + test.pause();
|
| + test.pause();
|
| + })
|
| + ..expectPause(() {
|
| + test.resume();
|
| + test.resume();
|
| + })
|
| + ..expectData(43)
|
| + ..expectData(44)
|
| + ..expectCancel()
|
| + ..expectDone(test.terminate);
|
| + test..listen()
|
| + ..add(42)
|
| + ..add(43)
|
| + ..add(44)
|
| + ..close();
|
| + });
|
| +
|
| + test("pause twice direct-event-unpause", () {
|
| + StreamProtocolTest test = new StreamProtocolTest();
|
| + Completer completer = new Completer();
|
| + test..expectListen()
|
| + ..expectData(42, () {
|
| + test.pause();
|
| + test.pause(completer.future);
|
| + test.add(43);
|
| + test.add(44);
|
| + test.close();
|
| + })
|
| + ..expectPause(() {
|
| + completer.future.then((v) => test.resume());
|
| + completer.complete(null);
|
| + })
|
| + ..expectData(43)
|
| + ..expectData(44)
|
| + ..expectCancel()
|
| + ..expectDone(test.terminate);
|
| + test..listen()
|
| + ..add(42);
|
| + });
|
| +}
|
| +
|
| +class TestError { const TestError(); }
|
| +
|
| +testRethrow() {
|
| + TestError error = const TestError();
|
| +
|
| + testStream(name, streamValueTransform) {
|
| + test("rethrow-$name-value", () {
|
| + StreamController c = new StreamController();
|
| + Stream s = streamValueTransform(c.stream, (v) { throw error; });
|
| + s.listen((_) { Expect.fail("unexpected value"); }, onError: expectAsync(
|
| + (e) { Expect.identical(error, e); }));
|
| + c.add(null);
|
| + c.close();
|
| + });
|
| + }
|
| +
|
| + testStreamError(name, streamErrorTransform) {
|
| + test("rethrow-$name-error", () {
|
| + StreamController c = new StreamController();
|
| + Stream s = streamErrorTransform(c.stream, (e) { throw error; });
|
| + s.listen((_) { Expect.fail("unexpected value"); }, onError: expectAsync(
|
| + (e) { Expect.identical(error, e); }));
|
| + c.addError("SOME ERROR");
|
| + c.close();
|
| + });
|
| + }
|
| +
|
| + testFuture(name, streamValueTransform) {
|
| + test("rethrow-$name-value", () {
|
| + StreamController c = new StreamController();
|
| + Future f = streamValueTransform(c.stream, (v) { throw error; });
|
| + f.then((v) { Expect.fail("unreachable"); },
|
| + onError: expectAsync((e) { Expect.identical(error, e); }));
|
| + // Need two values to trigger compare for reduce.
|
| + c.add(0);
|
| + c.add(1);
|
| + c.close();
|
| + });
|
| + }
|
| +
|
| + testStream("where", (s, act) => s.where(act));
|
| + testStream("map", (s, act) => s.map(act));
|
| + testStream("expand", (s, act) => s.expand(act));
|
| + testStream("where", (s, act) => s.where(act));
|
| + testStreamError("handleError", (s, act) => s.handleError(act));
|
| + testStreamError("handleTest", (s, act) => s.handleError((v) {}, test: act));
|
| + testFuture("forEach", (s, act) => s.forEach(act));
|
| + testFuture("every", (s, act) => s.every(act));
|
| + testFuture("any", (s, act) => s.any(act));
|
| + testFuture("reduce", (s, act) => s.reduce((a,b) => act(b)));
|
| + testFuture("fold", (s, act) => s.fold(0, (a,b) => act(b)));
|
| + testFuture("drain", (s, act) => s.drain().then(act));
|
| +}
|
| +
|
| +void testBroadcastController() {
|
| + test("broadcast-controller-basic", () {
|
| + StreamProtocolTest test = new StreamProtocolTest.broadcast();
|
| + test..expectListen()
|
| + ..expectData(42)
|
| + ..expectCancel()
|
| + ..expectDone(test.terminate);
|
| + test..listen()
|
| + ..add(42)
|
| + ..close();
|
| + });
|
| +
|
| + test("broadcast-controller-listen-twice", () {
|
| + StreamProtocolTest test = new StreamProtocolTest.broadcast();
|
| + test..expectListen()
|
| + ..expectData(42, () {
|
| + test.listen();
|
| + test.add(37);
|
| + test.close();
|
| + })
|
| + // Order is not guaranteed between subscriptions if not sync.
|
| + ..expectData(37)
|
| + ..expectData(37)
|
| + ..expectDone()
|
| + ..expectCancel()
|
| + ..expectDone(test.terminate);
|
| + test.listen();
|
| + test.add(42);
|
| + });
|
| +
|
| + test("broadcast-controller-listen-twice-non-overlap", () {
|
| + StreamProtocolTest test = new StreamProtocolTest.broadcast();
|
| + test
|
| + ..expectListen(() {
|
| + test.add(42);
|
| + })
|
| + ..expectData(42, () {
|
| + test.cancel();
|
| + })
|
| + ..expectCancel(() {
|
| + test.listen();
|
| + })..expectListen(() {
|
| + test.add(37);
|
| + })
|
| + ..expectData(37, () {
|
| + test.close();
|
| + })
|
| + ..expectCancel()
|
| + ..expectDone(test.terminate);
|
| + test.listen();
|
| + });
|
| +
|
| + test("broadcast-controller-individual-pause", () {
|
| + StreamProtocolTest test = new StreamProtocolTest.broadcast();
|
| + var sub1;
|
| + test..expectListen()
|
| + ..expectData(42)
|
| + ..expectData(42, () { sub1.pause(); })
|
| + ..expectData(43, () {
|
| + sub1.cancel();
|
| + test.listen();
|
| + test.add(44);
|
| + test.expectData(44);
|
| + test.expectData(44, test.terminate);
|
| + });
|
| + sub1 = test.listen();
|
| + test.listen();
|
| + test.add(42);
|
| + test.add(43);
|
| + });
|
| +
|
| + test("broadcast-controller-add-in-callback", () {
|
| + StreamProtocolTest test = new StreamProtocolTest.broadcast();
|
| + test.expectListen();
|
| + var sub = test.listen();
|
| + test.add(42);
|
| + sub.expectData(42, () {
|
| + test.add(87);
|
| + sub.cancel();
|
| + });
|
| + test.expectCancel(() {
|
| + test.add(37);
|
| + test.terminate();
|
| + });
|
| + });
|
| +}
|
| +
|
| +void testAsBroadcast() {
|
| + test("asBroadcast-not-canceled", () {
|
| + StreamProtocolTest test = new StreamProtocolTest.asBroadcast();
|
| + var sub;
|
| + test..expectListen()
|
| + ..expectBroadcastListen((_) {
|
| + test.add(42);
|
| + })
|
| + ..expectData(42, () {
|
| + sub.cancel();
|
| + })
|
| + ..expectBroadcastCancel((_) {
|
| + sub = test.listen();
|
| + })
|
| + ..expectBroadcastListen((_) {
|
| + test.terminate();
|
| + });
|
| + sub = test.listen();
|
| + });
|
| +
|
| + test("asBroadcast-canceled", () {
|
| + StreamProtocolTest test = new StreamProtocolTest.asBroadcast();
|
| + var sub;
|
| + test..expectListen()
|
| + ..expectBroadcastListen((_) {
|
| + test.add(42);
|
| + })
|
| + ..expectData(42, () {
|
| + sub.cancel();
|
| + })
|
| + ..expectBroadcastCancel((originalSub) {
|
| + originalSub.cancel();
|
| + })
|
| + ..expectCancel(test.terminate);
|
| + sub = test.listen();
|
| + });
|
| +
|
| + test("asBroadcast-pause-original", () {
|
| + StreamProtocolTest test = new StreamProtocolTest.asBroadcast();
|
| + var sub;
|
| + test..expectListen()
|
| + ..expectBroadcastListen((_) {
|
| + test.add(42);
|
| + test.add(43);
|
| + })
|
| + ..expectData(42, () {
|
| + sub.cancel();
|
| + })
|
| + ..expectBroadcastCancel((originalSub) {
|
| + originalSub.pause(); // Pause before sending 43 from original sub.
|
| + })
|
| + ..expectPause(() {
|
| + sub = test.listen();
|
| + })
|
| + ..expectBroadcastListen((originalSub) {
|
| + originalSub.resume();
|
| + })
|
| + ..expectData(43)
|
| + ..expectResume(() {
|
| + test.close();
|
| + })
|
| + ..expectCancel()
|
| + ..expectDone()
|
| + ..expectBroadcastCancel((_) => test.terminate());
|
| + sub = test.listen();
|
| + });
|
| +}
|
| +
|
| +void testSink({bool sync, bool broadcast, bool asBroadcast}) {
|
| + String type = "${sync?"S":"A"}${broadcast?"B":"S"}${asBroadcast?"aB":""}";
|
| + test("$type-controller-sink", () {
|
| + var done = expectAsync((){});
|
| + var c = broadcast ? new StreamController.broadcast(sync: sync)
|
| + : new StreamController(sync: sync);
|
| + var expected = new Events()
|
| + ..add(42)..error("error")
|
| + ..add(1)..add(2)..add(3)..add(4)..add(5)
|
| + ..add(43)..close();
|
| + var actual = new Events.capture(asBroadcast ? c.stream.asBroadcastStream()
|
| + : c.stream);
|
| + var sink = c.sink;
|
| + sink.add(42);
|
| + sink.addError("error");
|
| + sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5]))
|
| + .then((_) {
|
| + sink.add(43);
|
| + return sink.close();
|
| + })
|
| + .then((_) {
|
| + Expect.listEquals(expected.events, actual.events);
|
| + done();
|
| + });
|
| + });
|
| +
|
| + test("$type-controller-sink-canceled", () {
|
| + var done = expectAsync((){});
|
| + var c = broadcast ? new StreamController.broadcast(sync: sync)
|
| + : new StreamController(sync: sync);
|
| + var expected = new Events()
|
| + ..add(42)..error("error")
|
| + ..add(1)..add(2)..add(3);
|
| + var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
|
| + var actual = new Events();
|
| + var sub;
|
| + // Cancel subscription after receiving "3" event.
|
| + sub = stream.listen((v) {
|
| + if (v == 3) sub.cancel();
|
| + actual.add(v);
|
| + }, onError: actual.error);
|
| + var sink = c.sink;
|
| + sink.add(42);
|
| + sink.addError("error");
|
| + sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5]))
|
| + .then((_) {
|
| + Expect.listEquals(expected.events, actual.events);
|
| + // Close controller as well. It has no listener. If it is a broadcast
|
| + // stream, it will still be open, and we read the "done" future before
|
| + // closing. A normal stream is already done when its listener cancels.
|
| + Future doneFuture = sink.done;
|
| + sink.close();
|
| + return doneFuture;
|
| + })
|
| + .then((_) {
|
| + // No change in events.
|
| + Expect.listEquals(expected.events, actual.events);
|
| + done();
|
| + });
|
| + });
|
| +
|
| + test("$type-controller-sink-paused", () {
|
| + var done = expectAsync((){});
|
| + var c = broadcast ? new StreamController.broadcast(sync: sync)
|
| + : new StreamController(sync: sync);
|
| + var expected = new Events()
|
| + ..add(42)..error("error")
|
| + ..add(1)..add(2)..add(3)
|
| + ..add(4)..add(5)..add(43)..close();
|
| + var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
|
| + var actual = new Events();
|
| + var sub;
|
| + var pauseIsDone = false;
|
| + sub = stream.listen(
|
| + (v) {
|
| + if (v == 3) {
|
| + sub.pause(new Future.delayed(const Duration(milliseconds: 15),
|
| + () { pauseIsDone = true; }));
|
| + }
|
| + actual.add(v);
|
| + },
|
| + onError: actual.error,
|
| + onDone: actual.close);
|
| + var sink = c.sink;
|
| + sink.add(42);
|
| + sink.addError("error");
|
| + sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5]))
|
| + .then((_) {
|
| + sink.add(43);
|
| + return sink.close();
|
| + })
|
| + .then((_) {
|
| + if (asBroadcast || broadcast) {
|
| + // The done-future of the sink completes when it passes
|
| + // the done event to the asBroadcastStream controller, which is
|
| + // before the final listener gets the event.
|
| + // Wait for the done event to be *delivered* before testing the
|
| + // events.
|
| + actual.onDone(() {
|
| + Expect.listEquals(expected.events, actual.events);
|
| + done();
|
| + });
|
| + } else {
|
| + Expect.listEquals(expected.events, actual.events);
|
| + done();
|
| + }
|
| + });
|
| + });
|
| +
|
| + test("$type-controller-addstream-error-stop", () {
|
| + // Check that addStream defaults to ending after the first error.
|
| + var done = expectAsync((){});
|
| + StreamController c = broadcast
|
| + ? new StreamController.broadcast(sync: sync)
|
| + : new StreamController(sync: sync);
|
| + Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
|
| + var actual = new Events.capture(stream);
|
| +
|
| + var source = new Events();
|
| + source..add(1)..add(2)..error("BAD")..add(3)..error("FAIL")..close();
|
| +
|
| + var expected = new Events()..add(1)..add(2)..error("BAD")..close();
|
| + StreamController sourceController = new StreamController();
|
| + c.addStream(sourceController.stream).then((_) {
|
| + c.close().then((_) {
|
| + Expect.listEquals(expected.events, actual.events);
|
| + done();
|
| + });
|
| + });
|
| +
|
| + source.replay(sourceController);
|
| + });
|
| +
|
| + test("$type-controller-addstream-error-forward", () {
|
| + // Check that addStream with cancelOnError:false passes all data and errors
|
| + // to the controller.
|
| + var done = expectAsync((){});
|
| + StreamController c = broadcast
|
| + ? new StreamController.broadcast(sync: sync)
|
| + : new StreamController(sync: sync);
|
| + Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
|
| + var actual = new Events.capture(stream);
|
| +
|
| + var source = new Events();
|
| + source..add(1)..add(2)..addError("BAD")..add(3)..addError("FAIL")..close();
|
| +
|
| + StreamController sourceController = new StreamController();
|
| + c.addStream(sourceController.stream, cancelOnError: false).then((_) {
|
| + c.close().then((_) {
|
| + Expect.listEquals(source.events, actual.events);
|
| + done();
|
| + });
|
| + });
|
| +
|
| + source.replay(sourceController);
|
| + });
|
| +
|
| + test("$type-controller-addstream-twice", () {
|
| + // Using addStream twice on the same stream
|
| + var done = expectAsync((){});
|
| + StreamController c = broadcast
|
| + ? new StreamController.broadcast(sync: sync)
|
| + : new StreamController(sync: sync);
|
| + Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
|
| + var actual = new Events.capture(stream);
|
| +
|
| + // Streams of five events, throws on 3.
|
| + Stream s1 = new Stream.fromIterable([1,2,3,4,5])
|
| + .map((x) => (x == 3 ? throw x : x));
|
| + Stream s2 = new Stream.fromIterable([1,2,3,4,5])
|
| + .map((x) => (x == 3 ? throw x : x));
|
| +
|
| + Events expected = new Events();
|
| + expected..add(1)..add(2)..error(3);
|
| + expected..add(1)..add(2)..error(3)..add(4)..add(5);
|
| + expected..close();
|
| +
|
| + c.addStream(s1).then((_) {
|
| + c.addStream(s2, cancelOnError: false).then((_) {
|
| + c.close().then((_) {
|
| + Expect.listEquals(expected.events, actual.events);
|
| + done();
|
| + });
|
| + });
|
| + });
|
| + });
|
| +}
|
| +
|
| +main() {
|
| + testController();
|
| + testSingleController();
|
| + testExtraMethods();
|
| + testPause();
|
| + testRethrow();
|
| + testBroadcastController();
|
| + testAsBroadcast();
|
| + testSink(sync: true, broadcast: false, asBroadcast: false);
|
| + testSink(sync: true, broadcast: false, asBroadcast: true);
|
| + testSink(sync: true, broadcast: true, asBroadcast: false);
|
| + testSink(sync: false, broadcast: false, asBroadcast: false);
|
| + testSink(sync: false, broadcast: false, asBroadcast: true);
|
| + testSink(sync: false, broadcast: true, asBroadcast: false);
|
| +}
|
|
|