| Index: tests/lib/async/merge_stream_test.dart
|
| diff --git a/tests/lib/async/merge_stream_test.dart b/tests/lib/async/merge_stream_test.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..1d47cf0ae807827d1b3b18df6d75ce660821f867
|
| --- /dev/null
|
| +++ b/tests/lib/async/merge_stream_test.dart
|
| @@ -0,0 +1,172 @@
|
| +// Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +// Test merging streams.
|
| +import "dart:async";
|
| +import '../../../pkg/unittest/lib/unittest.dart';
|
| +import 'event_helper.dart';
|
| +
|
| +testSupercedeStream() {
|
| + { // Simple case of superceding lower priority streams.
|
| + StreamController s1 = new StreamController();
|
| + StreamController s2 = new StreamController();
|
| + StreamController s3 = new StreamController();
|
| + Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
|
| + Events expected = new Events()..add(1)..add(2)..add(3)..add(4)..close();
|
| + Events actual = new Events.capture(merge);
|
| + s1.add(1);
|
| + s2.add(2);
|
| + s1.add(1); // Ignored.
|
| + s2.add(3);
|
| + s3.add(4);
|
| + s2.add(3); // Ignored.
|
| + s3.close();
|
| + Expect.listEquals(expected.events, actual.events);
|
| + }
|
| +
|
| + { // Superceding more than one stream at a time.
|
| + StreamController s1 = new StreamController();
|
| + StreamController s2 = new StreamController();
|
| + StreamController s3 = new StreamController();
|
| + Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
|
| + Events expected = new Events()..add(1)..add(2)..close();
|
| + Events actual = new Events.capture(merge);
|
| + s1.add(1);
|
| + s3.add(2);
|
| + s1.add(1); // Ignored.
|
| + s2.add(1); // Ignored.
|
| + s3.close();
|
| + Expect.listEquals(expected.events, actual.events);
|
| + }
|
| +
|
| + { // Closing a stream before superceding it.
|
| + StreamController s1 = new StreamController();
|
| + StreamController s2 = new StreamController();
|
| + StreamController s3 = new StreamController();
|
| + Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
|
| + Events expected = new Events()..add(1)..add(2)..add(3)..close();
|
| + Events actual = new Events.capture(merge);
|
| + s1.add(1);
|
| + s1.close();
|
| + s3.close();
|
| + s2.add(2);
|
| + s2.add(3);
|
| + s2.close();
|
| + Expect.listEquals(expected.events, actual.events);
|
| + }
|
| +
|
| + { // Errors from all non-superceded streams are forwarded.
|
| + StreamController s1 = new StreamController();
|
| + StreamController s2 = new StreamController();
|
| + StreamController s3 = new StreamController();
|
| + Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
|
| + Events expected =
|
| + new Events()..add(1)..error("1")..error("2")..error("3")
|
| + ..add(3)..error("6")..add(4)..close();
|
| + Events actual = new Events.capture(merge);
|
| + s1.add(1);
|
| + s1.signalError(new AsyncError("1"));
|
| + s2.signalError(new AsyncError("2"));
|
| + s3.signalError(new AsyncError("3"));
|
| + s3.add(3);
|
| + s1.signalError(new AsyncError("4"));
|
| + s2.signalError(new AsyncError("5"));
|
| + s3.signalError(new AsyncError("6"));
|
| + s1.close();
|
| + s2.close();
|
| + s3.add(4);
|
| + s3.close();
|
| + Expect.listEquals(expected.events, actual.events);
|
| + }
|
| +
|
| + test("Pausing on a superceding stream", () {
|
| + StreamController s1 = new StreamController();
|
| + StreamController s2 = new StreamController();
|
| + StreamController s3 = new StreamController();
|
| + Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
|
| + Events expected = new Events()..add(1)..add(2)..add(3);
|
| + Events actual = new Events.capture(merge);
|
| + s1.add(1);
|
| + s2.add(2);
|
| + s2.add(3);
|
| + Expect.listEquals(expected.events, actual.events);
|
| + actual.pause(); // Pauses the stream that feeds the actual Events.
|
| + Events expected2 = expected.copy();
|
| + expected..add(5)..add(6)..close();
|
| + expected2..add(6)..close();
|
| + s1.add(4);
|
| + s2.add(5); // May or may not arrive before '6' when resuming.
|
| + s3.add(6);
|
| + s3.close();
|
| + actual.onDone(expectAsync0(() {
|
| + if (expected.events.length == actual.events.length) {
|
| + Expect.listEquals(expected.events, actual.events);
|
| + } else {
|
| + Expect.listEquals(expected2.events, actual.events);
|
| + }
|
| + }));
|
| + actual.resume();
|
| + });
|
| +}
|
| +
|
| +void testCyclicStream() {
|
| + test("Simple case of superceding lower priority streams", () {
|
| + StreamController s1 = new StreamController();
|
| + StreamController s2 = new StreamController();
|
| + StreamController s3 = new StreamController();
|
| + Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]);
|
| + Events expected =
|
| + new Events()..add(1)..add(2)..add(3)..add(4)..add(5)..add(6)..close();
|
| + Events actual = new Events.capture(merge);
|
| + Expect.isFalse(s1.isPaused);
|
| + Expect.isTrue(s2.isPaused);
|
| + Expect.isTrue(s3.isPaused);
|
| + s3.add(3);
|
| + s1.add(1);
|
| + s1.add(4);
|
| + s1.add(6);
|
| + s1.close();
|
| + s2.add(2);
|
| + s2.add(5);
|
| + s2.close();
|
| + s3.close();
|
| + actual.onDone(expectAsync0(() {
|
| + Expect.listEquals(expected.events, actual.events);
|
| + }));
|
| + });
|
| +
|
| + test("Cyclic merge with errors", () {
|
| + StreamController s1 = new StreamController();
|
| + StreamController s2 = new StreamController();
|
| + StreamController s3 = new StreamController();
|
| + Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]);
|
| + Events expected =
|
| + new Events()..add(1)..error("1")..add(2)..add(3)..error("2")
|
| + ..add(4)..add(5)..error("3")..add(6)..close();
|
| + Events actual = new Events.capture(merge);
|
| + Expect.isFalse(s1.isPaused);
|
| + Expect.isTrue(s2.isPaused);
|
| + Expect.isTrue(s3.isPaused);
|
| + s3.add(3);
|
| + s3.signalError(new AsyncError("3")); // Error just before a "done".
|
| + s1.add(1);
|
| + s1.signalError(new AsyncError("2")); // Error between events.
|
| + s1.add(4);
|
| + s1.add(6);
|
| + s1.close();
|
| + s2.signalError(new AsyncError("1")); // Error as first event.
|
| + s2.add(2);
|
| + s2.add(5);
|
| + s2.close();
|
| + s3.close();
|
| + actual.onDone(expectAsync0(() {
|
| + Expect.listEquals(expected.events, actual.events);
|
| + }));
|
| + });
|
| +}
|
| +
|
| +main() {
|
| + testSupercedeStream();
|
| + testCyclicStream();
|
| +}
|
|
|