Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(691)

Unified Diff: test/stream_queue_test.dart

Issue 1241723003: Add StreamQueue.fork and ForkableStream. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« lib/src/stream_queue.dart ('K') | « test/forkable_stream_test.dart ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: test/stream_queue_test.dart
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index cad0ad511669f3f29ca9f44e4e639b31f57a35bd..3768bffce6e2beeeaef27953a368593b257afdbb 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -344,6 +344,14 @@ main() {
expect(() => events.cancel(), throwsStateError);
});
+ test("cancels underlying subscription when called before any event",
+ () async {
+ var cancelFuture = new Future.value(42);
+ var controller = new StreamController(onCancel: () => cancelFuture);
+ var events = new StreamQueue<int>(controller.stream);
+ expect(await events.cancel(), 42);
+ });
+
test("cancels underlying subscription, returns result", () async {
var cancelFuture = new Future.value(42);
var controller = new StreamController(onCancel: () => cancelFuture);
@@ -353,7 +361,7 @@ main() {
expect(await events.cancel(), 42);
});
- group("with immediate: true", () async {
+ group("with immediate: true", () {
test("closes the events, prevents any other operation", () async {
var events = new StreamQueue<int>(createStream());
await events.cancel(immediate: true);
@@ -376,6 +384,15 @@ main() {
await expect(controller.hasListener, isFalse);
});
+ test("cancels the underlying subscription when called before any event",
+ () async {
+ var cancelFuture = new Future.value(42);
+ var controller = new StreamController(onCancel: () => cancelFuture);
+
+ var events = new StreamQueue<int>(controller.stream);
+ expect(await events.cancel(immediate: true), 42);
+ });
+
test("closes pending requests", () async {
var events = new StreamQueue<int>(createStream());
expect(await events.next, 1);
@@ -612,6 +629,434 @@ main() {
});
});
+ group("fork operation", () {
+ test("produces a stream queue with the same events", () async {
+ var queue1 = new StreamQueue<int>(createStream());
+ var queue2 = queue1.fork();
+
+ expect(await queue1.next, 1);
+ expect(await queue1.next, 2);
+ expect(await queue1.next, 3);
+ expect(await queue1.next, 4);
+ expect(await queue1.hasNext, isFalse);
+
+ expect(await queue2.next, 1);
+ expect(await queue2.next, 2);
+ expect(await queue2.next, 3);
+ expect(await queue2.next, 4);
+ expect(await queue2.hasNext, isFalse);
+ });
+
+ test("produces a stream queue with the same errors", () async {
+ var queue1 = new StreamQueue<int>(createErrorStream());
+ var queue2 = queue1.fork();
+
+ expect(await queue1.next, 1);
+ expect(await queue1.next, 2);
+ expect(queue1.next, throwsA("To err is divine!"));
+ expect(await queue1.next, 4);
+ expect(await queue1.hasNext, isFalse);
+
+ expect(await queue2.next, 1);
+ expect(await queue2.next, 2);
+ expect(queue2.next, throwsA("To err is divine!"));
+ expect(await queue2.next, 4);
+ expect(await queue2.hasNext, isFalse);
+ });
+
+ test("forks at the current point in the source queue", () {
+ var queue1 = new StreamQueue<int>(createStream());
+
+ expect(queue1.next, completion(1));
+ expect(queue1.next, completion(2));
+
+ var queue2 = queue1.fork();
+
+ expect(queue1.next, completion(3));
+ expect(queue1.next, completion(4));
+ expect(queue1.hasNext, completion(isFalse));
+
+ expect(queue2.next, completion(3));
+ expect(queue2.next, completion(4));
+ expect(queue2.hasNext, completion(isFalse));
+ });
+
+ test("can be created after there are pending values", () async {
+ var queue1 = new StreamQueue<int>(createStream());
+ await flushMicrotasks();
+
+ var queue2 = queue1.fork();
+ expect(await queue2.next, 1);
+ expect(await queue2.next, 2);
+ expect(await queue2.next, 3);
+ expect(await queue2.next, 4);
+ expect(await queue2.hasNext, isFalse);
+ });
+
+ test("multiple forks can be created at different points", () async {
+ var queue1 = new StreamQueue<int>(createStream());
+
+ var queue2 = queue1.fork();
+ expect(await queue1.next, 1);
+ expect(await queue2.next, 1);
+
+ var queue3 = queue1.fork();
+ expect(await queue1.next, 2);
+ expect(await queue2.next, 2);
+ expect(await queue3.next, 2);
+
+ var queue4 = queue1.fork();
+ expect(await queue1.next, 3);
+ expect(await queue2.next, 3);
+ expect(await queue3.next, 3);
+ expect(await queue4.next, 3);
+
+ var queue5 = queue1.fork();
+ expect(await queue1.next, 4);
+ expect(await queue2.next, 4);
+ expect(await queue3.next, 4);
+ expect(await queue4.next, 4);
+ expect(await queue5.next, 4);
+
+ var queue6 = queue1.fork();
+ expect(await queue1.hasNext, isFalse);
+ expect(await queue2.hasNext, isFalse);
+ expect(await queue3.hasNext, isFalse);
+ expect(await queue4.hasNext, isFalse);
+ expect(await queue5.hasNext, isFalse);
+ expect(await queue6.hasNext, isFalse);
+ });
+
+ test("same-level forks receive data in the order they were created",
+ () async {
+ var queue1 = new StreamQueue<int>(createStream());
+ var queue2 = queue1.fork();
+ var queue3 = queue1.fork();
+ var queue4 = queue1.fork();
+ var queue5 = queue1.fork();
+
+ for (var i = 0; i < 4; i++) {
+ var queue1Fired = false;
+ var queue2Fired = false;
+ var queue3Fired = false;
+ var queue4Fired = false;
+ var queue5Fired = false;
+
+ queue5.next.then(expectAsync((_) {
+ queue5Fired = true;
+ expect(queue1Fired, isTrue);
+ expect(queue2Fired, isTrue);
+ expect(queue3Fired, isTrue);
+ expect(queue4Fired, isTrue);
+ }));
+
+ queue1.next.then(expectAsync((_) {
+ queue1Fired = true;
+ expect(queue2Fired, isFalse);
+ expect(queue3Fired, isFalse);
+ expect(queue4Fired, isFalse);
+ expect(queue5Fired, isFalse);
+ }));
+
+ queue4.next.then(expectAsync((_) {
+ queue4Fired = true;
+ expect(queue1Fired, isTrue);
+ expect(queue2Fired, isTrue);
+ expect(queue3Fired, isTrue);
+ expect(queue5Fired, isFalse);
+ }));
+
+ queue2.next.then(expectAsync((_) {
+ queue2Fired = true;
+ expect(queue1Fired, isTrue);
+ expect(queue3Fired, isFalse);
+ expect(queue4Fired, isFalse);
+ expect(queue5Fired, isFalse);
+ }));
+
+ queue3.next.then(expectAsync((_) {
+ queue3Fired = true;
+ expect(queue1Fired, isTrue);
+ expect(queue2Fired, isTrue);
+ expect(queue4Fired, isFalse);
+ expect(queue5Fired, isFalse);
+ }));
+ }
+ });
+
+ test("forks can be created from forks", () async {
+ var queue1 = new StreamQueue<int>(createStream());
+
+ var queue2 = queue1.fork();
+ expect(await queue1.next, 1);
+ expect(await queue2.next, 1);
+
+ var queue3 = queue2.fork();
+ expect(await queue1.next, 2);
+ expect(await queue2.next, 2);
+ expect(await queue3.next, 2);
+
+ var queue4 = queue3.fork();
+ expect(await queue1.next, 3);
+ expect(await queue2.next, 3);
+ expect(await queue3.next, 3);
+ expect(await queue4.next, 3);
+
+ var queue5 = queue4.fork();
+ expect(await queue1.next, 4);
+ expect(await queue2.next, 4);
+ expect(await queue3.next, 4);
+ expect(await queue4.next, 4);
+ expect(await queue5.next, 4);
+
+ var queue6 = queue5.fork();
+ expect(await queue1.hasNext, isFalse);
+ expect(await queue2.hasNext, isFalse);
+ expect(await queue3.hasNext, isFalse);
+ expect(await queue4.hasNext, isFalse);
+ expect(await queue5.hasNext, isFalse);
+ expect(await queue6.hasNext, isFalse);
+ });
+
+ group("canceling:", () {
+ test("cancelling a fork doesn't cancel its source", () async {
+ var queue1 = new StreamQueue<int>(createStream());
+ var queue2 = queue1.fork();
+
+ queue2.cancel();
+ expect(() => queue2.next, throwsStateError);
+
+ expect(await queue1.next, 1);
+ expect(await queue1.next, 2);
+ expect(await queue1.next, 3);
+ expect(await queue1.next, 4);
+ expect(await queue1.hasNext, isFalse);
+ });
+
+ test("cancelling a source doesn't cancel its unmaterialized fork",
+ () async {
+ var queue1 = new StreamQueue<int>(createStream());
+ var queue2 = queue1.fork();
+
+ queue1.cancel();
+ expect(() => queue1.next, throwsStateError);
+
+ expect(await queue2.next, 1);
+ expect(await queue2.next, 2);
+ expect(await queue2.next, 3);
+ expect(await queue2.next, 4);
+ expect(await queue2.hasNext, isFalse);
+ });
+
+ test("cancelling a source doesn't cancel its materialized fork",
+ () async {
+ var queue1 = new StreamQueue<int>(createStream());
+ var queue2 = queue1.fork();
+
+ expect(await queue1.next, 1);
+
+ queue1.cancel();
+ expect(() => queue1.next, throwsStateError);
+
+ expect(await queue2.next, 1);
+ expect(await queue2.next, 2);
+ expect(await queue2.next, 3);
+ expect(await queue2.next, 4);
+ expect(await queue2.hasNext, isFalse);
+ });
+
+ test("the underlying stream is only canceled once all forks are canceled",
+ () async {
+ var controller = new StreamController();
+ var queue1 = new StreamQueue<int>(controller.stream);
+ var queue2 = queue1.fork();
+
+ await flushMicrotasks();
+ expect(controller.hasListener, isFalse);
+
+ expect(queue1.next, completion(1));
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+
+ queue2.cancel();
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+
+ controller.add(1);
+ queue1.cancel();
+ await flushMicrotasks();
+ expect(controller.hasListener, isFalse);
+ });
+
+ group("with immediate,", () {
+ test("cancelling a fork doesn't cancel its source", () async {
+ var queue1 = new StreamQueue<int>(createStream());
+ var queue2 = queue1.fork();
+
+ queue2.cancel(immediate: true);
+ expect(() => queue2.next, throwsStateError);
+
+ expect(await queue1.next, 1);
+ expect(await queue1.next, 2);
+ expect(await queue1.next, 3);
+ expect(await queue1.next, 4);
+ expect(await queue1.hasNext, isFalse);
+ });
+
+ test("cancelling a source doesn't cancel its unmaterialized fork",
+ () async {
+ var queue1 = new StreamQueue<int>(createStream());
+ var queue2 = queue1.fork();
+
+ queue1.cancel(immediate: true);
+ expect(() => queue1.next, throwsStateError);
+
+ expect(await queue2.next, 1);
+ expect(await queue2.next, 2);
+ expect(await queue2.next, 3);
+ expect(await queue2.next, 4);
+ expect(await queue2.hasNext, isFalse);
+ });
+
+ test("cancelling a source doesn't cancel its materialized fork",
+ () async {
+ var queue1 = new StreamQueue<int>(createStream());
+ var queue2 = queue1.fork();
+
+ expect(await queue1.next, 1);
+
+ queue1.cancel(immediate: true);
+ expect(() => queue1.next, throwsStateError);
+
+ expect(await queue2.next, 1);
+ expect(await queue2.next, 2);
+ expect(await queue2.next, 3);
+ expect(await queue2.next, 4);
+ expect(await queue2.hasNext, isFalse);
+ });
+
+ test("the underlying stream is only canceled once all forks are "
+ "canceled", () async {
+ var controller = new StreamController();
+ var queue1 = new StreamQueue<int>(controller.stream);
+ var queue2 = queue1.fork();
+
+ await flushMicrotasks();
+ expect(controller.hasListener, isFalse);
+
+ expect(queue1.next, throwsStateError);
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+
+ queue2.cancel(immediate: true);
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+
+ queue1.cancel(immediate: true);
+ await flushMicrotasks();
+ expect(controller.hasListener, isFalse);
+ });
+ });
+ });
+
+ group("pausing:", () {
+ test("the underlying stream is only implicitly paused when no forks are "
+ "awaiting input", () async {
+ var controller = new StreamController();
+ var queue1 = new StreamQueue<int>(controller.stream);
+ var queue2 = queue1.fork();
+
+ controller.add(1);
+ expect(await queue1.next, 1);
+ expect(controller.hasListener, isTrue);
+ expect(controller.isPaused, isTrue);
+
+ expect(queue1.next, completion(2));
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+
+ controller.add(2);
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+
+ expect(queue2.next, completion(1));
+ expect(queue2.next, completion(2));
+ expect(queue2.next, completion(3));
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+
+ controller.add(3);
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+ });
+
+ test("pausing a fork doesn't pause its source", () async {
+ var queue1 = new StreamQueue<int>(createStream());
+ var queue2 = queue1.fork();
+
+ queue2.rest.listen(expectAsync((_) {}, count: 0)).pause();
+
+ expect(await queue1.next, 1);
+ expect(await queue1.next, 2);
+ expect(await queue1.next, 3);
+ expect(await queue1.next, 4);
+ expect(await queue1.hasNext, isFalse);
+ });
+
+ test("pausing a source doesn't pause its fork", () async {
+ var queue1 = new StreamQueue<int>(createStream());
+ var queue2 = queue1.fork();
+
+ queue1.rest.listen(expectAsync((_) {}, count: 0)).pause();
+
+ expect(await queue2.next, 1);
+ expect(await queue2.next, 2);
+ expect(await queue2.next, 3);
+ expect(await queue2.next, 4);
+ expect(await queue2.hasNext, isFalse);
+ });
+
+ test("the underlying stream is only paused when all forks are paused",
+ () async {
+ var controller = new StreamController();
+ var queue1 = new StreamQueue<int>(controller.stream);
+ var queue2 = queue1.fork();
+
+ await flushMicrotasks();
+ expect(controller.hasListener, isFalse);
+
+ var sub1 = queue1.rest.listen(null);
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+ expect(controller.isPaused, isFalse);
+
+ sub1.pause();
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+
+ expect(queue2.next, completion(1));
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+
+ controller.add(1);
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+
+ var sub2 = queue2.rest.listen(null);
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+
+ sub2.pause();
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+
+ sub1.resume();
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+ });
+ });
+ });
+
test("all combinations sequential skip/next/take operations", () async {
// Takes all combinations of two of next, skip and take, then ends with
// doing rest. Each of the first rounds do 10 events of each type,
« lib/src/stream_queue.dart ('K') | « test/forkable_stream_test.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698