| Index: packages/async/test/stream_queue_test.dart
|
| diff --git a/packages/async/test/stream_queue_test.dart b/packages/async/test/stream_queue_test.dart
|
| index 228ba8adc4fb419a7c3b69d47c8494afb0d43005..2e4805b80754569a4615cf08d3109d6c76203456 100644
|
| --- a/packages/async/test/stream_queue_test.dart
|
| +++ b/packages/async/test/stream_queue_test.dart
|
| @@ -4,7 +4,7 @@
|
|
|
| import "dart:async";
|
|
|
| -import "package:async/async.dart" show StreamQueue;
|
| +import "package:async/async.dart";
|
| import "package:test/test.dart";
|
|
|
| import "utils.dart";
|
| @@ -12,7 +12,7 @@ import "utils.dart";
|
| main() {
|
| group("source stream", () {
|
| test("is listened to on first request, paused between requests", () async {
|
| - var controller = new StreamController();
|
| + var controller = new StreamController<int>();
|
| var events = new StreamQueue<int>(controller.stream);
|
| await flushMicrotasks();
|
| expect(controller.hasListener, isFalse);
|
| @@ -42,6 +42,106 @@ main() {
|
| });
|
| });
|
|
|
| + group("eventsDispatched", () {
|
| + test("increments after a next future completes", () async {
|
| + var events = new StreamQueue<int>(createStream());
|
| +
|
| + expect(events.eventsDispatched, equals(0));
|
| + await flushMicrotasks();
|
| + expect(events.eventsDispatched, equals(0));
|
| +
|
| + var next = events.next;
|
| + expect(events.eventsDispatched, equals(0));
|
| +
|
| + await next;
|
| + expect(events.eventsDispatched, equals(1));
|
| +
|
| + await events.next;
|
| + expect(events.eventsDispatched, equals(2));
|
| + });
|
| +
|
| + test("increments multiple times for multi-value requests", () async {
|
| + var events = new StreamQueue<int>(createStream());
|
| + await events.take(3);
|
| + expect(events.eventsDispatched, equals(3));
|
| + });
|
| +
|
| + test("increments multiple times for an accepted transaction", () async {
|
| + var events = new StreamQueue<int>(createStream());
|
| + await events.withTransaction((queue) async {
|
| + await queue.next;
|
| + await queue.next;
|
| + return true;
|
| + });
|
| + expect(events.eventsDispatched, equals(2));
|
| + });
|
| +
|
| + test("doesn't increment for rest requests", () async {
|
| + var events = new StreamQueue<int>(createStream());
|
| + await events.rest.toList();
|
| + expect(events.eventsDispatched, equals(0));
|
| + });
|
| + });
|
| +
|
| + group("lookAhead operation", () {
|
| + test("as simple list of events", () async {
|
| + var events = new StreamQueue<int>(createStream());
|
| + expect(await events.lookAhead(4), [1, 2, 3, 4]);
|
| + expect(await events.next, 1);
|
| + expect(await events.lookAhead(2), [2, 3]);
|
| + expect(await events.take(2), [2, 3]);
|
| + expect(await events.next, 4);
|
| + await events.cancel();
|
| + });
|
| +
|
| + test("of 0 events", () async {
|
| + var events = new StreamQueue<int>(createStream());
|
| + expect(events.lookAhead(0), completion([]));
|
| + expect(events.next, completion(1));
|
| + expect(events.lookAhead(0), completion([]));
|
| + expect(events.next, completion(2));
|
| + expect(events.lookAhead(0), completion([]));
|
| + expect(events.next, completion(3));
|
| + expect(events.lookAhead(0), completion([]));
|
| + expect(events.next, completion(4));
|
| + expect(events.lookAhead(0), completion([]));
|
| + expect(events.lookAhead(5), completion([]));
|
| + expect(events.next, throwsStateError);
|
| + await events.cancel();
|
| + });
|
| +
|
| + test("with bad arguments throws", () async {
|
| + var events = new StreamQueue<int>(createStream());
|
| + expect(() => events.lookAhead(-1), throwsArgumentError);
|
| + expect(await events.next, 1); // Did not consume event.
|
| + expect(() => events.lookAhead(-1), throwsArgumentError);
|
| + expect(await events.next, 2); // Did not consume event.
|
| + await events.cancel();
|
| + });
|
| +
|
| + test("of too many arguments", () async {
|
| + var events = new StreamQueue<int>(createStream());
|
| + expect(await events.lookAhead(6), [1, 2, 3, 4]);
|
| + await events.cancel();
|
| + });
|
| +
|
| + test("too large later", () async {
|
| + var events = new StreamQueue<int>(createStream());
|
| + expect(await events.next, 1);
|
| + expect(await events.next, 2);
|
| + expect(await events.lookAhead(6), [3, 4]);
|
| + await events.cancel();
|
| + });
|
| +
|
| + test("error", () async {
|
| + var events = new StreamQueue<int>(createErrorStream());
|
| + expect(events.lookAhead(4), throwsA("To err is divine!"));
|
| + expect(events.take(4), throwsA("To err is divine!"));
|
| + expect(await events.next, 4);
|
| + await events.cancel();
|
| + });
|
| + });
|
| +
|
| group("next operation", () {
|
| test("simple sequence of requests", () async {
|
| var events = new StreamQueue<int>(createStream());
|
| @@ -53,8 +153,8 @@ main() {
|
|
|
| test("multiple requests at the same time", () async {
|
| var events = new StreamQueue<int>(createStream());
|
| - var result = await Future.wait(
|
| - [events.next, events.next, events.next, events.next]);
|
| + var result = await Future
|
| + .wait([events.next, events.next, events.next, events.next]);
|
| expect(result, [1, 2, 3, 4]);
|
| await events.cancel();
|
| });
|
| @@ -83,9 +183,9 @@ main() {
|
| expect(() => events.skip(-1), throwsArgumentError);
|
| // A non-int throws either a type error or an argument error,
|
| // depending on whether it's checked mode or not.
|
| - expect(await events.next, 1); // Did not consume event.
|
| + expect(await events.next, 1); // Did not consume event.
|
| expect(() => events.skip(-1), throwsArgumentError);
|
| - expect(await events.next, 2); // Did not consume event.
|
| + expect(await events.next, 2); // Did not consume event.
|
| await events.cancel();
|
| });
|
|
|
| @@ -150,15 +250,17 @@ main() {
|
| var skip4 = events.skip(1);
|
| var index = 0;
|
| // Check that futures complete in order.
|
| - sequence(expectedValue, sequenceIndex) => (value) {
|
| - expect(value, expectedValue);
|
| - expect(index, sequenceIndex);
|
| - index++;
|
| - };
|
| - await Future.wait([skip1.then(sequence(0, 0)),
|
| - skip2.then(sequence(0, 1)),
|
| - skip3.then(sequence(1, 2)),
|
| - skip4.then(sequence(1, 3))]);
|
| + Func1Required<int> sequence(expectedValue, sequenceIndex) => (value) {
|
| + expect(value, expectedValue);
|
| + expect(index, sequenceIndex);
|
| + index++;
|
| + };
|
| + await Future.wait([
|
| + skip1.then(sequence(0, 0)),
|
| + skip2.then(sequence(0, 1)),
|
| + skip3.then(sequence(1, 2)),
|
| + skip4.then(sequence(1, 3))
|
| + ]);
|
| await events.cancel();
|
| });
|
| });
|
| @@ -191,9 +293,9 @@ main() {
|
| test("with bad arguments throws", () async {
|
| var events = new StreamQueue<int>(createStream());
|
| expect(() => events.take(-1), throwsArgumentError);
|
| - expect(await events.next, 1); // Did not consume event.
|
| + expect(await events.next, 1); // Did not consume event.
|
| expect(() => events.take(-1), throwsArgumentError);
|
| - expect(await events.next, 2); // Did not consume event.
|
| + expect(await events.next, 2); // Did not consume event.
|
| await events.cancel();
|
| });
|
|
|
| @@ -288,7 +390,7 @@ main() {
|
|
|
| test("forwards to underlying stream", () async {
|
| var cancel = new Completer();
|
| - var controller = new StreamController(onCancel: () => cancel.future);
|
| + var controller = new StreamController<int>(onCancel: () => cancel.future);
|
| var events = new StreamQueue<int>(controller.stream);
|
| expect(controller.hasListener, isFalse);
|
| var next = events.next;
|
| @@ -333,11 +435,46 @@ main() {
|
| });
|
| });
|
|
|
| + group("peek operation", () {
|
| + test("peeks one event", () async {
|
| + var events = new StreamQueue<int>(createStream());
|
| + expect(await events.peek, 1);
|
| + expect(await events.next, 1);
|
| + expect(await events.peek, 2);
|
| + expect(await events.take(2), [2, 3]);
|
| + expect(await events.peek, 4);
|
| + expect(await events.next, 4);
|
| + // Throws at end.
|
| + expect(events.peek, throws);
|
| + await events.cancel();
|
| + });
|
| + test("multiple requests at the same time", () async {
|
| + var events = new StreamQueue<int>(createStream());
|
| + var result = await Future.wait(
|
| + [events.peek, events.peek, events.next, events.peek, events.peek]);
|
| + expect(result, [1, 1, 1, 2, 2]);
|
| + await events.cancel();
|
| + });
|
| + test("sequence of requests with error", () async {
|
| + var events = new StreamQueue<int>(createErrorStream());
|
| + expect(await events.next, 1);
|
| + expect(await events.next, 2);
|
| + expect(events.peek, throwsA("To err is divine!"));
|
| + // Error stays in queue.
|
| + expect(events.peek, throwsA("To err is divine!"));
|
| + expect(events.next, throwsA("To err is divine!"));
|
| + expect(await events.next, 4);
|
| + await events.cancel();
|
| + });
|
| + });
|
| +
|
| group("cancel operation", () {
|
| test("closes the events, prevents any other operation", () async {
|
| var events = new StreamQueue<int>(createStream());
|
| await events.cancel();
|
| + expect(() => events.lookAhead(1), throwsStateError);
|
| expect(() => events.next, throwsStateError);
|
| + expect(() => events.peek, throwsStateError);
|
| expect(() => events.skip(1), throwsStateError);
|
| expect(() => events.take(1), throwsStateError);
|
| expect(() => events.rest, throwsStateError);
|
| @@ -347,14 +484,14 @@ main() {
|
| test("cancels underlying subscription when called before any event",
|
| () async {
|
| var cancelFuture = new Future.value(42);
|
| - var controller = new StreamController(onCancel: () => cancelFuture);
|
| + var controller = new StreamController<int>(onCancel: () => cancelFuture);
|
| var events = new StreamQueue<int>(controller.stream);
|
| expect(await events.cancel(), 42);
|
| });
|
|
|
| test("cancels underlying subscription, returns result", () async {
|
| var cancelFuture = new Future.value(42);
|
| - var controller = new StreamController(onCancel: () => cancelFuture);
|
| + var controller = new StreamController<int>(onCancel: () => cancelFuture);
|
| var events = new StreamQueue<int>(controller.stream);
|
| controller.add(1);
|
| expect(await events.next, 1);
|
| @@ -373,7 +510,7 @@ main() {
|
| });
|
|
|
| test("cancels the underlying subscription immediately", () async {
|
| - var controller = new StreamController();
|
| + var controller = new StreamController<int>();
|
| controller.add(1);
|
|
|
| var events = new StreamQueue<int>(controller.stream);
|
| @@ -387,7 +524,8 @@ main() {
|
| test("cancels the underlying subscription when called before any event",
|
| () async {
|
| var cancelFuture = new Future.value(42);
|
| - var controller = new StreamController(onCancel: () => cancelFuture);
|
| + var controller =
|
| + new StreamController<int>(onCancel: () => cancelFuture);
|
|
|
| var events = new StreamQueue<int>(controller.stream);
|
| expect(await events.cancel(immediate: true), 42);
|
| @@ -404,8 +542,8 @@ main() {
|
|
|
| test("returns the result of closing the underlying subscription",
|
| () async {
|
| - var controller = new StreamController(
|
| - onCancel: () => new Future.value(42));
|
| + var controller =
|
| + new StreamController<int>(onCancel: () => new Future.value(42));
|
| var events = new StreamQueue<int>(controller.stream);
|
| expect(await events.cancel(immediate: true), 42);
|
| });
|
| @@ -413,8 +551,8 @@ main() {
|
| test("listens and then cancels a stream that hasn't been listened to yet",
|
| () async {
|
| var wasListened = false;
|
| - var controller = new StreamController(
|
| - onListen: () => wasListened = true);
|
| + var controller =
|
| + new StreamController<int>(onListen: () => wasListened = true);
|
| var events = new StreamQueue<int>(controller.stream);
|
| expect(wasListened, isFalse);
|
| expect(controller.hasListener, isFalse);
|
| @@ -448,7 +586,7 @@ main() {
|
|
|
| test("true when enqueued", () async {
|
| var events = new StreamQueue<int>(createStream());
|
| - var values = [];
|
| + var values = <int>[];
|
| for (int i = 1; i <= 3; i++) {
|
| events.next.then(values.add);
|
| }
|
| @@ -459,7 +597,7 @@ main() {
|
|
|
| test("false when enqueued", () async {
|
| var events = new StreamQueue<int>(createStream());
|
| - var values = [];
|
| + var values = <int>[];
|
| for (int i = 1; i <= 4; i++) {
|
| events.next.then(values.add);
|
| }
|
| @@ -469,11 +607,13 @@ main() {
|
| });
|
|
|
| test("true when data event", () async {
|
| - var controller = new StreamController();
|
| + var controller = new StreamController<int>();
|
| var events = new StreamQueue<int>(controller.stream);
|
|
|
| var hasNext;
|
| - events.hasNext.then((result) { hasNext = result; });
|
| + events.hasNext.then((result) {
|
| + hasNext = result;
|
| + });
|
| await flushMicrotasks();
|
| expect(hasNext, isNull);
|
| controller.add(42);
|
| @@ -483,11 +623,13 @@ main() {
|
| });
|
|
|
| test("true when error event", () async {
|
| - var controller = new StreamController();
|
| + var controller = new StreamController<int>();
|
| var events = new StreamQueue<int>(controller.stream);
|
|
|
| var hasNext;
|
| - events.hasNext.then((result) { hasNext = result; });
|
| + events.hasNext.then((result) {
|
| + hasNext = result;
|
| + });
|
| await flushMicrotasks();
|
| expect(hasNext, isNull);
|
| controller.addError("BAD");
|
| @@ -525,7 +667,7 @@ main() {
|
|
|
| test("- next after true, enqueued", () async {
|
| var events = new StreamQueue<int>(createStream());
|
| - var responses = [];
|
| + var responses = <Object>[];
|
| events.next.then(responses.add);
|
| events.hasNext.then(responses.add);
|
| events.next.then(responses.add);
|
| @@ -629,6 +771,313 @@ main() {
|
| });
|
| });
|
|
|
| + group("startTransaction operation produces a transaction that", () {
|
| + StreamQueue<int> events;
|
| + StreamQueueTransaction<int> transaction;
|
| + StreamQueue<int> queue1;
|
| + StreamQueue<int> queue2;
|
| + setUp(() async {
|
| + events = new StreamQueue(createStream());
|
| + expect(await events.next, 1);
|
| + transaction = events.startTransaction();
|
| + queue1 = transaction.newQueue();
|
| + queue2 = transaction.newQueue();
|
| + });
|
| +
|
| + group("emits queues that", () {
|
| + test("independently emit events", () async {
|
| + expect(await queue1.next, 2);
|
| + expect(await queue2.next, 2);
|
| + expect(await queue2.next, 3);
|
| + expect(await queue1.next, 3);
|
| + expect(await queue1.next, 4);
|
| + expect(await queue2.next, 4);
|
| + expect(await queue1.hasNext, isFalse);
|
| + expect(await queue2.hasNext, isFalse);
|
| + });
|
| +
|
| + test("queue requests for events", () async {
|
| + expect(queue1.next, completion(2));
|
| + expect(queue2.next, completion(2));
|
| + expect(queue2.next, completion(3));
|
| + expect(queue1.next, completion(3));
|
| + expect(queue1.next, completion(4));
|
| + expect(queue2.next, completion(4));
|
| + expect(queue1.hasNext, completion(isFalse));
|
| + expect(queue2.hasNext, completion(isFalse));
|
| + });
|
| +
|
| + test("independently emit errors", () async {
|
| + events = new StreamQueue(createErrorStream());
|
| + expect(await events.next, 1);
|
| + transaction = events.startTransaction();
|
| + queue1 = transaction.newQueue();
|
| + queue2 = transaction.newQueue();
|
| +
|
| + expect(queue1.next, completion(2));
|
| + expect(queue2.next, completion(2));
|
| + expect(queue2.next, throwsA("To err is divine!"));
|
| + expect(queue1.next, throwsA("To err is divine!"));
|
| + expect(queue1.next, completion(4));
|
| + expect(queue2.next, completion(4));
|
| + expect(queue1.hasNext, completion(isFalse));
|
| + expect(queue2.hasNext, completion(isFalse));
|
| + });
|
| + });
|
| +
|
| + group("when rejected", () {
|
| + test("further original requests use the previous state", () async {
|
| + expect(await queue1.next, 2);
|
| + expect(await queue2.next, 2);
|
| + expect(await queue2.next, 3);
|
| +
|
| + await flushMicrotasks();
|
| + transaction.reject();
|
| +
|
| + expect(await events.next, 2);
|
| + expect(await events.next, 3);
|
| + expect(await events.next, 4);
|
| + expect(await events.hasNext, isFalse);
|
| + });
|
| +
|
| + test("pending original requests use the previous state", () async {
|
| + expect(await queue1.next, 2);
|
| + expect(await queue2.next, 2);
|
| + expect(await queue2.next, 3);
|
| + expect(events.next, completion(2));
|
| + expect(events.next, completion(3));
|
| + expect(events.next, completion(4));
|
| + expect(events.hasNext, completion(isFalse));
|
| +
|
| + await flushMicrotasks();
|
| + transaction.reject();
|
| + });
|
| +
|
| + test("further child requests act as though the stream was closed",
|
| + () async {
|
| + expect(await queue1.next, 2);
|
| + transaction.reject();
|
| +
|
| + expect(await queue1.hasNext, isFalse);
|
| + expect(queue1.next, throwsStateError);
|
| + });
|
| +
|
| + test("pending child requests act as though the stream was closed",
|
| + () async {
|
| + expect(await queue1.next, 2);
|
| + expect(queue1.hasNext, completion(isFalse));
|
| + expect(queue1.next, throwsStateError);
|
| + transaction.reject();
|
| + });
|
| +
|
| + // Regression test.
|
| + test("pending child rest requests emit no more events", () async {
|
| + var controller = new StreamController();
|
| + var events = new StreamQueue(controller.stream);
|
| + var transaction = events.startTransaction();
|
| + var queue = transaction.newQueue();
|
| +
|
| + // This should emit no more events after the transaction is rejected.
|
| + queue.rest.listen(expectAsync1((_) {}, count: 3),
|
| + onDone: expectAsync0(() {}, count: 0));
|
| +
|
| + controller.add(1);
|
| + controller.add(2);
|
| + controller.add(3);
|
| + await flushMicrotasks();
|
| +
|
| + transaction.reject();
|
| + await flushMicrotasks();
|
| +
|
| + // These shouldn't affect the result of `queue.rest.toList()`.
|
| + controller.add(4);
|
| + controller.add(5);
|
| + });
|
| +
|
| + test("child requests' cancel() may still be called explicitly", () async {
|
| + transaction.reject();
|
| + await queue1.cancel();
|
| + });
|
| +
|
| + test("calls to commit() or reject() fail", () async {
|
| + transaction.reject();
|
| + expect(transaction.reject, throwsStateError);
|
| + expect(() => transaction.commit(queue1), throwsStateError);
|
| + });
|
| + });
|
| +
|
| + group("when committed,", () {
|
| + test("further original requests use the committed state", () async {
|
| + expect(await queue1.next, 2);
|
| + await flushMicrotasks();
|
| + transaction.commit(queue1);
|
| + expect(await events.next, 3);
|
| + });
|
| +
|
| + test("pending original requests use the committed state", () async {
|
| + expect(await queue1.next, 2);
|
| + expect(events.next, completion(3));
|
| + await flushMicrotasks();
|
| + transaction.commit(queue1);
|
| + });
|
| +
|
| + test("further child requests act as though the stream was closed",
|
| + () async {
|
| + expect(await queue2.next, 2);
|
| + transaction.commit(queue2);
|
| +
|
| + expect(await queue1.hasNext, isFalse);
|
| + expect(queue1.next, throwsStateError);
|
| + });
|
| +
|
| + test("pending child requests act as though the stream was closed",
|
| + () async {
|
| + expect(await queue2.next, 2);
|
| + expect(queue1.hasNext, completion(isFalse));
|
| + expect(queue1.next, throwsStateError);
|
| + transaction.commit(queue2);
|
| + });
|
| +
|
| + test("further requests act as though the stream was closed", () async {
|
| + expect(await queue1.next, 2);
|
| + transaction.commit(queue1);
|
| +
|
| + expect(await queue1.hasNext, isFalse);
|
| + expect(queue1.next, throwsStateError);
|
| + });
|
| +
|
| + test("cancel() may still be called explicitly", () async {
|
| + expect(await queue1.next, 2);
|
| + transaction.commit(queue1);
|
| + await queue1.cancel();
|
| + });
|
| +
|
| + test("throws if there are pending requests", () async {
|
| + expect(await queue1.next, 2);
|
| + expect(queue1.hasNext, completion(isTrue));
|
| + expect(() => transaction.commit(queue1), throwsStateError);
|
| + });
|
| +
|
| + test("calls to commit() or reject() fail", () async {
|
| + transaction.commit(queue1);
|
| + expect(transaction.reject, throwsStateError);
|
| + expect(() => transaction.commit(queue1), throwsStateError);
|
| + });
|
| + });
|
| + });
|
| +
|
| + group("withTransaction operation", () {
|
| + StreamQueue<int> events;
|
| + setUp(() async {
|
| + events = new StreamQueue(createStream());
|
| + expect(await events.next, 1);
|
| + });
|
| +
|
| + test("passes a copy of the parent queue", () async {
|
| + await events.withTransaction(expectAsync1((queue) async {
|
| + expect(await queue.next, 2);
|
| + expect(await queue.next, 3);
|
| + expect(await queue.next, 4);
|
| + expect(await queue.hasNext, isFalse);
|
| + return true;
|
| + }));
|
| + });
|
| +
|
| + test(
|
| + "the parent queue continues from the child position if it returns "
|
| + "true", () async {
|
| + await events.withTransaction(expectAsync1((queue) async {
|
| + expect(await queue.next, 2);
|
| + return true;
|
| + }));
|
| +
|
| + expect(await events.next, 3);
|
| + });
|
| +
|
| + test(
|
| + "the parent queue continues from its original position if it returns "
|
| + "false", () async {
|
| + await events.withTransaction(expectAsync1((queue) async {
|
| + expect(await queue.next, 2);
|
| + return false;
|
| + }));
|
| +
|
| + expect(await events.next, 2);
|
| + });
|
| +
|
| + test("the parent queue continues from the child position if it throws", () {
|
| + expect(events.withTransaction(expectAsync1((queue) async {
|
| + expect(await queue.next, 2);
|
| + throw "oh no";
|
| + })), throwsA("oh no"));
|
| +
|
| + expect(events.next, completion(3));
|
| + });
|
| +
|
| + test("returns whether the transaction succeeded", () {
|
| + expect(events.withTransaction((_) async => true), completion(isTrue));
|
| + expect(events.withTransaction((_) async => false), completion(isFalse));
|
| + });
|
| + });
|
| +
|
| + group("cancelable operation", () {
|
| + StreamQueue<int> events;
|
| + setUp(() async {
|
| + events = new StreamQueue(createStream());
|
| + expect(await events.next, 1);
|
| + });
|
| +
|
| + test("passes a copy of the parent queue", () async {
|
| + await events.cancelable(expectAsync1((queue) async {
|
| + expect(await queue.next, 2);
|
| + expect(await queue.next, 3);
|
| + expect(await queue.next, 4);
|
| + expect(await queue.hasNext, isFalse);
|
| + })).value;
|
| + });
|
| +
|
| + test("the parent queue continues from the child position by default",
|
| + () async {
|
| + await events.cancelable(expectAsync1((queue) async {
|
| + expect(await queue.next, 2);
|
| + })).value;
|
| +
|
| + expect(await events.next, 3);
|
| + });
|
| +
|
| + test(
|
| + "the parent queue continues from the child position if an error is "
|
| + "thrown", () async {
|
| + expect(
|
| + events.cancelable(expectAsync1((queue) async {
|
| + expect(await queue.next, 2);
|
| + throw "oh no";
|
| + })).value,
|
| + throwsA("oh no"));
|
| +
|
| + expect(events.next, completion(3));
|
| + });
|
| +
|
| + test("the parent queue continues from the original position if canceled",
|
| + () async {
|
| + var operation = events.cancelable(expectAsync1((queue) async {
|
| + expect(await queue.next, 2);
|
| + }));
|
| + operation.cancel();
|
| +
|
| + expect(await events.next, 2);
|
| + });
|
| +
|
| + test("forwards the value from the callback", () async {
|
| + expect(
|
| + await events.cancelable(expectAsync1((queue) async {
|
| + expect(await queue.next, 2);
|
| + return "value";
|
| + })).value,
|
| + "value");
|
| + });
|
| + });
|
| +
|
| test("all combinations sequential skip/next/take operations", () async {
|
| // Takes all combinations of two of next, skip and take, then ends with
|
| // doing rest. Each of the first rounds do 10 events of each type,
|
| @@ -653,8 +1102,9 @@ main() {
|
| // `take(10)`.
|
| takeTest(startIndex) {
|
| expect(events.take(10),
|
| - completion(new List.generate(10, (i) => startIndex + i)));
|
| + completion(new List.generate(10, (i) => startIndex + i)));
|
| }
|
| +
|
| var tests = [nextTest, skipTest, takeTest];
|
|
|
| int counter = 0;
|
| @@ -668,10 +1118,12 @@ main() {
|
| }
|
| // Then expect 20 more events as a `rest` call.
|
| expect(events.rest.toList(),
|
| - completion(new List.generate(20, (i) => counter + i)));
|
| + completion(new List.generate(20, (i) => counter + i)));
|
| });
|
| }
|
|
|
| +typedef T Func1Required<T>(T value);
|
| +
|
| Stream<int> createStream() async* {
|
| yield 1;
|
| await flushMicrotasks();
|
| @@ -683,7 +1135,7 @@ Stream<int> createStream() async* {
|
| }
|
|
|
| Stream<int> createErrorStream() {
|
| - StreamController controller = new StreamController<int>();
|
| + var controller = new StreamController<int>();
|
| () async {
|
| controller.add(1);
|
| await flushMicrotasks();
|
|
|