Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2858)

Unified Diff: test/forkable_stream_test.dart

Issue 1241723003: Add StreamQueue.fork and ForkableStream. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: test/forkable_stream_test.dart
diff --git a/test/forkable_stream_test.dart b/test/forkable_stream_test.dart
new file mode 100644
index 0000000000000000000000000000000000000000..7f99401b9389e99e116ce24e3d1b1223ab4a9546
--- /dev/null
+++ b/test/forkable_stream_test.dart
@@ -0,0 +1,413 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ var controller;
+ var stream;
+ setUp(() {
+ var cancelFuture = new Future.value(42);
+ controller = new StreamController<int>(onCancel: () => cancelFuture);
+ stream = new ForkableStream<int>(controller.stream);
+ });
+
+ group("with no forks", () {
+ test("forwards events, errors, and close", () async {
+ var queue = new StreamQueue(stream);
+
+ controller.add(1);
+ expect(await queue.next, equals(1));
+
+ controller.add(2);
+ expect(await queue.next, equals(2));
+
+ controller.addError("error");
+ expect(queue.next, throwsA("error"));
+ await flushMicrotasks();
+
+ controller.add(3);
+ expect(await queue.next, equals(3));
+
+ controller.close();
+ expect(await queue.hasNext, isFalse);
+ });
+
+ test("listens to, pauses, and cancels the controller", () {
+ expect(controller.hasListener, isFalse);
+
+ var sub = stream.listen(null);
+ expect(controller.hasListener, isTrue);
+
+ sub.pause();
+ expect(controller.isPaused, isTrue);
+
+ sub.resume();
+ expect(controller.isPaused, isFalse);
+
+ sub.cancel();
+ expect(controller.hasListener, isFalse);
+ });
+
+ test("unpauses the controller when a fork is listened", () {
+ stream.listen(null).pause();
+ expect(controller.isPaused, isTrue);
+
+ var fork = stream.fork();
+ expect(controller.isPaused, isTrue);
+
+ fork.listen(null);
+ expect(controller.isPaused, isFalse);
+ });
+ });
+
+ group("with a fork created before the stream was listened", () {
+ var fork;
+ setUp(() {
+ fork = stream.fork();
+ });
+
+ test("forwards events, errors, and close to both branches", () async {
+ var queue = new StreamQueue(stream);
+ var forkQueue = new StreamQueue(fork);
+
+ controller.add(1);
+ expect(await queue.next, equals(1));
+ expect(await forkQueue.next, equals(1));
+
+ controller.add(2);
+ expect(await queue.next, equals(2));
+ expect(await forkQueue.next, equals(2));
+
+ controller.addError("error");
+ expect(queue.next, throwsA("error"));
+ expect(forkQueue.next, throwsA("error"));
+ await flushMicrotasks();
+
+ controller.add(3);
+ expect(await queue.next, equals(3));
+ expect(await forkQueue.next, equals(3));
+
+ controller.close();
+ expect(await queue.hasNext, isFalse);
+ expect(await forkQueue.hasNext, isFalse);
+ });
+
+ test('listens to the source when the original is listened', () {
+ expect(controller.hasListener, isFalse);
+ stream.listen(null);
+ expect(controller.hasListener, isTrue);
+ });
+
+ test('listens to the source when the fork is listened', () {
+ expect(controller.hasListener, isFalse);
+ fork.listen(null);
+ expect(controller.hasListener, isTrue);
+ });
+ });
+
+ test("with a fork created after the stream emitted a few events, forwards "
+ "future events, errors, and close to both branches", () async {
+ var queue = new StreamQueue(stream);
+
+ controller.add(1);
+ expect(await queue.next, equals(1));
+
+ controller.add(2);
+ expect(await queue.next, equals(2));
+
+ var fork = stream.fork();
+ var forkQueue = new StreamQueue(fork);
+
+ controller.add(3);
+ expect(await queue.next, equals(3));
+ expect(await forkQueue.next, equals(3));
+
+ controller.addError("error");
+ expect(queue.next, throwsA("error"));
+ expect(forkQueue.next, throwsA("error"));
+ await flushMicrotasks();
+
+ controller.close();
+ expect(await queue.hasNext, isFalse);
+ expect(await forkQueue.hasNext, isFalse);
+ });
+
+ group("with multiple forks", () {
+ var fork1;
+ var fork2;
+ var fork3;
+ var fork4;
+ setUp(() {
+ fork1 = stream.fork();
+ fork2 = stream.fork();
+ fork3 = stream.fork();
+ fork4 = stream.fork();
+ });
+
+ test("forwards events, errors, and close to all branches", () async {
+ var queue1 = new StreamQueue(stream);
+ var queue2 = new StreamQueue(fork1);
+ var queue3 = new StreamQueue(fork2);
+ var queue4 = new StreamQueue(fork3);
+ var queue5 = new StreamQueue(fork4);
+
+ controller.add(1);
+ expect(await queue1.next, equals(1));
+ expect(await queue2.next, equals(1));
+ expect(await queue3.next, equals(1));
+ expect(await queue4.next, equals(1));
+ expect(await queue5.next, equals(1));
+
+ controller.add(2);
+ expect(await queue1.next, equals(2));
+ expect(await queue2.next, equals(2));
+ expect(await queue3.next, equals(2));
+ expect(await queue4.next, equals(2));
+ expect(await queue5.next, equals(2));
+
+ controller.addError("error");
+ expect(queue1.next, throwsA("error"));
+ expect(queue2.next, throwsA("error"));
+ expect(queue3.next, throwsA("error"));
+ expect(queue4.next, throwsA("error"));
+ expect(queue5.next, throwsA("error"));
+ await flushMicrotasks();
+
+ controller.add(3);
+ expect(await queue1.next, equals(3));
+ expect(await queue2.next, equals(3));
+ expect(await queue3.next, equals(3));
+ expect(await queue4.next, equals(3));
+ expect(await queue5.next, equals(3));
+
+ controller.close();
+ expect(await queue1.hasNext, isFalse);
+ expect(await queue2.hasNext, isFalse);
+ expect(await queue3.hasNext, isFalse);
+ expect(await queue4.hasNext, isFalse);
+ expect(await queue5.hasNext, isFalse);
+ });
+
+ test("forwards events in order of forking", () async {
+ var queue1 = new StreamQueue(stream);
+ var queue2 = new StreamQueue(fork1);
+ var queue3 = new StreamQueue(fork2);
+ var queue4 = new StreamQueue(fork3);
+ var queue5 = new StreamQueue(fork4);
+
+ for (var i = 0; i < 4; i++) {
+ controller.add(i);
+
+ var queue1Fired = false;
+ var queue2Fired = false;
+ var queue3Fired = false;
+ var queue4Fired = false;
+ var queue5Fired = false;
+
+ queue5.next.then(expectAsync((_) {
+ queue5Fired = true;
+ expect(queue1Fired, isTrue);
+ expect(queue2Fired, isTrue);
+ expect(queue3Fired, isTrue);
+ expect(queue4Fired, isTrue);
+ }));
+
+ queue1.next.then(expectAsync((_) {
+ queue1Fired = true;
+ expect(queue2Fired, isFalse);
+ expect(queue3Fired, isFalse);
+ expect(queue4Fired, isFalse);
+ expect(queue5Fired, isFalse);
+ }));
+
+ queue4.next.then(expectAsync((_) {
+ queue4Fired = true;
+ expect(queue1Fired, isTrue);
+ expect(queue2Fired, isTrue);
+ expect(queue3Fired, isTrue);
+ expect(queue5Fired, isFalse);
+ }));
+
+ queue2.next.then(expectAsync((_) {
+ queue2Fired = true;
+ expect(queue1Fired, isTrue);
+ expect(queue3Fired, isFalse);
+ expect(queue4Fired, isFalse);
+ expect(queue5Fired, isFalse);
+ }));
+
+ queue3.next.then(expectAsync((_) {
+ queue3Fired = true;
+ expect(queue1Fired, isTrue);
+ expect(queue2Fired, isTrue);
+ expect(queue4Fired, isFalse);
+ expect(queue5Fired, isFalse);
+ }));
+ }
+ });
+
+ test("pauses the source when all forks are paused and/or not listening",
+ () {
+ var sub1 = stream.listen(null);
+ var sub2 = fork1.listen(null);
+ expect(controller.isPaused, isFalse);
+
+ sub1.pause();
+ expect(controller.isPaused, isFalse);
+
+ sub2.pause();
+ expect(controller.isPaused, isTrue);
+
+ var sub3 = fork2.listen(null);
+ expect(controller.isPaused, isFalse);
+
+ sub3.pause();
+ expect(controller.isPaused, isTrue);
+
+ sub2.resume();
+ expect(controller.isPaused, isFalse);
+
+ sub2.cancel();
+ expect(controller.isPaused, isTrue);
+ });
+
+ test("cancels the source when all forks are canceled", () async {
+ var sub1 = stream.listen(null);
+ expect(controller.hasListener, isTrue);
+
+ var sub2 = fork1.listen(null);
+ expect(controller.hasListener, isTrue);
+
+ expect(sub1.cancel(), isNull);
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+
+ expect(sub2.cancel(), isNull);
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+
+ expect(fork2.listen(null).cancel(), isNull);
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+
+ expect(fork3.listen(null).cancel(), isNull);
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+
+ expect(fork4.listen(null).cancel(), completion(equals(42)));
+ await flushMicrotasks();
+ expect(controller.hasListener, isFalse);
+ });
+ });
+
+ group("modification during dispatch:", () {
+ test("forking during onCancel", () {
+ controller = new StreamController<int>(onCancel: expectAsync(() {
+ expect(stream.fork, throwsStateError);
+ }));
+ stream = new ForkableStream<int>(controller.stream);
+
+ stream.listen(null).cancel();
+ });
+
+ test("forking during onPause", () {
+ controller = new StreamController<int>(onPause: expectAsync(() {
+ stream.fork().listen(null);
+ }));
+ stream = new ForkableStream<int>(controller.stream);
+
+ stream.listen(null).pause();
+
+ // The fork created in onPause should have resumed the stream.
+ expect(controller.isPaused, isFalse);
+ });
+
+ test("forking during onData", () {
+ var sub;
+ sub = stream.listen(expectAsync((value1) {
+ expect(value1, equals(1));
+ stream.fork().listen(expectAsync((value2) {
+ expect(value2, equals(2));
+ }));
+ sub.cancel();
+ }));
+
+ controller.add(1);
+ controller.add(2);
+ });
+
+ test("canceling a fork during onData", () {
+ var fork = stream.fork();
+ var forkSub = fork.listen(expectAsync((_) {}, count: 0));
+
+ stream.listen(expectAsync((_) => forkSub.cancel()));
+ controller.add(null);
+ });
+
+ test("forking during onError", () {
+ var sub;
+ sub = stream.listen(null, onError: expectAsync((error1) {
+ expect(error1, equals("error 1"));
+ stream.fork().listen(null, onError: expectAsync((error2) {
+ expect(error2, equals("error 2"));
+ }));
+ sub.cancel();
+ }));
+
+ controller.addError("error 1");
+ controller.addError("error 2");
+ });
+
+ test("canceling a fork during onError", () {
+ var fork = stream.fork();
+ var forkSub = fork.listen(expectAsync((_) {}, count: 0));
+
+ stream.listen(null, onError: expectAsync((_) => forkSub.cancel()));
+ controller.addError("error");
+ });
+
+ test("forking during onDone", () {
+ stream.listen(null, onDone: expectAsync(() {
+ expect(stream.fork, throwsStateError);
+ }));
+
+ controller.close();
+ });
+
+ test("canceling a fork during onDone", () {
+ var fork = stream.fork();
+ var forkSub = fork.listen(null, onDone: expectAsync(() {}, count: 0));
+
+ stream.listen(null, onDone: expectAsync(() => forkSub.cancel()));
+ controller.close();
+ });
+ });
+
+ group("throws an error when", () {
+ test("a cancelled stream is forked", () {
+ stream.listen(null).cancel();
+ expect(stream.fork, throwsStateError);
+ });
+
+ test("a cancelled stream is forked even when other forks are alive", () {
+ stream.fork().listen(null);
+ stream.listen(null).cancel();
+
+ expect(controller.hasListener, isTrue);
+ expect(stream.fork, throwsStateError);
+ });
+
+ test("a closed stream is forked", () async {
+ controller.close();
+ await stream.listen(null).asFuture();
+ expect(stream.fork, throwsStateError);
+ });
+ });
+}

Powered by Google App Engine
This is Rietveld 408576698