| Index: mojo/public/dart/third_party/async/test/stream_group_test.dart
|
| diff --git a/mojo/public/dart/third_party/async/test/stream_group_test.dart b/mojo/public/dart/third_party/async/test/stream_group_test.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..db6c9387136cc4f39c1a94710baf9d80133c376d
|
| --- /dev/null
|
| +++ b/mojo/public/dart/third_party/async/test/stream_group_test.dart
|
| @@ -0,0 +1,723 @@
|
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +library async.test.stream_group_test;
|
| +
|
| +import 'dart:async';
|
| +
|
| +import 'package:async/async.dart';
|
| +import 'package:test/test.dart';
|
| +
|
| +main() {
|
| + group("single-subscription", () {
|
| + var streamGroup;
|
| + setUp(() {
|
| + streamGroup = new StreamGroup<String>();
|
| + });
|
| +
|
| + test("buffers events from multiple sources", () async {
|
| + var controller1 = new StreamController<String>();
|
| + streamGroup.add(controller1.stream);
|
| + controller1.add("first");
|
| + controller1.close();
|
| +
|
| + var controller2 = new StreamController<String>();
|
| + streamGroup.add(controller2.stream);
|
| + controller2.add("second");
|
| + controller2.close();
|
| +
|
| + await flushMicrotasks();
|
| +
|
| + expect(streamGroup.close(), completes);
|
| +
|
| + expect(streamGroup.stream.toList(),
|
| + completion(unorderedEquals(["first", "second"])));
|
| + });
|
| +
|
| + test("buffers errors from multiple sources", () async {
|
| + var controller1 = new StreamController<String>();
|
| + streamGroup.add(controller1.stream);
|
| + controller1.addError("first");
|
| + controller1.close();
|
| +
|
| + var controller2 = new StreamController<String>();
|
| + streamGroup.add(controller2.stream);
|
| + controller2.addError("second");
|
| + controller2.close();
|
| +
|
| + await flushMicrotasks();
|
| +
|
| + expect(streamGroup.close(), completes);
|
| +
|
| + var transformed = streamGroup.stream.transform(
|
| + new StreamTransformer.fromHandlers(
|
| + handleError: (error, _, sink) => sink.add("error: $error")));
|
| + expect(transformed.toList(),
|
| + completion(equals(["error: first", "error: second"])));
|
| + });
|
| +
|
| + test("buffers events and errors together", () async {
|
| + var controller = new StreamController<String>();
|
| + streamGroup.add(controller.stream);
|
| +
|
| + controller.add("first");
|
| + controller.addError("second");
|
| + controller.add("third");
|
| + controller.addError("fourth");
|
| + controller.addError("fifth");
|
| + controller.add("sixth");
|
| + controller.close();
|
| +
|
| + await flushMicrotasks();
|
| +
|
| + expect(streamGroup.close(), completes);
|
| +
|
| + var transformed = streamGroup.stream.transform(
|
| + new StreamTransformer.fromHandlers(
|
| + handleData: (data, sink) => sink.add("data: $data"),
|
| + handleError: (error, _, sink) => sink.add("error: $error")));
|
| + expect(transformed.toList(), completion(equals([
|
| + "data: first",
|
| + "error: second",
|
| + "data: third",
|
| + "error: fourth",
|
| + "error: fifth",
|
| + "data: sixth"
|
| + ])));
|
| + });
|
| +
|
| + test("emits events once there's a listener", () {
|
| + var controller = new StreamController<String>();
|
| + streamGroup.add(controller.stream);
|
| +
|
| + expect(streamGroup.stream.toList(),
|
| + completion(equals(["first", "second"])));
|
| +
|
| + controller.add("first");
|
| + controller.add("second");
|
| + controller.close();
|
| +
|
| + expect(streamGroup.close(), completes);
|
| + });
|
| +
|
| + test("doesn't buffer events from a broadcast stream", () async {
|
| + var controller = new StreamController<String>.broadcast();
|
| + streamGroup.add(controller.stream);
|
| +
|
| + controller.add("first");
|
| + controller.add("second");
|
| + controller.close();
|
| +
|
| + await flushMicrotasks();
|
| +
|
| + expect(streamGroup.close(), completes);
|
| + expect(streamGroup.stream.toList(), completion(isEmpty));
|
| + });
|
| +
|
| + test("when paused, buffers events from a broadcast stream", () async {
|
| + var controller = new StreamController<String>.broadcast();
|
| + streamGroup.add(controller.stream);
|
| +
|
| + var events = [];
|
| + var subscription = streamGroup.stream.listen(events.add);
|
| + subscription.pause();
|
| +
|
| + controller.add("first");
|
| + controller.add("second");
|
| + controller.close();
|
| + await flushMicrotasks();
|
| +
|
| + subscription.resume();
|
| + expect(streamGroup.close(), completes);
|
| + await flushMicrotasks();
|
| +
|
| + expect(events, equals(["first", "second"]));
|
| + });
|
| +
|
| + test("emits events from a broadcast stream once there's a listener", () {
|
| + var controller = new StreamController<String>.broadcast();
|
| + streamGroup.add(controller.stream);
|
| +
|
| + expect(streamGroup.stream.toList(),
|
| + completion(equals(["first", "second"])));
|
| +
|
| + controller.add("first");
|
| + controller.add("second");
|
| + controller.close();
|
| +
|
| + expect(streamGroup.close(), completes);
|
| + });
|
| +
|
| + test("forwards cancel errors", () async {
|
| + var subscription = streamGroup.stream.listen(null);
|
| +
|
| + var controller = new StreamController<String>(
|
| + onCancel: () => throw "error");
|
| + streamGroup.add(controller.stream);
|
| + await flushMicrotasks();
|
| +
|
| + expect(subscription.cancel(), throwsA("error"));
|
| + });
|
| +
|
| + test("forwards a cancel future", () async {
|
| + var subscription = streamGroup.stream.listen(null);
|
| +
|
| + var completer = new Completer();
|
| + var controller = new StreamController<String>(
|
| + onCancel: () => completer.future);
|
| + streamGroup.add(controller.stream);
|
| + await flushMicrotasks();
|
| +
|
| + var fired = false;
|
| + subscription.cancel().then((_) => fired = true);
|
| +
|
| + await flushMicrotasks();
|
| + expect(fired, isFalse);
|
| +
|
| + completer.complete();
|
| + await flushMicrotasks();
|
| + expect(fired, isTrue);
|
| + });
|
| +
|
| + test("add() while active pauses the stream if the group is paused, then "
|
| + "resumes once the group resumes", () async {
|
| + var subscription = streamGroup.stream.listen(null);
|
| + await flushMicrotasks();
|
| +
|
| + var paused = false;
|
| + var controller = new StreamController<String>(
|
| + onPause: () => paused = true,
|
| + onResume: () => paused = false);
|
| +
|
| + subscription.pause();
|
| + await flushMicrotasks();
|
| +
|
| + streamGroup.add(controller.stream);
|
| + await flushMicrotasks();
|
| + expect(paused, isTrue);
|
| +
|
| + subscription.resume();
|
| + await flushMicrotasks();
|
| + expect(paused, isFalse);
|
| + });
|
| +
|
| + group("add() while canceled", () {
|
| + setUp(() async {
|
| + streamGroup.stream.listen(null).cancel();
|
| + await flushMicrotasks();
|
| + });
|
| +
|
| + test("immediately listens to and cancels the stream", () async {
|
| + var listened = false;
|
| + var canceled = false;
|
| + var controller = new StreamController<String>(onListen: () {
|
| + listened = true;
|
| + }, onCancel: expectAsync(() {
|
| + expect(listened, isTrue);
|
| + canceled = true;
|
| + }));
|
| +
|
| + streamGroup.add(controller.stream);
|
| + await flushMicrotasks();
|
| + expect(listened, isTrue);
|
| + expect(canceled, isTrue);
|
| + });
|
| +
|
| + test("forwards cancel errors", () {
|
| + var controller = new StreamController<String>(
|
| + onCancel: () => throw "error");
|
| +
|
| + expect(streamGroup.add(controller.stream), throwsA("error"));
|
| + });
|
| +
|
| + test("forwards a cancel future", () async {
|
| + var completer = new Completer();
|
| + var controller = new StreamController<String>(
|
| + onCancel: () => completer.future);
|
| +
|
| + var fired = false;
|
| + streamGroup.add(controller.stream).then((_) => fired = true);
|
| +
|
| + await flushMicrotasks();
|
| + expect(fired, isFalse);
|
| +
|
| + completer.complete();
|
| + await flushMicrotasks();
|
| + expect(fired, isTrue);
|
| + });
|
| + });
|
| + });
|
| +
|
| + group("broadcast", () {
|
| + var streamGroup;
|
| + setUp(() {
|
| + streamGroup = new StreamGroup<String>.broadcast();
|
| + });
|
| +
|
| + test("buffers events from multiple sources", () async {
|
| + var controller1 = new StreamController<String>();
|
| + streamGroup.add(controller1.stream);
|
| + controller1.add("first");
|
| + controller1.close();
|
| +
|
| + var controller2 = new StreamController<String>();
|
| + streamGroup.add(controller2.stream);
|
| + controller2.add("second");
|
| + controller2.close();
|
| +
|
| + await flushMicrotasks();
|
| +
|
| + expect(streamGroup.close(), completes);
|
| +
|
| + expect(streamGroup.stream.toList(),
|
| + completion(equals(["first", "second"])));
|
| + });
|
| +
|
| + test("emits events from multiple sources once there's a listener", () {
|
| + var controller1 = new StreamController<String>();
|
| + streamGroup.add(controller1.stream);
|
| +
|
| + var controller2 = new StreamController<String>();
|
| + streamGroup.add(controller2.stream);
|
| +
|
| + expect(streamGroup.stream.toList(),
|
| + completion(equals(["first", "second"])));
|
| +
|
| + controller1.add("first");
|
| + controller2.add("second");
|
| + controller1.close();
|
| + controller2.close();
|
| +
|
| + expect(streamGroup.close(), completes);
|
| + });
|
| +
|
| + test("doesn't buffer events once a listener has been added and removed",
|
| + () async {
|
| + var controller = new StreamController<String>();
|
| + streamGroup.add(controller.stream);
|
| +
|
| + streamGroup.stream.listen(null).cancel();
|
| + await flushMicrotasks();
|
| +
|
| + controller.add("first");
|
| + controller.addError("second");
|
| + controller.close();
|
| +
|
| + await flushMicrotasks();
|
| +
|
| + expect(streamGroup.close(), completes);
|
| + expect(streamGroup.stream.toList(), completion(isEmpty));
|
| + });
|
| +
|
| + test("doesn't buffer events from a broadcast stream", () async {
|
| + var controller = new StreamController<String>.broadcast();
|
| + streamGroup.add(controller.stream);
|
| + controller.add("first");
|
| + controller.addError("second");
|
| + controller.close();
|
| +
|
| + await flushMicrotasks();
|
| +
|
| + expect(streamGroup.close(), completes);
|
| + expect(streamGroup.stream.toList(), completion(isEmpty));
|
| + });
|
| +
|
| + test("emits events from a broadcast stream once there's a listener", () {
|
| + var controller = new StreamController<String>.broadcast();
|
| + streamGroup.add(controller.stream);
|
| +
|
| + expect(streamGroup.stream.toList(),
|
| + completion(equals(["first", "second"])));
|
| +
|
| + controller.add("first");
|
| + controller.add("second");
|
| + controller.close();
|
| +
|
| + expect(streamGroup.close(), completes);
|
| + });
|
| +
|
| + test("cancels and re-listens broadcast streams", () async {
|
| + var subscription = streamGroup.stream.listen(null);
|
| +
|
| + var controller = new StreamController<String>.broadcast();
|
| +
|
| + streamGroup.add(controller.stream);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + subscription.cancel();
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isFalse);
|
| +
|
| + streamGroup.stream.listen(null);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| + });
|
| +
|
| + test("never cancels single-subscription streams", () async {
|
| + var subscription = streamGroup.stream.listen(null);
|
| +
|
| + var controller = new StreamController<String>(
|
| + onCancel: expectAsync(() {}, count: 0));
|
| +
|
| + streamGroup.add(controller.stream);
|
| + await flushMicrotasks();
|
| +
|
| + subscription.cancel();
|
| + await flushMicrotasks();
|
| +
|
| + streamGroup.stream.listen(null);
|
| + await flushMicrotasks();
|
| + });
|
| +
|
| + test("drops events from a single-subscription stream while dormant",
|
| + () async {
|
| + var events = [];
|
| + var subscription = streamGroup.stream.listen(events.add);
|
| +
|
| + var controller = new StreamController<String>();
|
| + streamGroup.add(controller.stream);
|
| + await flushMicrotasks();
|
| +
|
| + controller.add("first");
|
| + await flushMicrotasks();
|
| + expect(events, equals(["first"]));
|
| +
|
| + subscription.cancel();
|
| + controller.add("second");
|
| + await flushMicrotasks();
|
| + expect(events, equals(["first"]));
|
| +
|
| + streamGroup.stream.listen(events.add);
|
| + controller.add("third");
|
| + await flushMicrotasks();
|
| + expect(events, equals(["first", "third"]));
|
| + });
|
| +
|
| + test("a single-subscription stream can be removed while dormant", () async {
|
| + var controller = new StreamController<String>();
|
| + streamGroup.add(controller.stream);
|
| + await flushMicrotasks();
|
| +
|
| + streamGroup.stream.listen(null).cancel();
|
| + await flushMicrotasks();
|
| +
|
| + streamGroup.remove(controller.stream);
|
| + expect(controller.hasListener, isFalse);
|
| + await flushMicrotasks();
|
| +
|
| + expect(streamGroup.stream.toList(), completion(isEmpty));
|
| + controller.add("first");
|
| + expect(streamGroup.close(), completes);
|
| + });
|
| + });
|
| +
|
| + group("regardless of type", () {
|
| + group("single-subscription", () {
|
| + regardlessOfType(() => new StreamGroup<String>());
|
| + });
|
| +
|
| + group("broadcast", () {
|
| + regardlessOfType(() => new StreamGroup<String>.broadcast());
|
| + });
|
| + });
|
| +
|
| + test("merge() emits events from all components streams", () {
|
| + var controller1 = new StreamController<String>();
|
| + var controller2 = new StreamController<String>();
|
| +
|
| + var merged = StreamGroup.merge([controller1.stream, controller2.stream]);
|
| +
|
| + controller1.add("first");
|
| + controller1.close();
|
| + controller2.add("second");
|
| + controller2.close();
|
| +
|
| + expect(merged.toList(), completion(unorderedEquals(["first", "second"])));
|
| + });
|
| +}
|
| +
|
| +void regardlessOfType(StreamGroup<String> newStreamGroup()) {
|
| + var streamGroup;
|
| + setUp(() {
|
| + streamGroup = newStreamGroup();
|
| + });
|
| +
|
| + group("add()", () {
|
| + group("while dormant", () {
|
| + test("doesn't listen to the stream until the group is listened to",
|
| + () async {
|
| + var controller = new StreamController<String>();
|
| +
|
| + expect(streamGroup.add(controller.stream), isNull);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isFalse);
|
| +
|
| + streamGroup.stream.listen(null);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| + });
|
| +
|
| + test("is a no-op if the stream is already in the group", () {
|
| + var controller = new StreamController<String>();
|
| + streamGroup.add(controller.stream);
|
| + streamGroup.add(controller.stream);
|
| + streamGroup.add(controller.stream);
|
| +
|
| + // If the stream was actually listened to multiple times, this would
|
| + // throw a StateError.
|
| + streamGroup.stream.listen(null);
|
| + });
|
| + });
|
| +
|
| + group("while active", () {
|
| + setUp(() async {
|
| + streamGroup.stream.listen(null);
|
| + await flushMicrotasks();
|
| + });
|
| +
|
| + test("listens to the stream immediately", () async {
|
| + var controller = new StreamController<String>();
|
| +
|
| + expect(streamGroup.add(controller.stream), isNull);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| + });
|
| +
|
| + test("is a no-op if the stream is already in the group", () async {
|
| + var controller = new StreamController<String>();
|
| +
|
| + // If the stream were actually listened to more than once, future
|
| + // calls to [add] would throw [StateError]s.
|
| + streamGroup.add(controller.stream);
|
| + streamGroup.add(controller.stream);
|
| + streamGroup.add(controller.stream);
|
| + });
|
| + });
|
| + });
|
| +
|
| + group("remove()", () {
|
| + group("while dormant", () {
|
| + test("stops emitting events for a stream that's removed", () async {
|
| + var controller = new StreamController<String>();
|
| + streamGroup.add(controller.stream);
|
| +
|
| + expect(streamGroup.stream.toList(), completion(equals(["first"])));
|
| +
|
| + controller.add("first");
|
| + await flushMicrotasks();
|
| + controller.add("second");
|
| +
|
| + expect(streamGroup.remove(controller.stream), isNull);
|
| + expect(streamGroup.close(), completes);
|
| + });
|
| +
|
| + test("is a no-op for an unknown stream", () {
|
| + var controller = new StreamController<String>();
|
| + expect(streamGroup.remove(controller.stream), isNull);
|
| + });
|
| +
|
| + test("and closed closes the group when the last stream is removed",
|
| + () async {
|
| + var controller1 = new StreamController<String>();
|
| + var controller2 = new StreamController<String>();
|
| +
|
| + streamGroup.add(controller1.stream);
|
| + streamGroup.add(controller2.stream);
|
| + await flushMicrotasks();
|
| +
|
| + streamGroup.close();
|
| +
|
| + streamGroup.remove(controller1.stream);
|
| + await flushMicrotasks();
|
| +
|
| + streamGroup.remove(controller2.stream);
|
| + await flushMicrotasks();
|
| +
|
| + expect(streamGroup.stream.toList(), completion(isEmpty));
|
| + });
|
| + });
|
| +
|
| + group("while listening", () {
|
| + test("doesn't emit events from a removed stream", () {
|
| + var controller = new StreamController<String>();
|
| + streamGroup.add(controller.stream);
|
| +
|
| + // The subscription to [controller.stream] is canceled synchronously, so
|
| + // the first event is dropped even though it was added before the
|
| + // removal. This is documented in [StreamGroup.remove].
|
| + expect(streamGroup.stream.toList(), completion(isEmpty));
|
| +
|
| + controller.add("first");
|
| + expect(streamGroup.remove(controller.stream), isNull);
|
| + controller.add("second");
|
| +
|
| + expect(streamGroup.close(), completes);
|
| + });
|
| +
|
| + test("cancels the stream's subscription", () async {
|
| + var controller = new StreamController<String>();
|
| + streamGroup.add(controller.stream);
|
| +
|
| + streamGroup.stream.listen(null);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + streamGroup.remove(controller.stream);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isFalse);
|
| + });
|
| +
|
| + test("forwards cancel errors", () async {
|
| + var controller = new StreamController<String>(
|
| + onCancel: () => throw "error");
|
| + streamGroup.add(controller.stream);
|
| +
|
| + streamGroup.stream.listen(null);
|
| + await flushMicrotasks();
|
| +
|
| + expect(streamGroup.remove(controller.stream), throwsA("error"));
|
| + });
|
| +
|
| + test("forwards cancel futures", () async {
|
| + var completer = new Completer();
|
| + var controller = new StreamController<String>(
|
| + onCancel: () => completer.future);
|
| +
|
| + streamGroup.stream.listen(null);
|
| + await flushMicrotasks();
|
| +
|
| + streamGroup.add(controller.stream);
|
| + await flushMicrotasks();
|
| +
|
| + var fired = false;
|
| + streamGroup.remove(controller.stream).then((_) => fired = true);
|
| +
|
| + await flushMicrotasks();
|
| + expect(fired, isFalse);
|
| +
|
| + completer.complete();
|
| + await flushMicrotasks();
|
| + expect(fired, isTrue);
|
| + });
|
| +
|
| + test("is a no-op for an unknown stream", () async {
|
| + var controller = new StreamController<String>();
|
| + streamGroup.stream.listen(null);
|
| + await flushMicrotasks();
|
| +
|
| + expect(streamGroup.remove(controller.stream), isNull);
|
| + });
|
| +
|
| + test("and closed closes the group when the last stream is removed",
|
| + () async {
|
| + var done = false;
|
| + streamGroup.stream.listen(null, onDone: () => done = true);
|
| + await flushMicrotasks();
|
| +
|
| + var controller1 = new StreamController<String>();
|
| + var controller2 = new StreamController<String>();
|
| +
|
| + streamGroup.add(controller1.stream);
|
| + streamGroup.add(controller2.stream);
|
| + await flushMicrotasks();
|
| +
|
| + streamGroup.close();
|
| +
|
| + streamGroup.remove(controller1.stream);
|
| + await flushMicrotasks();
|
| + expect(done, isFalse);
|
| +
|
| + streamGroup.remove(controller2.stream);
|
| + await flushMicrotasks();
|
| + expect(done, isTrue);
|
| + });
|
| + });
|
| + });
|
| +
|
| + group("close()", () {
|
| + group("while dormant", () {
|
| + test("if there are no streams, closes the group", () {
|
| + expect(streamGroup.close(), completes);
|
| + expect(streamGroup.stream.toList(), completion(isEmpty));
|
| + });
|
| +
|
| + test("if there are streams, closes the group once those streams close "
|
| + "and there's a listener", () async {
|
| + var controller1 = new StreamController<String>();
|
| + var controller2 = new StreamController<String>();
|
| +
|
| + streamGroup.add(controller1.stream);
|
| + streamGroup.add(controller2.stream);
|
| + await flushMicrotasks();
|
| +
|
| + streamGroup.close();
|
| +
|
| + controller1.close();
|
| + controller2.close();
|
| + expect(streamGroup.stream.toList(), completion(isEmpty));
|
| + });
|
| + });
|
| +
|
| + group("while active", () {
|
| + test("if there are no streams, closes the group", () {
|
| + expect(streamGroup.stream.toList(), completion(isEmpty));
|
| + expect(streamGroup.close(), completes);
|
| + });
|
| +
|
| + test("if there are streams, closes the group once those streams close",
|
| + () async {
|
| + var done = false;
|
| + streamGroup.stream.listen(null, onDone: () => done = true);
|
| + await flushMicrotasks();
|
| +
|
| + var controller1 = new StreamController<String>();
|
| + var controller2 = new StreamController<String>();
|
| +
|
| + streamGroup.add(controller1.stream);
|
| + streamGroup.add(controller2.stream);
|
| + await flushMicrotasks();
|
| +
|
| + streamGroup.close();
|
| + await flushMicrotasks();
|
| + expect(done, isFalse);
|
| +
|
| + controller1.close();
|
| + await flushMicrotasks();
|
| + expect(done, isFalse);
|
| +
|
| + controller2.close();
|
| + await flushMicrotasks();
|
| + expect(done, isTrue);
|
| + });
|
| + });
|
| +
|
| + test("returns a Future that completes once all events are dispatched",
|
| + () async {
|
| + var events = [];
|
| + streamGroup.stream.listen(events.add);
|
| +
|
| + var controller = new StreamController<String>();
|
| + streamGroup.add(controller.stream);
|
| + await flushMicrotasks();
|
| +
|
| + // Add a bunch of events. Each of these will get dispatched in a
|
| + // separate microtask, so we can test that [close] only completes once
|
| + // all of them have dispatched.
|
| + controller.add("one");
|
| + controller.add("two");
|
| + controller.add("three");
|
| + controller.add("four");
|
| + controller.add("five");
|
| + controller.add("six");
|
| + controller.close();
|
| +
|
| + await streamGroup.close();
|
| + expect(events, equals(["one", "two", "three", "four", "five", "six"]));
|
| + });
|
| + });
|
| +}
|
| +
|
| +/// Wait for all microtasks to complete.
|
| +Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
|
|
|