Index: packages/async/test/stream_splitter_test.dart |
diff --git a/packages/async/test/stream_splitter_test.dart b/packages/async/test/stream_splitter_test.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..ffd0c87afb2899a0f6decf1ac804e93dcc67e447 |
--- /dev/null |
+++ b/packages/async/test/stream_splitter_test.dart |
@@ -0,0 +1,286 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+import 'dart:async'; |
+ |
+import 'package:async/async.dart'; |
+import 'package:test/test.dart'; |
+ |
+main() { |
+ var controller; |
+ var splitter; |
+ setUp(() { |
+ controller = new StreamController<int>(); |
+ splitter = new StreamSplitter<int>(controller.stream); |
+ }); |
+ |
+ test("a branch that's created before the stream starts to replay it", |
+ () async { |
+ var events = []; |
+ var branch = splitter.split(); |
+ splitter.close(); |
+ branch.listen(events.add); |
+ |
+ controller.add(1); |
+ await flushMicrotasks(); |
+ expect(events, equals([1])); |
+ |
+ controller.add(2); |
+ await flushMicrotasks(); |
+ expect(events, equals([1, 2])); |
+ |
+ controller.add(3); |
+ await flushMicrotasks(); |
+ expect(events, equals([1, 2, 3])); |
+ |
+ controller.close(); |
+ }); |
+ |
+ test("a branch replays error events as well as data events", () { |
+ var branch = splitter.split(); |
+ splitter.close(); |
+ |
+ controller.add(1); |
+ controller.addError("error"); |
+ controller.add(3); |
+ controller.close(); |
+ |
+ var count = 0; |
+ branch.listen(expectAsync((value) { |
+ expect(count, anyOf(0, 2)); |
+ expect(value, equals(count + 1)); |
+ count++; |
+ }, count: 2), onError: expectAsync((error) { |
+ expect(count, equals(1)); |
+ expect(error, equals("error")); |
+ count++; |
+ }), onDone: expectAsync(() { |
+ expect(count, equals(3)); |
+ })); |
+ }); |
+ |
+ test("a branch that's created in the middle of a stream replays it", () async { |
+ controller.add(1); |
+ controller.add(2); |
+ await flushMicrotasks(); |
+ |
+ var branch = splitter.split(); |
+ splitter.close(); |
+ |
+ controller.add(3); |
+ controller.add(4); |
+ controller.close(); |
+ |
+ expect(branch.toList(), completion(equals([1, 2, 3, 4]))); |
+ }); |
+ |
+ test("a branch that's created after the stream is finished replays it", |
+ () async { |
+ controller.add(1); |
+ controller.add(2); |
+ controller.add(3); |
+ controller.close(); |
+ await flushMicrotasks(); |
+ |
+ expect(splitter.split().toList(), completion(equals([1, 2, 3]))); |
+ splitter.close(); |
+ }); |
+ |
+ test("creates single-subscription branches", () async { |
+ var branch = splitter.split(); |
+ expect(branch.isBroadcast, isFalse); |
+ branch.listen(null); |
+ expect(() => branch.listen(null), throwsStateError); |
+ expect(() => branch.listen(null), throwsStateError); |
+ }); |
+ |
+ // TODO(nweiz): Test that branches have the correct reified type once Dart |
+ // 1.11 is released. In 1.10, the stream exposed by a StreamController didn't |
+ // have a reified type. |
+ |
+ test("multiple branches each replay the stream", () async { |
+ var branch1 = splitter.split(); |
+ controller.add(1); |
+ controller.add(2); |
+ await flushMicrotasks(); |
+ |
+ var branch2 = splitter.split(); |
+ controller.add(3); |
+ controller.close(); |
+ await flushMicrotasks(); |
+ |
+ var branch3 = splitter.split(); |
+ splitter.close(); |
+ |
+ expect(branch1.toList(), completion(equals([1, 2, 3]))); |
+ expect(branch2.toList(), completion(equals([1, 2, 3]))); |
+ expect(branch3.toList(), completion(equals([1, 2, 3]))); |
+ }); |
+ |
+ test("a branch doesn't close until the source stream closes", () async { |
+ var branch = splitter.split(); |
+ splitter.close(); |
+ |
+ var closed = false; |
+ branch.last.then((_) => closed = true); |
+ |
+ controller.add(1); |
+ controller.add(2); |
+ controller.add(3); |
+ await flushMicrotasks(); |
+ expect(closed, isFalse); |
+ |
+ controller.close(); |
+ await flushMicrotasks(); |
+ expect(closed, isTrue); |
+ }); |
+ |
+ test("the source stream isn't listened to until a branch is", () async { |
+ expect(controller.hasListener, isFalse); |
+ |
+ var branch = splitter.split(); |
+ splitter.close(); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isFalse); |
+ |
+ branch.listen(null); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ }); |
+ |
+ test("the source stream is paused when all branches are paused", () async { |
+ var branch1 = splitter.split(); |
+ var branch2 = splitter.split(); |
+ var branch3 = splitter.split(); |
+ splitter.close(); |
+ |
+ var subscription1 = branch1.listen(null); |
+ var subscription2 = branch2.listen(null); |
+ var subscription3 = branch3.listen(null); |
+ |
+ subscription1.pause(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isFalse); |
+ |
+ subscription2.pause(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isFalse); |
+ |
+ subscription3.pause(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ subscription2.resume(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isFalse); |
+ }); |
+ |
+ test("the source stream is paused when all branches are canceled", () async { |
+ var branch1 = splitter.split(); |
+ var branch2 = splitter.split(); |
+ var branch3 = splitter.split(); |
+ |
+ var subscription1 = branch1.listen(null); |
+ var subscription2 = branch2.listen(null); |
+ var subscription3 = branch3.listen(null); |
+ |
+ subscription1.cancel(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isFalse); |
+ |
+ subscription2.cancel(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isFalse); |
+ |
+ subscription3.cancel(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ var branch4 = splitter.split(); |
+ splitter.close(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ branch4.listen(null); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isFalse); |
+ }); |
+ |
+ test("the source stream is canceled when it's closed after all branches have " |
+ "been canceled", () async { |
+ var branch1 = splitter.split(); |
+ var branch2 = splitter.split(); |
+ var branch3 = splitter.split(); |
+ |
+ var subscription1 = branch1.listen(null); |
+ var subscription2 = branch2.listen(null); |
+ var subscription3 = branch3.listen(null); |
+ |
+ subscription1.cancel(); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ subscription2.cancel(); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ subscription3.cancel(); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ splitter.close(); |
+ expect(controller.hasListener, isFalse); |
+ }); |
+ |
+ test("the source stream is canceled when all branches are canceled after it " |
+ "has been closed", () async { |
+ var branch1 = splitter.split(); |
+ var branch2 = splitter.split(); |
+ var branch3 = splitter.split(); |
+ splitter.close(); |
+ |
+ var subscription1 = branch1.listen(null); |
+ var subscription2 = branch2.listen(null); |
+ var subscription3 = branch3.listen(null); |
+ |
+ subscription1.cancel(); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ subscription2.cancel(); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ subscription3.cancel(); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isFalse); |
+ }); |
+ |
+ test("a splitter that's closed before any branches are added never listens " |
+ "to the source stream", () { |
+ splitter.close(); |
+ |
+ // This would throw an error if the stream had already been listened to. |
+ controller.stream.listen(null); |
+ }); |
+ |
+ test("splitFrom splits a source stream into the designated number of " |
+ "branches", () { |
+ var branches = StreamSplitter.splitFrom(controller.stream, 5); |
+ |
+ controller.add(1); |
+ controller.add(2); |
+ controller.add(3); |
+ controller.close(); |
+ |
+ expect(branches[0].toList(), completion(equals([1, 2, 3]))); |
+ expect(branches[1].toList(), completion(equals([1, 2, 3]))); |
+ expect(branches[2].toList(), completion(equals([1, 2, 3]))); |
+ expect(branches[3].toList(), completion(equals([1, 2, 3]))); |
+ expect(branches[4].toList(), completion(equals([1, 2, 3]))); |
+ }); |
+} |
+ |
+/// Wait for all microtasks to complete. |
+Future flushMicrotasks() => new Future.delayed(Duration.ZERO); |