Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(314)

Unified Diff: tests/lib/async/merge_stream_test.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « tests/lib/async/future_test.dart ('k') | tests/lib/async/slow_consumer2_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: tests/lib/async/merge_stream_test.dart
diff --git a/tests/lib/async/merge_stream_test.dart b/tests/lib/async/merge_stream_test.dart
new file mode 100644
index 0000000000000000000000000000000000000000..1d47cf0ae807827d1b3b18df6d75ce660821f867
--- /dev/null
+++ b/tests/lib/async/merge_stream_test.dart
@@ -0,0 +1,172 @@
+// Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// Test merging streams.
+import "dart:async";
+import '../../../pkg/unittest/lib/unittest.dart';
+import 'event_helper.dart';
+
+testSupercedeStream() {
+ { // Simple case of superceding lower priority streams.
+ StreamController s1 = new StreamController();
+ StreamController s2 = new StreamController();
+ StreamController s3 = new StreamController();
+ Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
+ Events expected = new Events()..add(1)..add(2)..add(3)..add(4)..close();
+ Events actual = new Events.capture(merge);
+ s1.add(1);
+ s2.add(2);
+ s1.add(1); // Ignored.
+ s2.add(3);
+ s3.add(4);
+ s2.add(3); // Ignored.
+ s3.close();
+ Expect.listEquals(expected.events, actual.events);
+ }
+
+ { // Superceding more than one stream at a time.
+ StreamController s1 = new StreamController();
+ StreamController s2 = new StreamController();
+ StreamController s3 = new StreamController();
+ Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
+ Events expected = new Events()..add(1)..add(2)..close();
+ Events actual = new Events.capture(merge);
+ s1.add(1);
+ s3.add(2);
+ s1.add(1); // Ignored.
+ s2.add(1); // Ignored.
+ s3.close();
+ Expect.listEquals(expected.events, actual.events);
+ }
+
+ { // Closing a stream before superceding it.
+ StreamController s1 = new StreamController();
+ StreamController s2 = new StreamController();
+ StreamController s3 = new StreamController();
+ Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
+ Events expected = new Events()..add(1)..add(2)..add(3)..close();
+ Events actual = new Events.capture(merge);
+ s1.add(1);
+ s1.close();
+ s3.close();
+ s2.add(2);
+ s2.add(3);
+ s2.close();
+ Expect.listEquals(expected.events, actual.events);
+ }
+
+ { // Errors from all non-superceded streams are forwarded.
+ StreamController s1 = new StreamController();
+ StreamController s2 = new StreamController();
+ StreamController s3 = new StreamController();
+ Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
+ Events expected =
+ new Events()..add(1)..error("1")..error("2")..error("3")
+ ..add(3)..error("6")..add(4)..close();
+ Events actual = new Events.capture(merge);
+ s1.add(1);
+ s1.signalError(new AsyncError("1"));
+ s2.signalError(new AsyncError("2"));
+ s3.signalError(new AsyncError("3"));
+ s3.add(3);
+ s1.signalError(new AsyncError("4"));
+ s2.signalError(new AsyncError("5"));
+ s3.signalError(new AsyncError("6"));
+ s1.close();
+ s2.close();
+ s3.add(4);
+ s3.close();
+ Expect.listEquals(expected.events, actual.events);
+ }
+
+ test("Pausing on a superceding stream", () {
+ StreamController s1 = new StreamController();
+ StreamController s2 = new StreamController();
+ StreamController s3 = new StreamController();
+ Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
+ Events expected = new Events()..add(1)..add(2)..add(3);
+ Events actual = new Events.capture(merge);
+ s1.add(1);
+ s2.add(2);
+ s2.add(3);
+ Expect.listEquals(expected.events, actual.events);
+ actual.pause(); // Pauses the stream that feeds the actual Events.
+ Events expected2 = expected.copy();
+ expected..add(5)..add(6)..close();
+ expected2..add(6)..close();
+ s1.add(4);
+ s2.add(5); // May or may not arrive before '6' when resuming.
+ s3.add(6);
+ s3.close();
+ actual.onDone(expectAsync0(() {
+ if (expected.events.length == actual.events.length) {
+ Expect.listEquals(expected.events, actual.events);
+ } else {
+ Expect.listEquals(expected2.events, actual.events);
+ }
+ }));
+ actual.resume();
+ });
+}
+
+void testCyclicStream() {
+ test("Simple case of superceding lower priority streams", () {
+ StreamController s1 = new StreamController();
+ StreamController s2 = new StreamController();
+ StreamController s3 = new StreamController();
+ Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]);
+ Events expected =
+ new Events()..add(1)..add(2)..add(3)..add(4)..add(5)..add(6)..close();
+ Events actual = new Events.capture(merge);
+ Expect.isFalse(s1.isPaused);
+ Expect.isTrue(s2.isPaused);
+ Expect.isTrue(s3.isPaused);
+ s3.add(3);
+ s1.add(1);
+ s1.add(4);
+ s1.add(6);
+ s1.close();
+ s2.add(2);
+ s2.add(5);
+ s2.close();
+ s3.close();
+ actual.onDone(expectAsync0(() {
+ Expect.listEquals(expected.events, actual.events);
+ }));
+ });
+
+ test("Cyclic merge with errors", () {
+ StreamController s1 = new StreamController();
+ StreamController s2 = new StreamController();
+ StreamController s3 = new StreamController();
+ Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]);
+ Events expected =
+ new Events()..add(1)..error("1")..add(2)..add(3)..error("2")
+ ..add(4)..add(5)..error("3")..add(6)..close();
+ Events actual = new Events.capture(merge);
+ Expect.isFalse(s1.isPaused);
+ Expect.isTrue(s2.isPaused);
+ Expect.isTrue(s3.isPaused);
+ s3.add(3);
+ s3.signalError(new AsyncError("3")); // Error just before a "done".
+ s1.add(1);
+ s1.signalError(new AsyncError("2")); // Error between events.
+ s1.add(4);
+ s1.add(6);
+ s1.close();
+ s2.signalError(new AsyncError("1")); // Error as first event.
+ s2.add(2);
+ s2.add(5);
+ s2.close();
+ s3.close();
+ actual.onDone(expectAsync0(() {
+ Expect.listEquals(expected.events, actual.events);
+ }));
+ });
+}
+
+main() {
+ testSupercedeStream();
+ testCyclicStream();
+}
« no previous file with comments | « tests/lib/async/future_test.dart ('k') | tests/lib/async/slow_consumer2_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698