| Index: mojo/public/dart/third_party/async/test/stream_splitter_test.dart
|
| diff --git a/mojo/public/dart/third_party/async/test/stream_splitter_test.dart b/mojo/public/dart/third_party/async/test/stream_splitter_test.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..ffd0c87afb2899a0f6decf1ac804e93dcc67e447
|
| --- /dev/null
|
| +++ b/mojo/public/dart/third_party/async/test/stream_splitter_test.dart
|
| @@ -0,0 +1,286 @@
|
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +import 'dart:async';
|
| +
|
| +import 'package:async/async.dart';
|
| +import 'package:test/test.dart';
|
| +
|
| +main() {
|
| + var controller;
|
| + var splitter;
|
| + setUp(() {
|
| + controller = new StreamController<int>();
|
| + splitter = new StreamSplitter<int>(controller.stream);
|
| + });
|
| +
|
| + test("a branch that's created before the stream starts to replay it",
|
| + () async {
|
| + var events = [];
|
| + var branch = splitter.split();
|
| + splitter.close();
|
| + branch.listen(events.add);
|
| +
|
| + controller.add(1);
|
| + await flushMicrotasks();
|
| + expect(events, equals([1]));
|
| +
|
| + controller.add(2);
|
| + await flushMicrotasks();
|
| + expect(events, equals([1, 2]));
|
| +
|
| + controller.add(3);
|
| + await flushMicrotasks();
|
| + expect(events, equals([1, 2, 3]));
|
| +
|
| + controller.close();
|
| + });
|
| +
|
| + test("a branch replays error events as well as data events", () {
|
| + var branch = splitter.split();
|
| + splitter.close();
|
| +
|
| + controller.add(1);
|
| + controller.addError("error");
|
| + controller.add(3);
|
| + controller.close();
|
| +
|
| + var count = 0;
|
| + branch.listen(expectAsync((value) {
|
| + expect(count, anyOf(0, 2));
|
| + expect(value, equals(count + 1));
|
| + count++;
|
| + }, count: 2), onError: expectAsync((error) {
|
| + expect(count, equals(1));
|
| + expect(error, equals("error"));
|
| + count++;
|
| + }), onDone: expectAsync(() {
|
| + expect(count, equals(3));
|
| + }));
|
| + });
|
| +
|
| + test("a branch that's created in the middle of a stream replays it", () async {
|
| + controller.add(1);
|
| + controller.add(2);
|
| + await flushMicrotasks();
|
| +
|
| + var branch = splitter.split();
|
| + splitter.close();
|
| +
|
| + controller.add(3);
|
| + controller.add(4);
|
| + controller.close();
|
| +
|
| + expect(branch.toList(), completion(equals([1, 2, 3, 4])));
|
| + });
|
| +
|
| + test("a branch that's created after the stream is finished replays it",
|
| + () async {
|
| + controller.add(1);
|
| + controller.add(2);
|
| + controller.add(3);
|
| + controller.close();
|
| + await flushMicrotasks();
|
| +
|
| + expect(splitter.split().toList(), completion(equals([1, 2, 3])));
|
| + splitter.close();
|
| + });
|
| +
|
| + test("creates single-subscription branches", () async {
|
| + var branch = splitter.split();
|
| + expect(branch.isBroadcast, isFalse);
|
| + branch.listen(null);
|
| + expect(() => branch.listen(null), throwsStateError);
|
| + expect(() => branch.listen(null), throwsStateError);
|
| + });
|
| +
|
| + // TODO(nweiz): Test that branches have the correct reified type once Dart
|
| + // 1.11 is released. In 1.10, the stream exposed by a StreamController didn't
|
| + // have a reified type.
|
| +
|
| + test("multiple branches each replay the stream", () async {
|
| + var branch1 = splitter.split();
|
| + controller.add(1);
|
| + controller.add(2);
|
| + await flushMicrotasks();
|
| +
|
| + var branch2 = splitter.split();
|
| + controller.add(3);
|
| + controller.close();
|
| + await flushMicrotasks();
|
| +
|
| + var branch3 = splitter.split();
|
| + splitter.close();
|
| +
|
| + expect(branch1.toList(), completion(equals([1, 2, 3])));
|
| + expect(branch2.toList(), completion(equals([1, 2, 3])));
|
| + expect(branch3.toList(), completion(equals([1, 2, 3])));
|
| + });
|
| +
|
| + test("a branch doesn't close until the source stream closes", () async {
|
| + var branch = splitter.split();
|
| + splitter.close();
|
| +
|
| + var closed = false;
|
| + branch.last.then((_) => closed = true);
|
| +
|
| + controller.add(1);
|
| + controller.add(2);
|
| + controller.add(3);
|
| + await flushMicrotasks();
|
| + expect(closed, isFalse);
|
| +
|
| + controller.close();
|
| + await flushMicrotasks();
|
| + expect(closed, isTrue);
|
| + });
|
| +
|
| + test("the source stream isn't listened to until a branch is", () async {
|
| + expect(controller.hasListener, isFalse);
|
| +
|
| + var branch = splitter.split();
|
| + splitter.close();
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isFalse);
|
| +
|
| + branch.listen(null);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| + });
|
| +
|
| + test("the source stream is paused when all branches are paused", () async {
|
| + var branch1 = splitter.split();
|
| + var branch2 = splitter.split();
|
| + var branch3 = splitter.split();
|
| + splitter.close();
|
| +
|
| + var subscription1 = branch1.listen(null);
|
| + var subscription2 = branch2.listen(null);
|
| + var subscription3 = branch3.listen(null);
|
| +
|
| + subscription1.pause();
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isFalse);
|
| +
|
| + subscription2.pause();
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isFalse);
|
| +
|
| + subscription3.pause();
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isTrue);
|
| +
|
| + subscription2.resume();
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isFalse);
|
| + });
|
| +
|
| + test("the source stream is paused when all branches are canceled", () async {
|
| + var branch1 = splitter.split();
|
| + var branch2 = splitter.split();
|
| + var branch3 = splitter.split();
|
| +
|
| + var subscription1 = branch1.listen(null);
|
| + var subscription2 = branch2.listen(null);
|
| + var subscription3 = branch3.listen(null);
|
| +
|
| + subscription1.cancel();
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isFalse);
|
| +
|
| + subscription2.cancel();
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isFalse);
|
| +
|
| + subscription3.cancel();
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isTrue);
|
| +
|
| + var branch4 = splitter.split();
|
| + splitter.close();
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isTrue);
|
| +
|
| + branch4.listen(null);
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isFalse);
|
| + });
|
| +
|
| + test("the source stream is canceled when it's closed after all branches have "
|
| + "been canceled", () async {
|
| + var branch1 = splitter.split();
|
| + var branch2 = splitter.split();
|
| + var branch3 = splitter.split();
|
| +
|
| + var subscription1 = branch1.listen(null);
|
| + var subscription2 = branch2.listen(null);
|
| + var subscription3 = branch3.listen(null);
|
| +
|
| + subscription1.cancel();
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + subscription2.cancel();
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + subscription3.cancel();
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + splitter.close();
|
| + expect(controller.hasListener, isFalse);
|
| + });
|
| +
|
| + test("the source stream is canceled when all branches are canceled after it "
|
| + "has been closed", () async {
|
| + var branch1 = splitter.split();
|
| + var branch2 = splitter.split();
|
| + var branch3 = splitter.split();
|
| + splitter.close();
|
| +
|
| + var subscription1 = branch1.listen(null);
|
| + var subscription2 = branch2.listen(null);
|
| + var subscription3 = branch3.listen(null);
|
| +
|
| + subscription1.cancel();
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + subscription2.cancel();
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + subscription3.cancel();
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isFalse);
|
| + });
|
| +
|
| + test("a splitter that's closed before any branches are added never listens "
|
| + "to the source stream", () {
|
| + splitter.close();
|
| +
|
| + // This would throw an error if the stream had already been listened to.
|
| + controller.stream.listen(null);
|
| + });
|
| +
|
| + test("splitFrom splits a source stream into the designated number of "
|
| + "branches", () {
|
| + var branches = StreamSplitter.splitFrom(controller.stream, 5);
|
| +
|
| + controller.add(1);
|
| + controller.add(2);
|
| + controller.add(3);
|
| + controller.close();
|
| +
|
| + expect(branches[0].toList(), completion(equals([1, 2, 3])));
|
| + expect(branches[1].toList(), completion(equals([1, 2, 3])));
|
| + expect(branches[2].toList(), completion(equals([1, 2, 3])));
|
| + expect(branches[3].toList(), completion(equals([1, 2, 3])));
|
| + expect(branches[4].toList(), completion(equals([1, 2, 3])));
|
| + });
|
| +}
|
| +
|
| +/// Wait for all microtasks to complete.
|
| +Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
|
|
|