Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(113)

Unified Diff: packages/async/test/stream_queue_test.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « packages/async/test/stream_group_test.dart ('k') | packages/async/test/stream_sink_completer_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: packages/async/test/stream_queue_test.dart
diff --git a/packages/async/test/stream_queue_test.dart b/packages/async/test/stream_queue_test.dart
index 228ba8adc4fb419a7c3b69d47c8494afb0d43005..2e4805b80754569a4615cf08d3109d6c76203456 100644
--- a/packages/async/test/stream_queue_test.dart
+++ b/packages/async/test/stream_queue_test.dart
@@ -4,7 +4,7 @@
import "dart:async";
-import "package:async/async.dart" show StreamQueue;
+import "package:async/async.dart";
import "package:test/test.dart";
import "utils.dart";
@@ -12,7 +12,7 @@ import "utils.dart";
main() {
group("source stream", () {
test("is listened to on first request, paused between requests", () async {
- var controller = new StreamController();
+ var controller = new StreamController<int>();
var events = new StreamQueue<int>(controller.stream);
await flushMicrotasks();
expect(controller.hasListener, isFalse);
@@ -42,6 +42,106 @@ main() {
});
});
+ group("eventsDispatched", () {
+ test("increments after a next future completes", () async {
+ var events = new StreamQueue<int>(createStream());
+
+ expect(events.eventsDispatched, equals(0));
+ await flushMicrotasks();
+ expect(events.eventsDispatched, equals(0));
+
+ var next = events.next;
+ expect(events.eventsDispatched, equals(0));
+
+ await next;
+ expect(events.eventsDispatched, equals(1));
+
+ await events.next;
+ expect(events.eventsDispatched, equals(2));
+ });
+
+ test("increments multiple times for multi-value requests", () async {
+ var events = new StreamQueue<int>(createStream());
+ await events.take(3);
+ expect(events.eventsDispatched, equals(3));
+ });
+
+ test("increments multiple times for an accepted transaction", () async {
+ var events = new StreamQueue<int>(createStream());
+ await events.withTransaction((queue) async {
+ await queue.next;
+ await queue.next;
+ return true;
+ });
+ expect(events.eventsDispatched, equals(2));
+ });
+
+ test("doesn't increment for rest requests", () async {
+ var events = new StreamQueue<int>(createStream());
+ await events.rest.toList();
+ expect(events.eventsDispatched, equals(0));
+ });
+ });
+
+ group("lookAhead operation", () {
+ test("as simple list of events", () async {
+ var events = new StreamQueue<int>(createStream());
+ expect(await events.lookAhead(4), [1, 2, 3, 4]);
+ expect(await events.next, 1);
+ expect(await events.lookAhead(2), [2, 3]);
+ expect(await events.take(2), [2, 3]);
+ expect(await events.next, 4);
+ await events.cancel();
+ });
+
+ test("of 0 events", () async {
+ var events = new StreamQueue<int>(createStream());
+ expect(events.lookAhead(0), completion([]));
+ expect(events.next, completion(1));
+ expect(events.lookAhead(0), completion([]));
+ expect(events.next, completion(2));
+ expect(events.lookAhead(0), completion([]));
+ expect(events.next, completion(3));
+ expect(events.lookAhead(0), completion([]));
+ expect(events.next, completion(4));
+ expect(events.lookAhead(0), completion([]));
+ expect(events.lookAhead(5), completion([]));
+ expect(events.next, throwsStateError);
+ await events.cancel();
+ });
+
+ test("with bad arguments throws", () async {
+ var events = new StreamQueue<int>(createStream());
+ expect(() => events.lookAhead(-1), throwsArgumentError);
+ expect(await events.next, 1); // Did not consume event.
+ expect(() => events.lookAhead(-1), throwsArgumentError);
+ expect(await events.next, 2); // Did not consume event.
+ await events.cancel();
+ });
+
+ test("of too many arguments", () async {
+ var events = new StreamQueue<int>(createStream());
+ expect(await events.lookAhead(6), [1, 2, 3, 4]);
+ await events.cancel();
+ });
+
+ test("too large later", () async {
+ var events = new StreamQueue<int>(createStream());
+ expect(await events.next, 1);
+ expect(await events.next, 2);
+ expect(await events.lookAhead(6), [3, 4]);
+ await events.cancel();
+ });
+
+ test("error", () async {
+ var events = new StreamQueue<int>(createErrorStream());
+ expect(events.lookAhead(4), throwsA("To err is divine!"));
+ expect(events.take(4), throwsA("To err is divine!"));
+ expect(await events.next, 4);
+ await events.cancel();
+ });
+ });
+
group("next operation", () {
test("simple sequence of requests", () async {
var events = new StreamQueue<int>(createStream());
@@ -53,8 +153,8 @@ main() {
test("multiple requests at the same time", () async {
var events = new StreamQueue<int>(createStream());
- var result = await Future.wait(
- [events.next, events.next, events.next, events.next]);
+ var result = await Future
+ .wait([events.next, events.next, events.next, events.next]);
expect(result, [1, 2, 3, 4]);
await events.cancel();
});
@@ -83,9 +183,9 @@ main() {
expect(() => events.skip(-1), throwsArgumentError);
// A non-int throws either a type error or an argument error,
// depending on whether it's checked mode or not.
- expect(await events.next, 1); // Did not consume event.
+ expect(await events.next, 1); // Did not consume event.
expect(() => events.skip(-1), throwsArgumentError);
- expect(await events.next, 2); // Did not consume event.
+ expect(await events.next, 2); // Did not consume event.
await events.cancel();
});
@@ -150,15 +250,17 @@ main() {
var skip4 = events.skip(1);
var index = 0;
// Check that futures complete in order.
- sequence(expectedValue, sequenceIndex) => (value) {
- expect(value, expectedValue);
- expect(index, sequenceIndex);
- index++;
- };
- await Future.wait([skip1.then(sequence(0, 0)),
- skip2.then(sequence(0, 1)),
- skip3.then(sequence(1, 2)),
- skip4.then(sequence(1, 3))]);
+ Func1Required<int> sequence(expectedValue, sequenceIndex) => (value) {
+ expect(value, expectedValue);
+ expect(index, sequenceIndex);
+ index++;
+ };
+ await Future.wait([
+ skip1.then(sequence(0, 0)),
+ skip2.then(sequence(0, 1)),
+ skip3.then(sequence(1, 2)),
+ skip4.then(sequence(1, 3))
+ ]);
await events.cancel();
});
});
@@ -191,9 +293,9 @@ main() {
test("with bad arguments throws", () async {
var events = new StreamQueue<int>(createStream());
expect(() => events.take(-1), throwsArgumentError);
- expect(await events.next, 1); // Did not consume event.
+ expect(await events.next, 1); // Did not consume event.
expect(() => events.take(-1), throwsArgumentError);
- expect(await events.next, 2); // Did not consume event.
+ expect(await events.next, 2); // Did not consume event.
await events.cancel();
});
@@ -288,7 +390,7 @@ main() {
test("forwards to underlying stream", () async {
var cancel = new Completer();
- var controller = new StreamController(onCancel: () => cancel.future);
+ var controller = new StreamController<int>(onCancel: () => cancel.future);
var events = new StreamQueue<int>(controller.stream);
expect(controller.hasListener, isFalse);
var next = events.next;
@@ -333,11 +435,46 @@ main() {
});
});
+ group("peek operation", () {
+ test("peeks one event", () async {
+ var events = new StreamQueue<int>(createStream());
+ expect(await events.peek, 1);
+ expect(await events.next, 1);
+ expect(await events.peek, 2);
+ expect(await events.take(2), [2, 3]);
+ expect(await events.peek, 4);
+ expect(await events.next, 4);
+ // Throws at end.
+ expect(events.peek, throws);
+ await events.cancel();
+ });
+ test("multiple requests at the same time", () async {
+ var events = new StreamQueue<int>(createStream());
+ var result = await Future.wait(
+ [events.peek, events.peek, events.next, events.peek, events.peek]);
+ expect(result, [1, 1, 1, 2, 2]);
+ await events.cancel();
+ });
+ test("sequence of requests with error", () async {
+ var events = new StreamQueue<int>(createErrorStream());
+ expect(await events.next, 1);
+ expect(await events.next, 2);
+ expect(events.peek, throwsA("To err is divine!"));
+ // Error stays in queue.
+ expect(events.peek, throwsA("To err is divine!"));
+ expect(events.next, throwsA("To err is divine!"));
+ expect(await events.next, 4);
+ await events.cancel();
+ });
+ });
+
group("cancel operation", () {
test("closes the events, prevents any other operation", () async {
var events = new StreamQueue<int>(createStream());
await events.cancel();
+ expect(() => events.lookAhead(1), throwsStateError);
expect(() => events.next, throwsStateError);
+ expect(() => events.peek, throwsStateError);
expect(() => events.skip(1), throwsStateError);
expect(() => events.take(1), throwsStateError);
expect(() => events.rest, throwsStateError);
@@ -347,14 +484,14 @@ main() {
test("cancels underlying subscription when called before any event",
() async {
var cancelFuture = new Future.value(42);
- var controller = new StreamController(onCancel: () => cancelFuture);
+ var controller = new StreamController<int>(onCancel: () => cancelFuture);
var events = new StreamQueue<int>(controller.stream);
expect(await events.cancel(), 42);
});
test("cancels underlying subscription, returns result", () async {
var cancelFuture = new Future.value(42);
- var controller = new StreamController(onCancel: () => cancelFuture);
+ var controller = new StreamController<int>(onCancel: () => cancelFuture);
var events = new StreamQueue<int>(controller.stream);
controller.add(1);
expect(await events.next, 1);
@@ -373,7 +510,7 @@ main() {
});
test("cancels the underlying subscription immediately", () async {
- var controller = new StreamController();
+ var controller = new StreamController<int>();
controller.add(1);
var events = new StreamQueue<int>(controller.stream);
@@ -387,7 +524,8 @@ main() {
test("cancels the underlying subscription when called before any event",
() async {
var cancelFuture = new Future.value(42);
- var controller = new StreamController(onCancel: () => cancelFuture);
+ var controller =
+ new StreamController<int>(onCancel: () => cancelFuture);
var events = new StreamQueue<int>(controller.stream);
expect(await events.cancel(immediate: true), 42);
@@ -404,8 +542,8 @@ main() {
test("returns the result of closing the underlying subscription",
() async {
- var controller = new StreamController(
- onCancel: () => new Future.value(42));
+ var controller =
+ new StreamController<int>(onCancel: () => new Future.value(42));
var events = new StreamQueue<int>(controller.stream);
expect(await events.cancel(immediate: true), 42);
});
@@ -413,8 +551,8 @@ main() {
test("listens and then cancels a stream that hasn't been listened to yet",
() async {
var wasListened = false;
- var controller = new StreamController(
- onListen: () => wasListened = true);
+ var controller =
+ new StreamController<int>(onListen: () => wasListened = true);
var events = new StreamQueue<int>(controller.stream);
expect(wasListened, isFalse);
expect(controller.hasListener, isFalse);
@@ -448,7 +586,7 @@ main() {
test("true when enqueued", () async {
var events = new StreamQueue<int>(createStream());
- var values = [];
+ var values = <int>[];
for (int i = 1; i <= 3; i++) {
events.next.then(values.add);
}
@@ -459,7 +597,7 @@ main() {
test("false when enqueued", () async {
var events = new StreamQueue<int>(createStream());
- var values = [];
+ var values = <int>[];
for (int i = 1; i <= 4; i++) {
events.next.then(values.add);
}
@@ -469,11 +607,13 @@ main() {
});
test("true when data event", () async {
- var controller = new StreamController();
+ var controller = new StreamController<int>();
var events = new StreamQueue<int>(controller.stream);
var hasNext;
- events.hasNext.then((result) { hasNext = result; });
+ events.hasNext.then((result) {
+ hasNext = result;
+ });
await flushMicrotasks();
expect(hasNext, isNull);
controller.add(42);
@@ -483,11 +623,13 @@ main() {
});
test("true when error event", () async {
- var controller = new StreamController();
+ var controller = new StreamController<int>();
var events = new StreamQueue<int>(controller.stream);
var hasNext;
- events.hasNext.then((result) { hasNext = result; });
+ events.hasNext.then((result) {
+ hasNext = result;
+ });
await flushMicrotasks();
expect(hasNext, isNull);
controller.addError("BAD");
@@ -525,7 +667,7 @@ main() {
test("- next after true, enqueued", () async {
var events = new StreamQueue<int>(createStream());
- var responses = [];
+ var responses = <Object>[];
events.next.then(responses.add);
events.hasNext.then(responses.add);
events.next.then(responses.add);
@@ -629,6 +771,313 @@ main() {
});
});
+ group("startTransaction operation produces a transaction that", () {
+ StreamQueue<int> events;
+ StreamQueueTransaction<int> transaction;
+ StreamQueue<int> queue1;
+ StreamQueue<int> queue2;
+ setUp(() async {
+ events = new StreamQueue(createStream());
+ expect(await events.next, 1);
+ transaction = events.startTransaction();
+ queue1 = transaction.newQueue();
+ queue2 = transaction.newQueue();
+ });
+
+ group("emits queues that", () {
+ test("independently emit events", () async {
+ expect(await queue1.next, 2);
+ expect(await queue2.next, 2);
+ expect(await queue2.next, 3);
+ expect(await queue1.next, 3);
+ expect(await queue1.next, 4);
+ expect(await queue2.next, 4);
+ expect(await queue1.hasNext, isFalse);
+ expect(await queue2.hasNext, isFalse);
+ });
+
+ test("queue requests for events", () async {
+ expect(queue1.next, completion(2));
+ expect(queue2.next, completion(2));
+ expect(queue2.next, completion(3));
+ expect(queue1.next, completion(3));
+ expect(queue1.next, completion(4));
+ expect(queue2.next, completion(4));
+ expect(queue1.hasNext, completion(isFalse));
+ expect(queue2.hasNext, completion(isFalse));
+ });
+
+ test("independently emit errors", () async {
+ events = new StreamQueue(createErrorStream());
+ expect(await events.next, 1);
+ transaction = events.startTransaction();
+ queue1 = transaction.newQueue();
+ queue2 = transaction.newQueue();
+
+ expect(queue1.next, completion(2));
+ expect(queue2.next, completion(2));
+ expect(queue2.next, throwsA("To err is divine!"));
+ expect(queue1.next, throwsA("To err is divine!"));
+ expect(queue1.next, completion(4));
+ expect(queue2.next, completion(4));
+ expect(queue1.hasNext, completion(isFalse));
+ expect(queue2.hasNext, completion(isFalse));
+ });
+ });
+
+ group("when rejected", () {
+ test("further original requests use the previous state", () async {
+ expect(await queue1.next, 2);
+ expect(await queue2.next, 2);
+ expect(await queue2.next, 3);
+
+ await flushMicrotasks();
+ transaction.reject();
+
+ expect(await events.next, 2);
+ expect(await events.next, 3);
+ expect(await events.next, 4);
+ expect(await events.hasNext, isFalse);
+ });
+
+ test("pending original requests use the previous state", () async {
+ expect(await queue1.next, 2);
+ expect(await queue2.next, 2);
+ expect(await queue2.next, 3);
+ expect(events.next, completion(2));
+ expect(events.next, completion(3));
+ expect(events.next, completion(4));
+ expect(events.hasNext, completion(isFalse));
+
+ await flushMicrotasks();
+ transaction.reject();
+ });
+
+ test("further child requests act as though the stream was closed",
+ () async {
+ expect(await queue1.next, 2);
+ transaction.reject();
+
+ expect(await queue1.hasNext, isFalse);
+ expect(queue1.next, throwsStateError);
+ });
+
+ test("pending child requests act as though the stream was closed",
+ () async {
+ expect(await queue1.next, 2);
+ expect(queue1.hasNext, completion(isFalse));
+ expect(queue1.next, throwsStateError);
+ transaction.reject();
+ });
+
+ // Regression test.
+ test("pending child rest requests emit no more events", () async {
+ var controller = new StreamController();
+ var events = new StreamQueue(controller.stream);
+ var transaction = events.startTransaction();
+ var queue = transaction.newQueue();
+
+ // This should emit no more events after the transaction is rejected.
+ queue.rest.listen(expectAsync1((_) {}, count: 3),
+ onDone: expectAsync0(() {}, count: 0));
+
+ controller.add(1);
+ controller.add(2);
+ controller.add(3);
+ await flushMicrotasks();
+
+ transaction.reject();
+ await flushMicrotasks();
+
+ // These shouldn't affect the result of `queue.rest.toList()`.
+ controller.add(4);
+ controller.add(5);
+ });
+
+ test("child requests' cancel() may still be called explicitly", () async {
+ transaction.reject();
+ await queue1.cancel();
+ });
+
+ test("calls to commit() or reject() fail", () async {
+ transaction.reject();
+ expect(transaction.reject, throwsStateError);
+ expect(() => transaction.commit(queue1), throwsStateError);
+ });
+ });
+
+ group("when committed,", () {
+ test("further original requests use the committed state", () async {
+ expect(await queue1.next, 2);
+ await flushMicrotasks();
+ transaction.commit(queue1);
+ expect(await events.next, 3);
+ });
+
+ test("pending original requests use the committed state", () async {
+ expect(await queue1.next, 2);
+ expect(events.next, completion(3));
+ await flushMicrotasks();
+ transaction.commit(queue1);
+ });
+
+ test("further child requests act as though the stream was closed",
+ () async {
+ expect(await queue2.next, 2);
+ transaction.commit(queue2);
+
+ expect(await queue1.hasNext, isFalse);
+ expect(queue1.next, throwsStateError);
+ });
+
+ test("pending child requests act as though the stream was closed",
+ () async {
+ expect(await queue2.next, 2);
+ expect(queue1.hasNext, completion(isFalse));
+ expect(queue1.next, throwsStateError);
+ transaction.commit(queue2);
+ });
+
+ test("further requests act as though the stream was closed", () async {
+ expect(await queue1.next, 2);
+ transaction.commit(queue1);
+
+ expect(await queue1.hasNext, isFalse);
+ expect(queue1.next, throwsStateError);
+ });
+
+ test("cancel() may still be called explicitly", () async {
+ expect(await queue1.next, 2);
+ transaction.commit(queue1);
+ await queue1.cancel();
+ });
+
+ test("throws if there are pending requests", () async {
+ expect(await queue1.next, 2);
+ expect(queue1.hasNext, completion(isTrue));
+ expect(() => transaction.commit(queue1), throwsStateError);
+ });
+
+ test("calls to commit() or reject() fail", () async {
+ transaction.commit(queue1);
+ expect(transaction.reject, throwsStateError);
+ expect(() => transaction.commit(queue1), throwsStateError);
+ });
+ });
+ });
+
+ group("withTransaction operation", () {
+ StreamQueue<int> events;
+ setUp(() async {
+ events = new StreamQueue(createStream());
+ expect(await events.next, 1);
+ });
+
+ test("passes a copy of the parent queue", () async {
+ await events.withTransaction(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ expect(await queue.next, 3);
+ expect(await queue.next, 4);
+ expect(await queue.hasNext, isFalse);
+ return true;
+ }));
+ });
+
+ test(
+ "the parent queue continues from the child position if it returns "
+ "true", () async {
+ await events.withTransaction(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ return true;
+ }));
+
+ expect(await events.next, 3);
+ });
+
+ test(
+ "the parent queue continues from its original position if it returns "
+ "false", () async {
+ await events.withTransaction(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ return false;
+ }));
+
+ expect(await events.next, 2);
+ });
+
+ test("the parent queue continues from the child position if it throws", () {
+ expect(events.withTransaction(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ throw "oh no";
+ })), throwsA("oh no"));
+
+ expect(events.next, completion(3));
+ });
+
+ test("returns whether the transaction succeeded", () {
+ expect(events.withTransaction((_) async => true), completion(isTrue));
+ expect(events.withTransaction((_) async => false), completion(isFalse));
+ });
+ });
+
+ group("cancelable operation", () {
+ StreamQueue<int> events;
+ setUp(() async {
+ events = new StreamQueue(createStream());
+ expect(await events.next, 1);
+ });
+
+ test("passes a copy of the parent queue", () async {
+ await events.cancelable(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ expect(await queue.next, 3);
+ expect(await queue.next, 4);
+ expect(await queue.hasNext, isFalse);
+ })).value;
+ });
+
+ test("the parent queue continues from the child position by default",
+ () async {
+ await events.cancelable(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ })).value;
+
+ expect(await events.next, 3);
+ });
+
+ test(
+ "the parent queue continues from the child position if an error is "
+ "thrown", () async {
+ expect(
+ events.cancelable(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ throw "oh no";
+ })).value,
+ throwsA("oh no"));
+
+ expect(events.next, completion(3));
+ });
+
+ test("the parent queue continues from the original position if canceled",
+ () async {
+ var operation = events.cancelable(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ }));
+ operation.cancel();
+
+ expect(await events.next, 2);
+ });
+
+ test("forwards the value from the callback", () async {
+ expect(
+ await events.cancelable(expectAsync1((queue) async {
+ expect(await queue.next, 2);
+ return "value";
+ })).value,
+ "value");
+ });
+ });
+
test("all combinations sequential skip/next/take operations", () async {
// Takes all combinations of two of next, skip and take, then ends with
// doing rest. Each of the first rounds do 10 events of each type,
@@ -653,8 +1102,9 @@ main() {
// `take(10)`.
takeTest(startIndex) {
expect(events.take(10),
- completion(new List.generate(10, (i) => startIndex + i)));
+ completion(new List.generate(10, (i) => startIndex + i)));
}
+
var tests = [nextTest, skipTest, takeTest];
int counter = 0;
@@ -668,10 +1118,12 @@ main() {
}
// Then expect 20 more events as a `rest` call.
expect(events.rest.toList(),
- completion(new List.generate(20, (i) => counter + i)));
+ completion(new List.generate(20, (i) => counter + i)));
});
}
+typedef T Func1Required<T>(T value);
+
Stream<int> createStream() async* {
yield 1;
await flushMicrotasks();
@@ -683,7 +1135,7 @@ Stream<int> createStream() async* {
}
Stream<int> createErrorStream() {
- StreamController controller = new StreamController<int>();
+ var controller = new StreamController<int>();
() async {
controller.add(1);
await flushMicrotasks();
« no previous file with comments | « packages/async/test/stream_group_test.dart ('k') | packages/async/test/stream_sink_completer_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698