| Index: test/stream_queue_test.dart
|
| diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
|
| index cad0ad511669f3f29ca9f44e4e639b31f57a35bd..3768bffce6e2beeeaef27953a368593b257afdbb 100644
|
| --- a/test/stream_queue_test.dart
|
| +++ b/test/stream_queue_test.dart
|
| @@ -344,6 +344,14 @@ main() {
|
| expect(() => events.cancel(), throwsStateError);
|
| });
|
|
|
| + test("cancels underlying subscription when called before any event",
|
| + () async {
|
| + var cancelFuture = new Future.value(42);
|
| + var controller = new StreamController(onCancel: () => cancelFuture);
|
| + var events = new StreamQueue<int>(controller.stream);
|
| + expect(await events.cancel(), 42);
|
| + });
|
| +
|
| test("cancels underlying subscription, returns result", () async {
|
| var cancelFuture = new Future.value(42);
|
| var controller = new StreamController(onCancel: () => cancelFuture);
|
| @@ -353,7 +361,7 @@ main() {
|
| expect(await events.cancel(), 42);
|
| });
|
|
|
| - group("with immediate: true", () async {
|
| + group("with immediate: true", () {
|
| test("closes the events, prevents any other operation", () async {
|
| var events = new StreamQueue<int>(createStream());
|
| await events.cancel(immediate: true);
|
| @@ -376,6 +384,15 @@ main() {
|
| await expect(controller.hasListener, isFalse);
|
| });
|
|
|
| + test("cancels the underlying subscription when called before any event",
|
| + () async {
|
| + var cancelFuture = new Future.value(42);
|
| + var controller = new StreamController(onCancel: () => cancelFuture);
|
| +
|
| + var events = new StreamQueue<int>(controller.stream);
|
| + expect(await events.cancel(immediate: true), 42);
|
| + });
|
| +
|
| test("closes pending requests", () async {
|
| var events = new StreamQueue<int>(createStream());
|
| expect(await events.next, 1);
|
| @@ -612,6 +629,434 @@ main() {
|
| });
|
| });
|
|
|
| + group("fork operation", () {
|
| + test("produces a stream queue with the same events", () async {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| + var queue2 = queue1.fork();
|
| +
|
| + expect(await queue1.next, 1);
|
| + expect(await queue1.next, 2);
|
| + expect(await queue1.next, 3);
|
| + expect(await queue1.next, 4);
|
| + expect(await queue1.hasNext, isFalse);
|
| +
|
| + expect(await queue2.next, 1);
|
| + expect(await queue2.next, 2);
|
| + expect(await queue2.next, 3);
|
| + expect(await queue2.next, 4);
|
| + expect(await queue2.hasNext, isFalse);
|
| + });
|
| +
|
| + test("produces a stream queue with the same errors", () async {
|
| + var queue1 = new StreamQueue<int>(createErrorStream());
|
| + var queue2 = queue1.fork();
|
| +
|
| + expect(await queue1.next, 1);
|
| + expect(await queue1.next, 2);
|
| + expect(queue1.next, throwsA("To err is divine!"));
|
| + expect(await queue1.next, 4);
|
| + expect(await queue1.hasNext, isFalse);
|
| +
|
| + expect(await queue2.next, 1);
|
| + expect(await queue2.next, 2);
|
| + expect(queue2.next, throwsA("To err is divine!"));
|
| + expect(await queue2.next, 4);
|
| + expect(await queue2.hasNext, isFalse);
|
| + });
|
| +
|
| + test("forks at the current point in the source queue", () {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| +
|
| + expect(queue1.next, completion(1));
|
| + expect(queue1.next, completion(2));
|
| +
|
| + var queue2 = queue1.fork();
|
| +
|
| + expect(queue1.next, completion(3));
|
| + expect(queue1.next, completion(4));
|
| + expect(queue1.hasNext, completion(isFalse));
|
| +
|
| + expect(queue2.next, completion(3));
|
| + expect(queue2.next, completion(4));
|
| + expect(queue2.hasNext, completion(isFalse));
|
| + });
|
| +
|
| + test("can be created after there are pending values", () async {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| + await flushMicrotasks();
|
| +
|
| + var queue2 = queue1.fork();
|
| + expect(await queue2.next, 1);
|
| + expect(await queue2.next, 2);
|
| + expect(await queue2.next, 3);
|
| + expect(await queue2.next, 4);
|
| + expect(await queue2.hasNext, isFalse);
|
| + });
|
| +
|
| + test("multiple forks can be created at different points", () async {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| +
|
| + var queue2 = queue1.fork();
|
| + expect(await queue1.next, 1);
|
| + expect(await queue2.next, 1);
|
| +
|
| + var queue3 = queue1.fork();
|
| + expect(await queue1.next, 2);
|
| + expect(await queue2.next, 2);
|
| + expect(await queue3.next, 2);
|
| +
|
| + var queue4 = queue1.fork();
|
| + expect(await queue1.next, 3);
|
| + expect(await queue2.next, 3);
|
| + expect(await queue3.next, 3);
|
| + expect(await queue4.next, 3);
|
| +
|
| + var queue5 = queue1.fork();
|
| + expect(await queue1.next, 4);
|
| + expect(await queue2.next, 4);
|
| + expect(await queue3.next, 4);
|
| + expect(await queue4.next, 4);
|
| + expect(await queue5.next, 4);
|
| +
|
| + var queue6 = queue1.fork();
|
| + expect(await queue1.hasNext, isFalse);
|
| + expect(await queue2.hasNext, isFalse);
|
| + expect(await queue3.hasNext, isFalse);
|
| + expect(await queue4.hasNext, isFalse);
|
| + expect(await queue5.hasNext, isFalse);
|
| + expect(await queue6.hasNext, isFalse);
|
| + });
|
| +
|
| + test("same-level forks receive data in the order they were created",
|
| + () async {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| + var queue2 = queue1.fork();
|
| + var queue3 = queue1.fork();
|
| + var queue4 = queue1.fork();
|
| + var queue5 = queue1.fork();
|
| +
|
| + for (var i = 0; i < 4; i++) {
|
| + var queue1Fired = false;
|
| + var queue2Fired = false;
|
| + var queue3Fired = false;
|
| + var queue4Fired = false;
|
| + var queue5Fired = false;
|
| +
|
| + queue5.next.then(expectAsync((_) {
|
| + queue5Fired = true;
|
| + expect(queue1Fired, isTrue);
|
| + expect(queue2Fired, isTrue);
|
| + expect(queue3Fired, isTrue);
|
| + expect(queue4Fired, isTrue);
|
| + }));
|
| +
|
| + queue1.next.then(expectAsync((_) {
|
| + queue1Fired = true;
|
| + expect(queue2Fired, isFalse);
|
| + expect(queue3Fired, isFalse);
|
| + expect(queue4Fired, isFalse);
|
| + expect(queue5Fired, isFalse);
|
| + }));
|
| +
|
| + queue4.next.then(expectAsync((_) {
|
| + queue4Fired = true;
|
| + expect(queue1Fired, isTrue);
|
| + expect(queue2Fired, isTrue);
|
| + expect(queue3Fired, isTrue);
|
| + expect(queue5Fired, isFalse);
|
| + }));
|
| +
|
| + queue2.next.then(expectAsync((_) {
|
| + queue2Fired = true;
|
| + expect(queue1Fired, isTrue);
|
| + expect(queue3Fired, isFalse);
|
| + expect(queue4Fired, isFalse);
|
| + expect(queue5Fired, isFalse);
|
| + }));
|
| +
|
| + queue3.next.then(expectAsync((_) {
|
| + queue3Fired = true;
|
| + expect(queue1Fired, isTrue);
|
| + expect(queue2Fired, isTrue);
|
| + expect(queue4Fired, isFalse);
|
| + expect(queue5Fired, isFalse);
|
| + }));
|
| + }
|
| + });
|
| +
|
| + test("forks can be created from forks", () async {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| +
|
| + var queue2 = queue1.fork();
|
| + expect(await queue1.next, 1);
|
| + expect(await queue2.next, 1);
|
| +
|
| + var queue3 = queue2.fork();
|
| + expect(await queue1.next, 2);
|
| + expect(await queue2.next, 2);
|
| + expect(await queue3.next, 2);
|
| +
|
| + var queue4 = queue3.fork();
|
| + expect(await queue1.next, 3);
|
| + expect(await queue2.next, 3);
|
| + expect(await queue3.next, 3);
|
| + expect(await queue4.next, 3);
|
| +
|
| + var queue5 = queue4.fork();
|
| + expect(await queue1.next, 4);
|
| + expect(await queue2.next, 4);
|
| + expect(await queue3.next, 4);
|
| + expect(await queue4.next, 4);
|
| + expect(await queue5.next, 4);
|
| +
|
| + var queue6 = queue5.fork();
|
| + expect(await queue1.hasNext, isFalse);
|
| + expect(await queue2.hasNext, isFalse);
|
| + expect(await queue3.hasNext, isFalse);
|
| + expect(await queue4.hasNext, isFalse);
|
| + expect(await queue5.hasNext, isFalse);
|
| + expect(await queue6.hasNext, isFalse);
|
| + });
|
| +
|
| + group("canceling:", () {
|
| + test("cancelling a fork doesn't cancel its source", () async {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| + var queue2 = queue1.fork();
|
| +
|
| + queue2.cancel();
|
| + expect(() => queue2.next, throwsStateError);
|
| +
|
| + expect(await queue1.next, 1);
|
| + expect(await queue1.next, 2);
|
| + expect(await queue1.next, 3);
|
| + expect(await queue1.next, 4);
|
| + expect(await queue1.hasNext, isFalse);
|
| + });
|
| +
|
| + test("cancelling a source doesn't cancel its unmaterialized fork",
|
| + () async {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| + var queue2 = queue1.fork();
|
| +
|
| + queue1.cancel();
|
| + expect(() => queue1.next, throwsStateError);
|
| +
|
| + expect(await queue2.next, 1);
|
| + expect(await queue2.next, 2);
|
| + expect(await queue2.next, 3);
|
| + expect(await queue2.next, 4);
|
| + expect(await queue2.hasNext, isFalse);
|
| + });
|
| +
|
| + test("cancelling a source doesn't cancel its materialized fork",
|
| + () async {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| + var queue2 = queue1.fork();
|
| +
|
| + expect(await queue1.next, 1);
|
| +
|
| + queue1.cancel();
|
| + expect(() => queue1.next, throwsStateError);
|
| +
|
| + expect(await queue2.next, 1);
|
| + expect(await queue2.next, 2);
|
| + expect(await queue2.next, 3);
|
| + expect(await queue2.next, 4);
|
| + expect(await queue2.hasNext, isFalse);
|
| + });
|
| +
|
| + test("the underlying stream is only canceled once all forks are canceled",
|
| + () async {
|
| + var controller = new StreamController();
|
| + var queue1 = new StreamQueue<int>(controller.stream);
|
| + var queue2 = queue1.fork();
|
| +
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isFalse);
|
| +
|
| + expect(queue1.next, completion(1));
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + queue2.cancel();
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + controller.add(1);
|
| + queue1.cancel();
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isFalse);
|
| + });
|
| +
|
| + group("with immediate,", () {
|
| + test("cancelling a fork doesn't cancel its source", () async {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| + var queue2 = queue1.fork();
|
| +
|
| + queue2.cancel(immediate: true);
|
| + expect(() => queue2.next, throwsStateError);
|
| +
|
| + expect(await queue1.next, 1);
|
| + expect(await queue1.next, 2);
|
| + expect(await queue1.next, 3);
|
| + expect(await queue1.next, 4);
|
| + expect(await queue1.hasNext, isFalse);
|
| + });
|
| +
|
| + test("cancelling a source doesn't cancel its unmaterialized fork",
|
| + () async {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| + var queue2 = queue1.fork();
|
| +
|
| + queue1.cancel(immediate: true);
|
| + expect(() => queue1.next, throwsStateError);
|
| +
|
| + expect(await queue2.next, 1);
|
| + expect(await queue2.next, 2);
|
| + expect(await queue2.next, 3);
|
| + expect(await queue2.next, 4);
|
| + expect(await queue2.hasNext, isFalse);
|
| + });
|
| +
|
| + test("cancelling a source doesn't cancel its materialized fork",
|
| + () async {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| + var queue2 = queue1.fork();
|
| +
|
| + expect(await queue1.next, 1);
|
| +
|
| + queue1.cancel(immediate: true);
|
| + expect(() => queue1.next, throwsStateError);
|
| +
|
| + expect(await queue2.next, 1);
|
| + expect(await queue2.next, 2);
|
| + expect(await queue2.next, 3);
|
| + expect(await queue2.next, 4);
|
| + expect(await queue2.hasNext, isFalse);
|
| + });
|
| +
|
| + test("the underlying stream is only canceled once all forks are "
|
| + "canceled", () async {
|
| + var controller = new StreamController();
|
| + var queue1 = new StreamQueue<int>(controller.stream);
|
| + var queue2 = queue1.fork();
|
| +
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isFalse);
|
| +
|
| + expect(queue1.next, throwsStateError);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + queue2.cancel(immediate: true);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| +
|
| + queue1.cancel(immediate: true);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isFalse);
|
| + });
|
| + });
|
| + });
|
| +
|
| + group("pausing:", () {
|
| + test("the underlying stream is only implicitly paused when no forks are "
|
| + "awaiting input", () async {
|
| + var controller = new StreamController();
|
| + var queue1 = new StreamQueue<int>(controller.stream);
|
| + var queue2 = queue1.fork();
|
| +
|
| + controller.add(1);
|
| + expect(await queue1.next, 1);
|
| + expect(controller.hasListener, isTrue);
|
| + expect(controller.isPaused, isTrue);
|
| +
|
| + expect(queue1.next, completion(2));
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isFalse);
|
| +
|
| + controller.add(2);
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isTrue);
|
| +
|
| + expect(queue2.next, completion(1));
|
| + expect(queue2.next, completion(2));
|
| + expect(queue2.next, completion(3));
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isFalse);
|
| +
|
| + controller.add(3);
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isTrue);
|
| + });
|
| +
|
| + test("pausing a fork doesn't pause its source", () async {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| + var queue2 = queue1.fork();
|
| +
|
| + queue2.rest.listen(expectAsync((_) {}, count: 0)).pause();
|
| +
|
| + expect(await queue1.next, 1);
|
| + expect(await queue1.next, 2);
|
| + expect(await queue1.next, 3);
|
| + expect(await queue1.next, 4);
|
| + expect(await queue1.hasNext, isFalse);
|
| + });
|
| +
|
| + test("pausing a source doesn't pause its fork", () async {
|
| + var queue1 = new StreamQueue<int>(createStream());
|
| + var queue2 = queue1.fork();
|
| +
|
| + queue1.rest.listen(expectAsync((_) {}, count: 0)).pause();
|
| +
|
| + expect(await queue2.next, 1);
|
| + expect(await queue2.next, 2);
|
| + expect(await queue2.next, 3);
|
| + expect(await queue2.next, 4);
|
| + expect(await queue2.hasNext, isFalse);
|
| + });
|
| +
|
| + test("the underlying stream is only paused when all forks are paused",
|
| + () async {
|
| + var controller = new StreamController();
|
| + var queue1 = new StreamQueue<int>(controller.stream);
|
| + var queue2 = queue1.fork();
|
| +
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isFalse);
|
| +
|
| + var sub1 = queue1.rest.listen(null);
|
| + await flushMicrotasks();
|
| + expect(controller.hasListener, isTrue);
|
| + expect(controller.isPaused, isFalse);
|
| +
|
| + sub1.pause();
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isTrue);
|
| +
|
| + expect(queue2.next, completion(1));
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isFalse);
|
| +
|
| + controller.add(1);
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isTrue);
|
| +
|
| + var sub2 = queue2.rest.listen(null);
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isFalse);
|
| +
|
| + sub2.pause();
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isTrue);
|
| +
|
| + sub1.resume();
|
| + await flushMicrotasks();
|
| + expect(controller.isPaused, isFalse);
|
| + });
|
| + });
|
| + });
|
| +
|
| test("all combinations sequential skip/next/take operations", () async {
|
| // Takes all combinations of two of next, skip and take, then ends with
|
| // doing rest. Each of the first rounds do 10 events of each type,
|
|
|