Index: test/stream_queue_test.dart |
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart |
index cad0ad511669f3f29ca9f44e4e639b31f57a35bd..3768bffce6e2beeeaef27953a368593b257afdbb 100644 |
--- a/test/stream_queue_test.dart |
+++ b/test/stream_queue_test.dart |
@@ -344,6 +344,14 @@ main() { |
expect(() => events.cancel(), throwsStateError); |
}); |
+ test("cancels underlying subscription when called before any event", |
+ () async { |
+ var cancelFuture = new Future.value(42); |
+ var controller = new StreamController(onCancel: () => cancelFuture); |
+ var events = new StreamQueue<int>(controller.stream); |
+ expect(await events.cancel(), 42); |
+ }); |
+ |
test("cancels underlying subscription, returns result", () async { |
var cancelFuture = new Future.value(42); |
var controller = new StreamController(onCancel: () => cancelFuture); |
@@ -353,7 +361,7 @@ main() { |
expect(await events.cancel(), 42); |
}); |
- group("with immediate: true", () async { |
+ group("with immediate: true", () { |
test("closes the events, prevents any other operation", () async { |
var events = new StreamQueue<int>(createStream()); |
await events.cancel(immediate: true); |
@@ -376,6 +384,15 @@ main() { |
await expect(controller.hasListener, isFalse); |
}); |
+ test("cancels the underlying subscription when called before any event", |
+ () async { |
+ var cancelFuture = new Future.value(42); |
+ var controller = new StreamController(onCancel: () => cancelFuture); |
+ |
+ var events = new StreamQueue<int>(controller.stream); |
+ expect(await events.cancel(immediate: true), 42); |
+ }); |
+ |
test("closes pending requests", () async { |
var events = new StreamQueue<int>(createStream()); |
expect(await events.next, 1); |
@@ -612,6 +629,434 @@ main() { |
}); |
}); |
+ group("fork operation", () { |
+ test("produces a stream queue with the same events", () async { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ var queue2 = queue1.fork(); |
+ |
+ expect(await queue1.next, 1); |
+ expect(await queue1.next, 2); |
+ expect(await queue1.next, 3); |
+ expect(await queue1.next, 4); |
+ expect(await queue1.hasNext, isFalse); |
+ |
+ expect(await queue2.next, 1); |
+ expect(await queue2.next, 2); |
+ expect(await queue2.next, 3); |
+ expect(await queue2.next, 4); |
+ expect(await queue2.hasNext, isFalse); |
+ }); |
+ |
+ test("produces a stream queue with the same errors", () async { |
+ var queue1 = new StreamQueue<int>(createErrorStream()); |
+ var queue2 = queue1.fork(); |
+ |
+ expect(await queue1.next, 1); |
+ expect(await queue1.next, 2); |
+ expect(queue1.next, throwsA("To err is divine!")); |
+ expect(await queue1.next, 4); |
+ expect(await queue1.hasNext, isFalse); |
+ |
+ expect(await queue2.next, 1); |
+ expect(await queue2.next, 2); |
+ expect(queue2.next, throwsA("To err is divine!")); |
+ expect(await queue2.next, 4); |
+ expect(await queue2.hasNext, isFalse); |
+ }); |
+ |
+ test("forks at the current point in the source queue", () { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ |
+ expect(queue1.next, completion(1)); |
+ expect(queue1.next, completion(2)); |
+ |
+ var queue2 = queue1.fork(); |
+ |
+ expect(queue1.next, completion(3)); |
+ expect(queue1.next, completion(4)); |
+ expect(queue1.hasNext, completion(isFalse)); |
+ |
+ expect(queue2.next, completion(3)); |
+ expect(queue2.next, completion(4)); |
+ expect(queue2.hasNext, completion(isFalse)); |
+ }); |
+ |
+ test("can be created after there are pending values", () async { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ await flushMicrotasks(); |
+ |
+ var queue2 = queue1.fork(); |
+ expect(await queue2.next, 1); |
+ expect(await queue2.next, 2); |
+ expect(await queue2.next, 3); |
+ expect(await queue2.next, 4); |
+ expect(await queue2.hasNext, isFalse); |
+ }); |
+ |
+ test("multiple forks can be created at different points", () async { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ |
+ var queue2 = queue1.fork(); |
+ expect(await queue1.next, 1); |
+ expect(await queue2.next, 1); |
+ |
+ var queue3 = queue1.fork(); |
+ expect(await queue1.next, 2); |
+ expect(await queue2.next, 2); |
+ expect(await queue3.next, 2); |
+ |
+ var queue4 = queue1.fork(); |
+ expect(await queue1.next, 3); |
+ expect(await queue2.next, 3); |
+ expect(await queue3.next, 3); |
+ expect(await queue4.next, 3); |
+ |
+ var queue5 = queue1.fork(); |
+ expect(await queue1.next, 4); |
+ expect(await queue2.next, 4); |
+ expect(await queue3.next, 4); |
+ expect(await queue4.next, 4); |
+ expect(await queue5.next, 4); |
+ |
+ var queue6 = queue1.fork(); |
+ expect(await queue1.hasNext, isFalse); |
+ expect(await queue2.hasNext, isFalse); |
+ expect(await queue3.hasNext, isFalse); |
+ expect(await queue4.hasNext, isFalse); |
+ expect(await queue5.hasNext, isFalse); |
+ expect(await queue6.hasNext, isFalse); |
+ }); |
+ |
+ test("same-level forks receive data in the order they were created", |
+ () async { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ var queue2 = queue1.fork(); |
+ var queue3 = queue1.fork(); |
+ var queue4 = queue1.fork(); |
+ var queue5 = queue1.fork(); |
+ |
+ for (var i = 0; i < 4; i++) { |
+ var queue1Fired = false; |
+ var queue2Fired = false; |
+ var queue3Fired = false; |
+ var queue4Fired = false; |
+ var queue5Fired = false; |
+ |
+ queue5.next.then(expectAsync((_) { |
+ queue5Fired = true; |
+ expect(queue1Fired, isTrue); |
+ expect(queue2Fired, isTrue); |
+ expect(queue3Fired, isTrue); |
+ expect(queue4Fired, isTrue); |
+ })); |
+ |
+ queue1.next.then(expectAsync((_) { |
+ queue1Fired = true; |
+ expect(queue2Fired, isFalse); |
+ expect(queue3Fired, isFalse); |
+ expect(queue4Fired, isFalse); |
+ expect(queue5Fired, isFalse); |
+ })); |
+ |
+ queue4.next.then(expectAsync((_) { |
+ queue4Fired = true; |
+ expect(queue1Fired, isTrue); |
+ expect(queue2Fired, isTrue); |
+ expect(queue3Fired, isTrue); |
+ expect(queue5Fired, isFalse); |
+ })); |
+ |
+ queue2.next.then(expectAsync((_) { |
+ queue2Fired = true; |
+ expect(queue1Fired, isTrue); |
+ expect(queue3Fired, isFalse); |
+ expect(queue4Fired, isFalse); |
+ expect(queue5Fired, isFalse); |
+ })); |
+ |
+ queue3.next.then(expectAsync((_) { |
+ queue3Fired = true; |
+ expect(queue1Fired, isTrue); |
+ expect(queue2Fired, isTrue); |
+ expect(queue4Fired, isFalse); |
+ expect(queue5Fired, isFalse); |
+ })); |
+ } |
+ }); |
+ |
+ test("forks can be created from forks", () async { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ |
+ var queue2 = queue1.fork(); |
+ expect(await queue1.next, 1); |
+ expect(await queue2.next, 1); |
+ |
+ var queue3 = queue2.fork(); |
+ expect(await queue1.next, 2); |
+ expect(await queue2.next, 2); |
+ expect(await queue3.next, 2); |
+ |
+ var queue4 = queue3.fork(); |
+ expect(await queue1.next, 3); |
+ expect(await queue2.next, 3); |
+ expect(await queue3.next, 3); |
+ expect(await queue4.next, 3); |
+ |
+ var queue5 = queue4.fork(); |
+ expect(await queue1.next, 4); |
+ expect(await queue2.next, 4); |
+ expect(await queue3.next, 4); |
+ expect(await queue4.next, 4); |
+ expect(await queue5.next, 4); |
+ |
+ var queue6 = queue5.fork(); |
+ expect(await queue1.hasNext, isFalse); |
+ expect(await queue2.hasNext, isFalse); |
+ expect(await queue3.hasNext, isFalse); |
+ expect(await queue4.hasNext, isFalse); |
+ expect(await queue5.hasNext, isFalse); |
+ expect(await queue6.hasNext, isFalse); |
+ }); |
+ |
+ group("canceling:", () { |
+ test("cancelling a fork doesn't cancel its source", () async { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ var queue2 = queue1.fork(); |
+ |
+ queue2.cancel(); |
+ expect(() => queue2.next, throwsStateError); |
+ |
+ expect(await queue1.next, 1); |
+ expect(await queue1.next, 2); |
+ expect(await queue1.next, 3); |
+ expect(await queue1.next, 4); |
+ expect(await queue1.hasNext, isFalse); |
+ }); |
+ |
+ test("cancelling a source doesn't cancel its unmaterialized fork", |
+ () async { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ var queue2 = queue1.fork(); |
+ |
+ queue1.cancel(); |
+ expect(() => queue1.next, throwsStateError); |
+ |
+ expect(await queue2.next, 1); |
+ expect(await queue2.next, 2); |
+ expect(await queue2.next, 3); |
+ expect(await queue2.next, 4); |
+ expect(await queue2.hasNext, isFalse); |
+ }); |
+ |
+ test("cancelling a source doesn't cancel its materialized fork", |
+ () async { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ var queue2 = queue1.fork(); |
+ |
+ expect(await queue1.next, 1); |
+ |
+ queue1.cancel(); |
+ expect(() => queue1.next, throwsStateError); |
+ |
+ expect(await queue2.next, 1); |
+ expect(await queue2.next, 2); |
+ expect(await queue2.next, 3); |
+ expect(await queue2.next, 4); |
+ expect(await queue2.hasNext, isFalse); |
+ }); |
+ |
+ test("the underlying stream is only canceled once all forks are canceled", |
+ () async { |
+ var controller = new StreamController(); |
+ var queue1 = new StreamQueue<int>(controller.stream); |
+ var queue2 = queue1.fork(); |
+ |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isFalse); |
+ |
+ expect(queue1.next, completion(1)); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ queue2.cancel(); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ controller.add(1); |
+ queue1.cancel(); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isFalse); |
+ }); |
+ |
+ group("with immediate,", () { |
+ test("cancelling a fork doesn't cancel its source", () async { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ var queue2 = queue1.fork(); |
+ |
+ queue2.cancel(immediate: true); |
+ expect(() => queue2.next, throwsStateError); |
+ |
+ expect(await queue1.next, 1); |
+ expect(await queue1.next, 2); |
+ expect(await queue1.next, 3); |
+ expect(await queue1.next, 4); |
+ expect(await queue1.hasNext, isFalse); |
+ }); |
+ |
+ test("cancelling a source doesn't cancel its unmaterialized fork", |
+ () async { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ var queue2 = queue1.fork(); |
+ |
+ queue1.cancel(immediate: true); |
+ expect(() => queue1.next, throwsStateError); |
+ |
+ expect(await queue2.next, 1); |
+ expect(await queue2.next, 2); |
+ expect(await queue2.next, 3); |
+ expect(await queue2.next, 4); |
+ expect(await queue2.hasNext, isFalse); |
+ }); |
+ |
+ test("cancelling a source doesn't cancel its materialized fork", |
+ () async { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ var queue2 = queue1.fork(); |
+ |
+ expect(await queue1.next, 1); |
+ |
+ queue1.cancel(immediate: true); |
+ expect(() => queue1.next, throwsStateError); |
+ |
+ expect(await queue2.next, 1); |
+ expect(await queue2.next, 2); |
+ expect(await queue2.next, 3); |
+ expect(await queue2.next, 4); |
+ expect(await queue2.hasNext, isFalse); |
+ }); |
+ |
+ test("the underlying stream is only canceled once all forks are " |
+ "canceled", () async { |
+ var controller = new StreamController(); |
+ var queue1 = new StreamQueue<int>(controller.stream); |
+ var queue2 = queue1.fork(); |
+ |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isFalse); |
+ |
+ expect(queue1.next, throwsStateError); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ queue2.cancel(immediate: true); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ |
+ queue1.cancel(immediate: true); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isFalse); |
+ }); |
+ }); |
+ }); |
+ |
+ group("pausing:", () { |
+ test("the underlying stream is only implicitly paused when no forks are " |
+ "awaiting input", () async { |
+ var controller = new StreamController(); |
+ var queue1 = new StreamQueue<int>(controller.stream); |
+ var queue2 = queue1.fork(); |
+ |
+ controller.add(1); |
+ expect(await queue1.next, 1); |
+ expect(controller.hasListener, isTrue); |
+ expect(controller.isPaused, isTrue); |
+ |
+ expect(queue1.next, completion(2)); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isFalse); |
+ |
+ controller.add(2); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ expect(queue2.next, completion(1)); |
+ expect(queue2.next, completion(2)); |
+ expect(queue2.next, completion(3)); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isFalse); |
+ |
+ controller.add(3); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isTrue); |
+ }); |
+ |
+ test("pausing a fork doesn't pause its source", () async { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ var queue2 = queue1.fork(); |
+ |
+ queue2.rest.listen(expectAsync((_) {}, count: 0)).pause(); |
+ |
+ expect(await queue1.next, 1); |
+ expect(await queue1.next, 2); |
+ expect(await queue1.next, 3); |
+ expect(await queue1.next, 4); |
+ expect(await queue1.hasNext, isFalse); |
+ }); |
+ |
+ test("pausing a source doesn't pause its fork", () async { |
+ var queue1 = new StreamQueue<int>(createStream()); |
+ var queue2 = queue1.fork(); |
+ |
+ queue1.rest.listen(expectAsync((_) {}, count: 0)).pause(); |
+ |
+ expect(await queue2.next, 1); |
+ expect(await queue2.next, 2); |
+ expect(await queue2.next, 3); |
+ expect(await queue2.next, 4); |
+ expect(await queue2.hasNext, isFalse); |
+ }); |
+ |
+ test("the underlying stream is only paused when all forks are paused", |
+ () async { |
+ var controller = new StreamController(); |
+ var queue1 = new StreamQueue<int>(controller.stream); |
+ var queue2 = queue1.fork(); |
+ |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isFalse); |
+ |
+ var sub1 = queue1.rest.listen(null); |
+ await flushMicrotasks(); |
+ expect(controller.hasListener, isTrue); |
+ expect(controller.isPaused, isFalse); |
+ |
+ sub1.pause(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ expect(queue2.next, completion(1)); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isFalse); |
+ |
+ controller.add(1); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ var sub2 = queue2.rest.listen(null); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isFalse); |
+ |
+ sub2.pause(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isTrue); |
+ |
+ sub1.resume(); |
+ await flushMicrotasks(); |
+ expect(controller.isPaused, isFalse); |
+ }); |
+ }); |
+ }); |
+ |
test("all combinations sequential skip/next/take operations", () async { |
// Takes all combinations of two of next, skip and take, then ends with |
// doing rest. Each of the first rounds do 10 events of each type, |