Index: test/util/forkable_stream_test.dart |
diff --git a/test/util/forkable_stream_test.dart b/test/util/forkable_stream_test.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..c08df9d46b6c90e74539b355c57627c778c796bd |
--- /dev/null |
+++ b/test/util/forkable_stream_test.dart |
@@ -0,0 +1,417 @@ |
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+// TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/ |
+// lands. |
+ |
+import 'dart:async'; |
+ |
+import 'package:test/test.dart'; |
+import 'package:test/src/util/forkable_stream.dart'; |
+import 'package:test/src/util/stream_queue.dart'; |
+ |
+void main() { |
+ var controller; |
+ var stream; |
+ setUp(() { |
+ var cancelFuture = new Future.value(42); |
+ controller = new StreamController<int>(onCancel: () => cancelFuture); |
+ stream = new ForkableStream<int>(controller.stream); |
+ }); |
+ |
+ group("with no forks", () { |
+ test("forwards events, errors, and close", () async { |
+ var queue = new StreamQueue(stream); |
+ |
+ controller.add(1); |
+ expect(await queue.next, equals(1)); |
+ |
+ controller.add(2); |
+ expect(await queue.next, equals(2)); |
+ |
+ controller.addError("error"); |
+ expect(queue.next, throwsA("error")); |
+ await flushMicrotasks(); |
+ |
+ controller.add(3); |
+ expect(await queue.next, equals(3)); |
+ |
+ controller.close(); |
+ expect(await queue.hasNext, isFalse); |
+ }); |
+ |
+ test("listens to, pauses, and cancels the controller", () { |
+ expect(controller.hasListener, isFalse); |
+ |
+ var sub = stream.listen(null); |
+ expect(controller.hasListener, isTrue); |
+ |
+ sub.pause(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ sub.resume(); |
+ expect(controller.isPaused, isFalse); |
+ |
+ sub.cancel(); |
+ expect(controller.hasListener, isFalse); |
+ }); |
+ |
+ test("unpauses the controller when a fork is listened", () { |
+ stream.listen(null).pause(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ var fork = stream.fork(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ fork.listen(null); |
+ expect(controller.isPaused, isFalse); |
+ }); |
+ }); |
+ |
+ group("with a fork created before the stream was listened", () { |
+ var fork; |
+ setUp(() { |
+ fork = stream.fork(); |
+ }); |
+ |
+ test("forwards events, errors, and close to both branches", () async { |
+ var queue = new StreamQueue(stream); |
+ var forkQueue = new StreamQueue(fork); |
+ |
+ controller.add(1); |
+ expect(await queue.next, equals(1)); |
+ expect(await forkQueue.next, equals(1)); |
+ |
+ controller.add(2); |
+ expect(await queue.next, equals(2)); |
+ expect(await forkQueue.next, equals(2)); |
+ |
+ controller.addError("error"); |
+ expect(queue.next, throwsA("error")); |
+ expect(forkQueue.next, throwsA("error")); |
+ await flushMicrotasks(); |
+ |
+ controller.add(3); |
+ expect(await queue.next, equals(3)); |
+ expect(await forkQueue.next, equals(3)); |
+ |
+ controller.close(); |
+ expect(await queue.hasNext, isFalse); |
+ expect(await forkQueue.hasNext, isFalse); |
+ }); |
+ |
+ test('listens to the source when the original is listened', () { |
+ expect(controller.hasListener, isFalse); |
+ stream.listen(null); |
+ expect(controller.hasListener, isTrue); |
+ }); |
+ |
+ test('listens to the source when the fork is listened', () { |
+ expect(controller.hasListener, isFalse); |
+ fork.listen(null); |
+ expect(controller.hasListener, isTrue); |
+ }); |
+ }); |
+ |
+ test("with a fork created after the stream emitted a few events, forwards " |
+ "future events, errors, and close to both branches", () async { |
+ var queue = new StreamQueue(stream); |
+ |
+ controller.add(1); |
+ expect(await queue.next, equals(1)); |
+ |
+ controller.add(2); |
+ expect(await queue.next, equals(2)); |
+ |
+ var fork = stream.fork(); |
+ var forkQueue = new StreamQueue(fork); |
+ |
+ controller.add(3); |
+ expect(await queue.next, equals(3)); |
+ expect(await forkQueue.next, equals(3)); |
+ |
+ controller.addError("error"); |
+ expect(queue.next, throwsA("error")); |
+ expect(forkQueue.next, throwsA("error")); |
+ await flushMicrotasks(); |
+ |
+ controller.close(); |
+ expect(await queue.hasNext, isFalse); |
+ expect(await forkQueue.hasNext, isFalse); |
+ }); |
+ |
+ group("with multiple forks", () { |
+ var fork1; |
+ var fork2; |
+ var fork3; |
+ var fork4; |
+ setUp(() { |
+ fork1 = stream.fork(); |
+ fork2 = stream.fork(); |
+ fork3 = stream.fork(); |
+ fork4 = stream.fork(); |
+ }); |
+ |
+ test("forwards events, errors, and close to all branches", () async { |
+ var queue1 = new StreamQueue(stream); |
+ var queue2 = new StreamQueue(fork1); |
+ var queue3 = new StreamQueue(fork2); |
+ var queue4 = new StreamQueue(fork3); |
+ var queue5 = new StreamQueue(fork4); |
+ |
+ controller.add(1); |
+ expect(await queue1.next, equals(1)); |
+ expect(await queue2.next, equals(1)); |
+ expect(await queue3.next, equals(1)); |
+ expect(await queue4.next, equals(1)); |
+ expect(await queue5.next, equals(1)); |
+ |
+ controller.add(2); |
+ expect(await queue1.next, equals(2)); |
+ expect(await queue2.next, equals(2)); |
+ expect(await queue3.next, equals(2)); |
+ expect(await queue4.next, equals(2)); |
+ expect(await queue5.next, equals(2)); |
+ |
+ controller.addError("error"); |
+ expect(queue1.next, throwsA("error")); |
+ expect(queue2.next, throwsA("error")); |
+ expect(queue3.next, throwsA("error")); |
+ expect(queue4.next, throwsA("error")); |
+ expect(queue5.next, throwsA("error")); |
+ await flushMicrotasks(); |
+ |
+ controller.add(3); |
+ expect(await queue1.next, equals(3)); |
+ expect(await queue2.next, equals(3)); |
+ expect(await queue3.next, equals(3)); |
+ expect(await queue4.next, equals(3)); |
+ expect(await queue5.next, equals(3)); |
+ |
+ controller.close(); |
+ expect(await queue1.hasNext, isFalse); |
+ expect(await queue2.hasNext, isFalse); |
+ expect(await queue3.hasNext, isFalse); |
+ expect(await queue4.hasNext, isFalse); |
+ expect(await queue5.hasNext, isFalse); |
+ }); |
+ |
+ test("forwards events in order of forking", () async { |
+ var queue1 = new StreamQueue(stream); |
+ var queue2 = new StreamQueue(fork1); |
+ var queue3 = new StreamQueue(fork2); |
+ var queue4 = new StreamQueue(fork3); |
+ var queue5 = new StreamQueue(fork4); |
+ |
+ for (var i = 0; i < 4; i++) { |
+ controller.add(i); |
+ |
+ var queue1Fired = false; |
+ var queue2Fired = false; |
+ var queue3Fired = false; |
+ var queue4Fired = false; |
+ var queue5Fired = false; |
+ |
+ queue5.next.then(expectAsync((_) { |
+ queue5Fired = true; |
+ expect(queue1Fired, isTrue); |
+ expect(queue2Fired, isTrue); |
+ expect(queue3Fired, isTrue); |
+ expect(queue4Fired, isTrue); |
+ })); |
+ |
+ queue1.next.then(expectAsync((_) { |
+ queue1Fired = true; |
+ expect(queue2Fired, isFalse); |
+ expect(queue3Fired, isFalse); |
+ expect(queue4Fired, isFalse); |
+ expect(queue5Fired, isFalse); |
+ })); |
+ |
+ queue4.next.then(expectAsync((_) { |
+ queue4Fired = true; |
+ expect(queue1Fired, isTrue); |
+ expect(queue2Fired, isTrue); |
+ expect(queue3Fired, isTrue); |
+ expect(queue5Fired, isFalse); |
+ })); |
+ |
+ queue2.next.then(expectAsync((_) { |
+ queue2Fired = true; |
+ expect(queue1Fired, isTrue); |
+ expect(queue3Fired, isFalse); |
+ expect(queue4Fired, isFalse); |
+ expect(queue5Fired, isFalse); |
+ })); |
+ |
+ queue3.next.then(expectAsync((_) { |
+ queue3Fired = true; |
+ expect(queue1Fired, isTrue); |
+ expect(queue2Fired, isTrue); |
+ expect(queue4Fired, isFalse); |
+ expect(queue5Fired, isFalse); |
+ })); |
+ } |
+ }); |
+ |
+ test("pauses the source when all forks are paused and/or not listening", |
+ () { |
+ var sub1 = stream.listen(null); |
+ var sub2 = fork1.listen(null); |
+ expect(controller.isPaused, isFalse); |
+ |
+ sub1.pause(); |
+ expect(controller.isPaused, isFalse); |
+ |
+ sub2.pause(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ var sub3 = fork2.listen(null); |
+ expect(controller.isPaused, isFalse); |
+ |
+ sub3.pause(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ sub2.resume(); |
+ expect(controller.isPaused, isFalse); |
+ |
+ sub2.cancel(); |
+ expect(controller.isPaused, isTrue); |
+ }); |
+ |
+ test("cancels the source when all forks are canceled", () async { |
+ var sub1 = stream.listen(null); |
+ expect(controller.hasListener, isTrue); |
+ |
+ var sub2 = fork1.listen(null); |
+ expect(controller.hasListener, isTrue); |
+ |
+ expect(sub1.cancel(), isNull); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ expect(sub2.cancel(), isNull); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ expect(fork2.listen(null).cancel(), isNull); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ expect(fork3.listen(null).cancel(), isNull); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ expect(fork4.listen(null).cancel(), completion(equals(42))); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isFalse); |
+ }); |
+ }); |
+ |
+ group("modification during dispatch:", () { |
+ test("forking during onCancel", () { |
+ controller = new StreamController<int>(onCancel: expectAsync(() { |
+ expect(stream.fork().toList(), completion(isEmpty)); |
+ })); |
+ stream = new ForkableStream<int>(controller.stream); |
+ |
+ stream.listen(null).cancel(); |
+ }); |
+ |
+ test("forking during onPause", () { |
+ controller = new StreamController<int>(onPause: expectAsync(() { |
+ stream.fork().listen(null); |
+ })); |
+ stream = new ForkableStream<int>(controller.stream); |
+ |
+ stream.listen(null).pause(); |
+ |
+ // The fork created in onPause should have resumed the stream. |
+ expect(controller.isPaused, isFalse); |
+ }); |
+ |
+ test("forking during onData", () { |
+ var sub; |
+ sub = stream.listen(expectAsync((value1) { |
+ expect(value1, equals(1)); |
+ stream.fork().listen(expectAsync((value2) { |
+ expect(value2, equals(2)); |
+ })); |
+ sub.cancel(); |
+ })); |
+ |
+ controller.add(1); |
+ controller.add(2); |
+ }); |
+ |
+ test("canceling a fork during onData", () { |
+ var fork = stream.fork(); |
+ var forkSub = fork.listen(expectAsync((_) {}, count: 0)); |
+ |
+ stream.listen(expectAsync((_) => forkSub.cancel())); |
+ controller.add(null); |
+ }); |
+ |
+ test("forking during onError", () { |
+ var sub; |
+ sub = stream.listen(null, onError: expectAsync((error1) { |
+ expect(error1, equals("error 1")); |
+ stream.fork().listen(null, onError: expectAsync((error2) { |
+ expect(error2, equals("error 2")); |
+ })); |
+ sub.cancel(); |
+ })); |
+ |
+ controller.addError("error 1"); |
+ controller.addError("error 2"); |
+ }); |
+ |
+ test("canceling a fork during onError", () { |
+ var fork = stream.fork(); |
+ var forkSub = fork.listen(expectAsync((_) {}, count: 0)); |
+ |
+ stream.listen(null, onError: expectAsync((_) => forkSub.cancel())); |
+ controller.addError("error"); |
+ }); |
+ |
+ test("forking during onDone", () { |
+ stream.listen(null, onDone: expectAsync(() { |
+ expect(stream.fork().toList(), completion(isEmpty)); |
+ })); |
+ |
+ controller.close(); |
+ }); |
+ |
+ test("canceling a fork during onDone", () { |
+ var fork = stream.fork(); |
+ var forkSub = fork.listen(null, onDone: expectAsync(() {}, count: 0)); |
+ |
+ stream.listen(null, onDone: expectAsync(() => forkSub.cancel())); |
+ controller.close(); |
+ }); |
+ }); |
+ |
+ group("throws an error when", () { |
+ test("a cancelled stream is forked", () { |
+ stream.listen(null).cancel(); |
+ expect(stream.fork().toList(), completion(isEmpty)); |
+ }); |
+ |
+ test("a cancelled stream is forked even when other forks are alive", () { |
+ stream.fork().listen(null); |
+ stream.listen(null).cancel(); |
+ |
+ expect(controller.hasListener, isTrue); |
+ expect(stream.fork().toList(), completion(isEmpty)); |
+ }); |
+ |
+ test("a closed stream is forked", () async { |
+ controller.close(); |
+ await stream.listen(null).asFuture(); |
+ expect(stream.fork().toList(), completion(isEmpty)); |
+ }); |
+ }); |
+} |
+ |
+Future flushMicrotasks() => new Future.delayed(Duration.ZERO); |