Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(66)

Unified Diff: test/stream_group_test.dart

Issue 1178793006: Add a StreamGroup class for merging streams. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« lib/src/stream_group.dart ('K') | « lib/src/stream_group.dart ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: test/stream_group_test.dart
diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart
new file mode 100644
index 0000000000000000000000000000000000000000..9fb1e6ad595abacea7af798c63a6d28ec8f95373
--- /dev/null
+++ b/test/stream_group_test.dart
@@ -0,0 +1,656 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.test.stream_group_test;
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+main() {
+ group("single-subscription", () {
+ var streamGroup;
+ setUp(() {
+ streamGroup = new StreamGroup<String>();
+ });
+
+ test("buffers events from multiple sources", () async {
+ var controller1 = new StreamController<String>();
+ streamGroup.add(controller1.stream);
+ controller1.add("first");
+ controller1.close();
+
+ var controller2 = new StreamController<String>();
+ streamGroup.add(controller2.stream);
+ controller2.add("second");
+ controller2.close();
+
+ await new Future.delayed(Duration.ZERO);
Lasse Reichstein Nielsen 2015/06/18 10:29:32 I like having a helper function: flushMicrotasks
nweiz 2015/06/19 00:44:18 Done.
+
+ expect(streamGroup.close(), completes);
+
+ expect(streamGroup.stream.toList(),
+ completion(equals(["first", "second"])));
Lasse Reichstein Nielsen 2015/06/18 10:29:32 This expects the events in a particular order - wh
nweiz 2015/06/19 00:44:17 Done.
+ });
+
+ test("buffers errors from multiple sources", () async {
+ var controller1 = new StreamController<String>();
+ streamGroup.add(controller1.stream);
+ controller1.addError("first");
+ controller1.close();
+
+ var controller2 = new StreamController<String>();
+ streamGroup.add(controller2.stream);
+ controller2.addError("second");
+ controller2.close();
+
+ await new Future.delayed(Duration.ZERO);
+
+ expect(streamGroup.close(), completes);
+
+ var transformed = streamGroup.stream.transform(
+ new StreamTransformer.fromHandlers(
+ handleError: (error, _, sink) => sink.add("error: $error")));
+ expect(transformed.toList(),
+ completion(equals(["error: first", "error: second"])));
+ });
+
+ test("buffers events and errors together", () async {
+ var controller = new StreamController<String>();
+ streamGroup.add(controller.stream);
+
+ controller.add("first");
+ controller.addError("second");
+ controller.add("third");
+ controller.addError("fourth");
+ controller.addError("fifth");
+ controller.add("sixth");
+ controller.close();
+
+ await new Future.delayed(Duration.ZERO);
+
+ expect(streamGroup.close(), completes);
+
+ var transformed = streamGroup.stream.transform(
+ new StreamTransformer.fromHandlers(
+ handleData: (data, sink) => sink.add("data: $data"),
+ handleError: (error, _, sink) => sink.add("error: $error")));
+ expect(transformed.toList(), completion(equals([
+ "data: first",
+ "error: second",
+ "data: third",
+ "error: fourth",
+ "error: fifth",
+ "data: sixth"
+ ])));
+ });
+
+ test("emits events once there's a listener", () {
+ var controller = new StreamController<String>();
+ streamGroup.add(controller.stream);
+
+ expect(streamGroup.stream.toList(),
+ completion(equals(["first", "second"])));
+
+ controller.add("first");
+ controller.add("second");
+ controller.close();
+
+ expect(streamGroup.close(), completes);
+ });
+
+ test("doesn't buffer events from a broadcast stream", () async {
+ var controller = new StreamController<String>.broadcast();
+ streamGroup.add(controller.stream);
+
+ controller.add("first");
+ controller.add("second");
+ controller.close();
+
+ await new Future.delayed(Duration.ZERO);
+
+ expect(streamGroup.close(), completes);
+ expect(streamGroup.stream.toList(), completion(isEmpty));
+ });
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Try this test again after listening to the group a
nweiz 2015/06/19 00:44:18 I've added the test, but the actual behavior is th
+
+ test("emits events from a broadcast stream once there's a listener", () {
+ var controller = new StreamController<String>.broadcast();
+ streamGroup.add(controller.stream);
+
+ expect(streamGroup.stream.toList(),
+ completion(equals(["first", "second"])));
+
+ controller.add("first");
+ controller.add("second");
+ controller.close();
+
+ expect(streamGroup.close(), completes);
+ });
+
+ test("forwards cancel errors", () async {
+ var subscription = streamGroup.stream.listen(null);
+
+ var controller = new StreamController<String>(
+ onCancel: () => throw "error");
+ streamGroup.add(controller.stream);
+ await new Future.delayed(Duration.ZERO);
+
+ expect(subscription.cancel(), throwsA("error"));
+ });
+
+ test("forwards a cancel futures", () async {
Lasse Reichstein Nielsen 2015/06/18 10:29:32 a cancel futures -> a cancel future/cancel futures
nweiz 2015/06/19 00:44:17 Done.
+ var subscription = streamGroup.stream.listen(null);
+
+ var completer = new Completer();
+ var controller = new StreamController<String>(
+ onCancel: () => completer.future);
+ streamGroup.add(controller.stream);
+ await new Future.delayed(Duration.ZERO);
+
+ var fired = false;
+ subscription.cancel().then((_) => fired = true);
+
+ await new Future.delayed(Duration.ZERO);
+ expect(fired, isFalse);
+
+ completer.complete();
+ await new Future.delayed(Duration.ZERO);
+ expect(fired, isTrue);
+ });
+ });
+
+ group("broadcast", () {
+ var streamGroup;
+ setUp(() {
+ streamGroup = new StreamGroup<String>.broadcast();
+ });
+
+ test("buffers events from multiple sources", () async {
+ var controller1 = new StreamController<String>();
+ streamGroup.add(controller1.stream);
+ controller1.add("first");
+ controller1.close();
+
+ var controller2 = new StreamController<String>();
+ streamGroup.add(controller2.stream);
+ controller2.add("second");
+ controller2.close();
+
+ await new Future.delayed(Duration.ZERO);
+
+ expect(streamGroup.close(), completes);
+
+ expect(streamGroup.stream.toList(),
+ completion(equals(["first", "second"])));
+ });
+
+ test("emits events from multiple sources once there's a listener", () {
+ var controller1 = new StreamController<String>();
+ streamGroup.add(controller1.stream);
+
+ var controller2 = new StreamController<String>();
+ streamGroup.add(controller2.stream);
+
+ expect(streamGroup.stream.toList(),
+ completion(equals(["first", "second"])));
+
+ controller1.add("first");
+ controller2.add("second");
+ controller1.close();
+ controller2.close();
+
+ expect(streamGroup.close(), completes);
+ });
+
+ test("doesn't buffer events once a listener has been added and removed",
+ () async {
+ var controller = new StreamController<String>();
+ streamGroup.add(controller.stream);
+
+ streamGroup.stream.listen(null).cancel();
+ await new Future.delayed(Duration.ZERO);
+
+ controller.add("first");
+ controller.addError("second");
+ controller.close();
+
+ await new Future.delayed(Duration.ZERO);
+
+ expect(streamGroup.close(), completes);
+ expect(streamGroup.stream.toList(), completion(isEmpty));
+ });
+
+ test("doesn't buffer events from a broadcast stream", () async {
+ var controller = new StreamController<String>.broadcast();
+ streamGroup.add(controller.stream);
+ controller.add("first");
+ controller.addError("second");
+ controller.close();
+
+ await new Future.delayed(Duration.ZERO);
+
+ expect(streamGroup.close(), completes);
+ expect(streamGroup.stream.toList(), completion(isEmpty));
+ });
+
+ test("emits events from a broadcast stream once there's a listener", () {
+ var controller = new StreamController<String>.broadcast();
+ streamGroup.add(controller.stream);
+
+ expect(streamGroup.stream.toList(),
+ completion(equals(["first", "second"])));
+
+ controller.add("first");
+ controller.add("second");
+ controller.close();
+
+ expect(streamGroup.close(), completes);
+ });
+
+ test("cancels and re-listens broadcast streams", () async {
+ var subscription = streamGroup.stream.listen(null);
+
+ var listened = false;
+ var controller = new StreamController<String>.broadcast(onListen: () {
+ listened = true;
+ }, onCancel: () {
+ listened = false;
+ });
+
+ streamGroup.add(controller.stream);
+ await new Future.delayed(Duration.ZERO);
+ expect(listened, isTrue);
+
+ subscription.cancel();
+ await new Future.delayed(Duration.ZERO);
+ expect(listened, isFalse);
+
+ streamGroup.stream.listen(null);
+ await new Future.delayed(Duration.ZERO);
+ expect(listened, isTrue);
+ });
+
+ test("never cancels single-subscription streams", () async {
+ var subscription = streamGroup.stream.listen(null);
+
+ var controller = new StreamController<String>(
+ onCancel: expectAsync(() {}, count: 0));
+
+ streamGroup.add(controller.stream);
+ await new Future.delayed(Duration.ZERO);
+
+ subscription.cancel();
+ await new Future.delayed(Duration.ZERO);
+
+ streamGroup.stream.listen(null);
+ await new Future.delayed(Duration.ZERO);
+ });
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Add a test where you: add single-sub stream (cre
nweiz 2015/06/19 00:44:17 Done.
+ });
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Also test: create broadcast group add single-s
nweiz 2015/06/19 00:44:18 Done.
+
+ group("regardless of type", () {
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Could these tests be run for both broadcast and si
nweiz 2015/06/19 00:44:17 Done.
+ var streamGroup;
+ setUp(() {
+ streamGroup = new StreamGroup<String>();
+ });
+
+ group("add()", () {
+ group("while dormant", () {
+ test("doesn't listen to the stream until the group is listened to",
+ () async {
+ var listened = false;
+ var controller = new StreamController<String>(
+ onListen: () => listened = true);
+
+ expect(streamGroup.add(controller.stream), isNull);
+ await new Future.delayed(Duration.ZERO);
+ expect(listened, isFalse);
+
+ streamGroup.stream.listen(null);
+ await new Future.delayed(Duration.ZERO);
+ expect(listened, isTrue);
+ });
+
+ test("is a no-op if the stream is already in the group", () {
+ var controller = new StreamController<String>();
+ streamGroup.add(controller.stream);
+ streamGroup.add(controller.stream);
+ streamGroup.add(controller.stream);
+
+ // If the stream was actually listened to multiple times, this would
+ // throw a StateError.
+ streamGroup.stream.listen(null);
+ });
+ });
+
+ group("while active", () {
+ var subscription;
+ setUp(() async {
+ subscription = streamGroup.stream.listen(null);
+ await new Future.delayed(Duration.ZERO);
+ });
+
+ test("listens to the stream immediately", () async {
+ var listened = false;
+ var controller = new StreamController<String>(
+ onListen: () => listened = true);
+
+ expect(streamGroup.add(controller.stream), isNull);
+ await new Future.delayed(Duration.ZERO);
+ expect(listened, isTrue);
+ });
+
+ test("pauses the stream if the group is paused, then resumes once the "
+ "group resumes", () async {
+ var paused = false;
+ var controller = new StreamController<String>(
+ onPause: () => paused = true,
+ onResume: () => paused = false);
+
+ subscription.pause();
+ await new Future.delayed(Duration.ZERO);
+
+ streamGroup.add(controller.stream);
+ await new Future.delayed(Duration.ZERO);
+ expect(paused, isTrue);
+
+ subscription.resume();
+ await new Future.delayed(Duration.ZERO);
+ expect(paused, isFalse);
+ });
+
+ test("is a no-op if the stream is already in the group", () async {
+ var controller = new StreamController<String>();
+
+ // If the stream were actually listened to more than once, future
+ // calls to [add] would throw [StateError]s.
+ streamGroup.add(controller.stream);
+ streamGroup.add(controller.stream);
+ streamGroup.add(controller.stream);
+ });
+ });
+
+ group("while canceled", () {
+ setUp(() async {
+ streamGroup.stream.listen(null).cancel();
+ await new Future.delayed(Duration.ZERO);
+ });
+
+ test("immediately listens to and cancels the stream", () async {
+ var listened = false;
+ var canceled = false;
+ var controller = new StreamController<String>(onListen: () {
+ listened = true;
+ }, onCancel: expectAsync(() {
+ expect(listened, isTrue);
+ canceled = true;
+ }));
+
+ streamGroup.add(controller.stream);
+ await new Future.delayed(Duration.ZERO);
+ expect(listened, isTrue);
+ expect(canceled, isTrue);
+ });
+
+ test("forwards cancel errors", () {
+ var controller = new StreamController<String>(
+ onCancel: () => throw "error");
+
+ expect(streamGroup.add(controller.stream), throwsA("error"));
+ });
+
+ test("forwards a cancel futures", () async {
Lasse Reichstein Nielsen 2015/06/18 10:29:32 a .. futures
nweiz 2015/06/19 00:44:17 Done.
+ var completer = new Completer();
+ var controller = new StreamController<String>(
+ onCancel: () => completer.future);
+
+ var fired = false;
+ streamGroup.add(controller.stream).then((_) => fired = true);
+
+ await new Future.delayed(Duration.ZERO);
+ expect(fired, isFalse);
+
+ completer.complete();
+ await new Future.delayed(Duration.ZERO);
+ expect(fired, isTrue);
+ });
+ });
+ });
+
+ group("remove()", () {
+ group("while dormant", () {
+ test("stops emitting events for a stream that's removed", () {
+ var controller = new StreamController<String>();
+ streamGroup.add(controller.stream);
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Check that prior events do get through. So co
+
+ controller.add("first");
+ expect(streamGroup.remove(controller.stream), isNull);
+
+ expect(streamGroup.close(), completes);
+ expect(streamGroup.stream.toList(), completion(equals(isEmpty)));
+ });
+
+ test("is a no-op for an unknown stream", () {
+ var controller = new StreamController<String>();
+ expect(streamGroup.remove(controller.stream), isNull);
+ });
+
+ test("and closed closes the group when the last stream is removed",
+ () async {
+ var controller1 = new StreamController<String>();
+ var controller2 = new StreamController<String>();
+
+ streamGroup.add(controller1.stream);
+ streamGroup.add(controller2.stream);
+ await new Future.delayed(Duration.ZERO);
+
+ streamGroup.close();
+
+ streamGroup.remove(controller1.stream);
+ await new Future.delayed(Duration.ZERO);
+
+ streamGroup.remove(controller2.stream);
+ await new Future.delayed(Duration.ZERO);
+
+ expect(streamGroup.stream.toList(), completion(isEmpty));
+ });
+ });
+
+ group("while listening", () {
+ test("doesn't emit events from a removed stream", () {
+ var controller = new StreamController<String>();
+ streamGroup.add(controller.stream);
+
+ // The subscription to [controller.stream] is canceled synchronously, so
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Long line :)
nweiz 2015/06/19 00:44:18 Done.
+ // the first event is dropped even though it was added before the
+ // removal. This is documented in [StreamGroup.remove].
+ expect(streamGroup.stream.toList(), completion(isEmpty));
+
+ controller.add("first");
+ expect(streamGroup.remove(controller.stream), isNull);
+ controller.add("second");
+
+ expect(streamGroup.close(), completes);
+ });
+
+ test("cancels the stream's subscription", () async {
+ var canceled = false;
+ var controller = new StreamController<String>(onCancel: () {
+ canceled = true;
+ });
+ streamGroup.add(controller.stream);
+
+ streamGroup.stream.listen(null);
+ await new Future.delayed(Duration.ZERO);
+ expect(canceled, isFalse);
+
+ streamGroup.remove(controller.stream);
+ await new Future.delayed(Duration.ZERO);
+ expect(canceled, isTrue);
+ });
+
+ test("forwards cancel errors", () async {
+ var controller = new StreamController<String>(
+ onCancel: () => throw "error");
+ streamGroup.add(controller.stream);
+
+ streamGroup.stream.listen(null);
+ await new Future.delayed(Duration.ZERO);
+
+ expect(streamGroup.remove(controller.stream), throwsA("error"));
+ });
+
+ test("forwards cancel futures", () async {
+ var completer = new Completer();
+ var controller = new StreamController<String>(
+ onCancel: () => completer.future);
+
+ streamGroup.stream.listen(null);
+ await new Future.delayed(Duration.ZERO);
+
+ streamGroup.add(controller.stream);
+ await new Future.delayed(Duration.ZERO);
+
+ var fired = false;
+ streamGroup.remove(controller.stream).then((_) => fired = true);
+
+ await new Future.delayed(Duration.ZERO);
+ expect(fired, isFalse);
+
+ completer.complete();
+ await new Future.delayed(Duration.ZERO);
+ expect(fired, isTrue);
+ });
+
+ test("is a no-op for an unknown stream", () async {
+ var controller = new StreamController<String>();
+ streamGroup.stream.listen(null);
+ await new Future.delayed(Duration.ZERO);
+
+ expect(streamGroup.remove(controller.stream), isNull);
+ });
+
+ test("and closed closes the group when the last stream is removed",
+ () async {
+ var done = false;
+ streamGroup.stream.listen(null, onDone: () => done = true);
+ await new Future.delayed(Duration.ZERO);
+
+ var controller1 = new StreamController<String>();
+ var controller2 = new StreamController<String>();
+
+ streamGroup.add(controller1.stream);
+ streamGroup.add(controller2.stream);
+ await new Future.delayed(Duration.ZERO);
+
+ streamGroup.close();
+
+ streamGroup.remove(controller1.stream);
+ await new Future.delayed(Duration.ZERO);
+ expect(done, isFalse);
+
+ streamGroup.remove(controller2.stream);
+ await new Future.delayed(Duration.ZERO);
+ expect(done, isTrue);
+ });
+ });
+ });
+
+ group("close()", () {
+ group("while dormant", () {
+ test("if there are no streams, closes the group", () {
+ expect(streamGroup.close(), completes);
+ expect(streamGroup.stream.toList(), completion(isEmpty));
+ });
+
+ test("if there are streams, closes the group once those streams close "
+ "and there's a listener", () async {
+ var controller1 = new StreamController<String>();
+ var controller2 = new StreamController<String>();
+
+ streamGroup.add(controller1.stream);
+ streamGroup.add(controller2.stream);
+ await new Future.delayed(Duration.ZERO);
+
+ streamGroup.close();
+
+ controller1.close();
+ controller2.close();
+ expect(streamGroup.stream.toList(), completion(isEmpty));
+ });
+ });
+
+ group("while active", () {
+ test("if there are no streams, closes the group", () {
+ expect(streamGroup.stream.toList(), completion(isEmpty));
+ expect(streamGroup.close(), completes);
+ });
+
+ test("if there are streams, closes the group once those streams close",
+ () async {
+ var done = false;
+ streamGroup.stream.listen(null, onDone: () => done = true);
+ await new Future.delayed(Duration.ZERO);
+
+ var controller1 = new StreamController<String>();
+ var controller2 = new StreamController<String>();
+
+ streamGroup.add(controller1.stream);
+ streamGroup.add(controller2.stream);
+ await new Future.delayed(Duration.ZERO);
+
+ streamGroup.close();
+ await new Future.delayed(Duration.ZERO);
+ expect(done, isFalse);
+
+ controller1.close();
+ await new Future.delayed(Duration.ZERO);
+ expect(done, isFalse);
+
+ controller2.close();
+ await new Future.delayed(Duration.ZERO);
+ expect(done, isTrue);
+ });
+ });
+
+ test("returns a Future that completes once all events are dispatched",
+ () async {
+ var events = [];
+ streamGroup.stream.listen(events.add);
+
+ var controller = new StreamController<String>();
+ streamGroup.add(controller.stream);
+ await new Future.delayed(Duration.ZERO);
+
+ // Add a bunch of events. Each of these will get dispatched in a
+ // separate microtask, so we can test that [close] only completes once
+ // all of them have dispatched.
+ controller.add("one");
+ controller.add("two");
+ controller.add("three");
+ controller.add("four");
+ controller.add("five");
+ controller.add("six");
+ controller.close();
+
+ await streamGroup.close();
+ expect(events, equals(["one", "two", "three", "four", "five", "six"]));
+ });
+ });
+ });
+
+ test("merge() emits events from all components streams", () {
+ var controller1 = new StreamController<String>();
+ var controller2 = new StreamController<String>();
+
+ var merged = StreamGroup.merge([controller1.stream, controller2.stream]);
+
+ controller1.add("first");
+ controller1.close();
+ controller2.add("second");
+ controller2.close();
+
+ expect(merged.toList(), completion(equals(["first", "second"])));
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Again, maybe only compare unordered since the orde
nweiz 2015/06/19 00:44:17 Done.
+ });
+}
« lib/src/stream_group.dart ('K') | « lib/src/stream_group.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698