Index: packages/async/test/stream_queue_test.dart |
diff --git a/packages/async/test/stream_queue_test.dart b/packages/async/test/stream_queue_test.dart |
index 228ba8adc4fb419a7c3b69d47c8494afb0d43005..2e4805b80754569a4615cf08d3109d6c76203456 100644 |
--- a/packages/async/test/stream_queue_test.dart |
+++ b/packages/async/test/stream_queue_test.dart |
@@ -4,7 +4,7 @@ |
import "dart:async"; |
-import "package:async/async.dart" show StreamQueue; |
+import "package:async/async.dart"; |
import "package:test/test.dart"; |
import "utils.dart"; |
@@ -12,7 +12,7 @@ import "utils.dart"; |
main() { |
group("source stream", () { |
test("is listened to on first request, paused between requests", () async { |
- var controller = new StreamController(); |
+ var controller = new StreamController<int>(); |
var events = new StreamQueue<int>(controller.stream); |
await flushMicrotasks(); |
expect(controller.hasListener, isFalse); |
@@ -42,6 +42,106 @@ main() { |
}); |
}); |
+ group("eventsDispatched", () { |
+ test("increments after a next future completes", () async { |
+ var events = new StreamQueue<int>(createStream()); |
+ |
+ expect(events.eventsDispatched, equals(0)); |
+ await flushMicrotasks(); |
+ expect(events.eventsDispatched, equals(0)); |
+ |
+ var next = events.next; |
+ expect(events.eventsDispatched, equals(0)); |
+ |
+ await next; |
+ expect(events.eventsDispatched, equals(1)); |
+ |
+ await events.next; |
+ expect(events.eventsDispatched, equals(2)); |
+ }); |
+ |
+ test("increments multiple times for multi-value requests", () async { |
+ var events = new StreamQueue<int>(createStream()); |
+ await events.take(3); |
+ expect(events.eventsDispatched, equals(3)); |
+ }); |
+ |
+ test("increments multiple times for an accepted transaction", () async { |
+ var events = new StreamQueue<int>(createStream()); |
+ await events.withTransaction((queue) async { |
+ await queue.next; |
+ await queue.next; |
+ return true; |
+ }); |
+ expect(events.eventsDispatched, equals(2)); |
+ }); |
+ |
+ test("doesn't increment for rest requests", () async { |
+ var events = new StreamQueue<int>(createStream()); |
+ await events.rest.toList(); |
+ expect(events.eventsDispatched, equals(0)); |
+ }); |
+ }); |
+ |
+ group("lookAhead operation", () { |
+ test("as simple list of events", () async { |
+ var events = new StreamQueue<int>(createStream()); |
+ expect(await events.lookAhead(4), [1, 2, 3, 4]); |
+ expect(await events.next, 1); |
+ expect(await events.lookAhead(2), [2, 3]); |
+ expect(await events.take(2), [2, 3]); |
+ expect(await events.next, 4); |
+ await events.cancel(); |
+ }); |
+ |
+ test("of 0 events", () async { |
+ var events = new StreamQueue<int>(createStream()); |
+ expect(events.lookAhead(0), completion([])); |
+ expect(events.next, completion(1)); |
+ expect(events.lookAhead(0), completion([])); |
+ expect(events.next, completion(2)); |
+ expect(events.lookAhead(0), completion([])); |
+ expect(events.next, completion(3)); |
+ expect(events.lookAhead(0), completion([])); |
+ expect(events.next, completion(4)); |
+ expect(events.lookAhead(0), completion([])); |
+ expect(events.lookAhead(5), completion([])); |
+ expect(events.next, throwsStateError); |
+ await events.cancel(); |
+ }); |
+ |
+ test("with bad arguments throws", () async { |
+ var events = new StreamQueue<int>(createStream()); |
+ expect(() => events.lookAhead(-1), throwsArgumentError); |
+ expect(await events.next, 1); // Did not consume event. |
+ expect(() => events.lookAhead(-1), throwsArgumentError); |
+ expect(await events.next, 2); // Did not consume event. |
+ await events.cancel(); |
+ }); |
+ |
+ test("of too many arguments", () async { |
+ var events = new StreamQueue<int>(createStream()); |
+ expect(await events.lookAhead(6), [1, 2, 3, 4]); |
+ await events.cancel(); |
+ }); |
+ |
+ test("too large later", () async { |
+ var events = new StreamQueue<int>(createStream()); |
+ expect(await events.next, 1); |
+ expect(await events.next, 2); |
+ expect(await events.lookAhead(6), [3, 4]); |
+ await events.cancel(); |
+ }); |
+ |
+ test("error", () async { |
+ var events = new StreamQueue<int>(createErrorStream()); |
+ expect(events.lookAhead(4), throwsA("To err is divine!")); |
+ expect(events.take(4), throwsA("To err is divine!")); |
+ expect(await events.next, 4); |
+ await events.cancel(); |
+ }); |
+ }); |
+ |
group("next operation", () { |
test("simple sequence of requests", () async { |
var events = new StreamQueue<int>(createStream()); |
@@ -53,8 +153,8 @@ main() { |
test("multiple requests at the same time", () async { |
var events = new StreamQueue<int>(createStream()); |
- var result = await Future.wait( |
- [events.next, events.next, events.next, events.next]); |
+ var result = await Future |
+ .wait([events.next, events.next, events.next, events.next]); |
expect(result, [1, 2, 3, 4]); |
await events.cancel(); |
}); |
@@ -83,9 +183,9 @@ main() { |
expect(() => events.skip(-1), throwsArgumentError); |
// A non-int throws either a type error or an argument error, |
// depending on whether it's checked mode or not. |
- expect(await events.next, 1); // Did not consume event. |
+ expect(await events.next, 1); // Did not consume event. |
expect(() => events.skip(-1), throwsArgumentError); |
- expect(await events.next, 2); // Did not consume event. |
+ expect(await events.next, 2); // Did not consume event. |
await events.cancel(); |
}); |
@@ -150,15 +250,17 @@ main() { |
var skip4 = events.skip(1); |
var index = 0; |
// Check that futures complete in order. |
- sequence(expectedValue, sequenceIndex) => (value) { |
- expect(value, expectedValue); |
- expect(index, sequenceIndex); |
- index++; |
- }; |
- await Future.wait([skip1.then(sequence(0, 0)), |
- skip2.then(sequence(0, 1)), |
- skip3.then(sequence(1, 2)), |
- skip4.then(sequence(1, 3))]); |
+ Func1Required<int> sequence(expectedValue, sequenceIndex) => (value) { |
+ expect(value, expectedValue); |
+ expect(index, sequenceIndex); |
+ index++; |
+ }; |
+ await Future.wait([ |
+ skip1.then(sequence(0, 0)), |
+ skip2.then(sequence(0, 1)), |
+ skip3.then(sequence(1, 2)), |
+ skip4.then(sequence(1, 3)) |
+ ]); |
await events.cancel(); |
}); |
}); |
@@ -191,9 +293,9 @@ main() { |
test("with bad arguments throws", () async { |
var events = new StreamQueue<int>(createStream()); |
expect(() => events.take(-1), throwsArgumentError); |
- expect(await events.next, 1); // Did not consume event. |
+ expect(await events.next, 1); // Did not consume event. |
expect(() => events.take(-1), throwsArgumentError); |
- expect(await events.next, 2); // Did not consume event. |
+ expect(await events.next, 2); // Did not consume event. |
await events.cancel(); |
}); |
@@ -288,7 +390,7 @@ main() { |
test("forwards to underlying stream", () async { |
var cancel = new Completer(); |
- var controller = new StreamController(onCancel: () => cancel.future); |
+ var controller = new StreamController<int>(onCancel: () => cancel.future); |
var events = new StreamQueue<int>(controller.stream); |
expect(controller.hasListener, isFalse); |
var next = events.next; |
@@ -333,11 +435,46 @@ main() { |
}); |
}); |
+ group("peek operation", () { |
+ test("peeks one event", () async { |
+ var events = new StreamQueue<int>(createStream()); |
+ expect(await events.peek, 1); |
+ expect(await events.next, 1); |
+ expect(await events.peek, 2); |
+ expect(await events.take(2), [2, 3]); |
+ expect(await events.peek, 4); |
+ expect(await events.next, 4); |
+ // Throws at end. |
+ expect(events.peek, throws); |
+ await events.cancel(); |
+ }); |
+ test("multiple requests at the same time", () async { |
+ var events = new StreamQueue<int>(createStream()); |
+ var result = await Future.wait( |
+ [events.peek, events.peek, events.next, events.peek, events.peek]); |
+ expect(result, [1, 1, 1, 2, 2]); |
+ await events.cancel(); |
+ }); |
+ test("sequence of requests with error", () async { |
+ var events = new StreamQueue<int>(createErrorStream()); |
+ expect(await events.next, 1); |
+ expect(await events.next, 2); |
+ expect(events.peek, throwsA("To err is divine!")); |
+ // Error stays in queue. |
+ expect(events.peek, throwsA("To err is divine!")); |
+ expect(events.next, throwsA("To err is divine!")); |
+ expect(await events.next, 4); |
+ await events.cancel(); |
+ }); |
+ }); |
+ |
group("cancel operation", () { |
test("closes the events, prevents any other operation", () async { |
var events = new StreamQueue<int>(createStream()); |
await events.cancel(); |
+ expect(() => events.lookAhead(1), throwsStateError); |
expect(() => events.next, throwsStateError); |
+ expect(() => events.peek, throwsStateError); |
expect(() => events.skip(1), throwsStateError); |
expect(() => events.take(1), throwsStateError); |
expect(() => events.rest, throwsStateError); |
@@ -347,14 +484,14 @@ main() { |
test("cancels underlying subscription when called before any event", |
() async { |
var cancelFuture = new Future.value(42); |
- var controller = new StreamController(onCancel: () => cancelFuture); |
+ var controller = new StreamController<int>(onCancel: () => cancelFuture); |
var events = new StreamQueue<int>(controller.stream); |
expect(await events.cancel(), 42); |
}); |
test("cancels underlying subscription, returns result", () async { |
var cancelFuture = new Future.value(42); |
- var controller = new StreamController(onCancel: () => cancelFuture); |
+ var controller = new StreamController<int>(onCancel: () => cancelFuture); |
var events = new StreamQueue<int>(controller.stream); |
controller.add(1); |
expect(await events.next, 1); |
@@ -373,7 +510,7 @@ main() { |
}); |
test("cancels the underlying subscription immediately", () async { |
- var controller = new StreamController(); |
+ var controller = new StreamController<int>(); |
controller.add(1); |
var events = new StreamQueue<int>(controller.stream); |
@@ -387,7 +524,8 @@ main() { |
test("cancels the underlying subscription when called before any event", |
() async { |
var cancelFuture = new Future.value(42); |
- var controller = new StreamController(onCancel: () => cancelFuture); |
+ var controller = |
+ new StreamController<int>(onCancel: () => cancelFuture); |
var events = new StreamQueue<int>(controller.stream); |
expect(await events.cancel(immediate: true), 42); |
@@ -404,8 +542,8 @@ main() { |
test("returns the result of closing the underlying subscription", |
() async { |
- var controller = new StreamController( |
- onCancel: () => new Future.value(42)); |
+ var controller = |
+ new StreamController<int>(onCancel: () => new Future.value(42)); |
var events = new StreamQueue<int>(controller.stream); |
expect(await events.cancel(immediate: true), 42); |
}); |
@@ -413,8 +551,8 @@ main() { |
test("listens and then cancels a stream that hasn't been listened to yet", |
() async { |
var wasListened = false; |
- var controller = new StreamController( |
- onListen: () => wasListened = true); |
+ var controller = |
+ new StreamController<int>(onListen: () => wasListened = true); |
var events = new StreamQueue<int>(controller.stream); |
expect(wasListened, isFalse); |
expect(controller.hasListener, isFalse); |
@@ -448,7 +586,7 @@ main() { |
test("true when enqueued", () async { |
var events = new StreamQueue<int>(createStream()); |
- var values = []; |
+ var values = <int>[]; |
for (int i = 1; i <= 3; i++) { |
events.next.then(values.add); |
} |
@@ -459,7 +597,7 @@ main() { |
test("false when enqueued", () async { |
var events = new StreamQueue<int>(createStream()); |
- var values = []; |
+ var values = <int>[]; |
for (int i = 1; i <= 4; i++) { |
events.next.then(values.add); |
} |
@@ -469,11 +607,13 @@ main() { |
}); |
test("true when data event", () async { |
- var controller = new StreamController(); |
+ var controller = new StreamController<int>(); |
var events = new StreamQueue<int>(controller.stream); |
var hasNext; |
- events.hasNext.then((result) { hasNext = result; }); |
+ events.hasNext.then((result) { |
+ hasNext = result; |
+ }); |
await flushMicrotasks(); |
expect(hasNext, isNull); |
controller.add(42); |
@@ -483,11 +623,13 @@ main() { |
}); |
test("true when error event", () async { |
- var controller = new StreamController(); |
+ var controller = new StreamController<int>(); |
var events = new StreamQueue<int>(controller.stream); |
var hasNext; |
- events.hasNext.then((result) { hasNext = result; }); |
+ events.hasNext.then((result) { |
+ hasNext = result; |
+ }); |
await flushMicrotasks(); |
expect(hasNext, isNull); |
controller.addError("BAD"); |
@@ -525,7 +667,7 @@ main() { |
test("- next after true, enqueued", () async { |
var events = new StreamQueue<int>(createStream()); |
- var responses = []; |
+ var responses = <Object>[]; |
events.next.then(responses.add); |
events.hasNext.then(responses.add); |
events.next.then(responses.add); |
@@ -629,6 +771,313 @@ main() { |
}); |
}); |
+ group("startTransaction operation produces a transaction that", () { |
+ StreamQueue<int> events; |
+ StreamQueueTransaction<int> transaction; |
+ StreamQueue<int> queue1; |
+ StreamQueue<int> queue2; |
+ setUp(() async { |
+ events = new StreamQueue(createStream()); |
+ expect(await events.next, 1); |
+ transaction = events.startTransaction(); |
+ queue1 = transaction.newQueue(); |
+ queue2 = transaction.newQueue(); |
+ }); |
+ |
+ group("emits queues that", () { |
+ test("independently emit events", () async { |
+ expect(await queue1.next, 2); |
+ expect(await queue2.next, 2); |
+ expect(await queue2.next, 3); |
+ expect(await queue1.next, 3); |
+ expect(await queue1.next, 4); |
+ expect(await queue2.next, 4); |
+ expect(await queue1.hasNext, isFalse); |
+ expect(await queue2.hasNext, isFalse); |
+ }); |
+ |
+ test("queue requests for events", () async { |
+ expect(queue1.next, completion(2)); |
+ expect(queue2.next, completion(2)); |
+ expect(queue2.next, completion(3)); |
+ expect(queue1.next, completion(3)); |
+ expect(queue1.next, completion(4)); |
+ expect(queue2.next, completion(4)); |
+ expect(queue1.hasNext, completion(isFalse)); |
+ expect(queue2.hasNext, completion(isFalse)); |
+ }); |
+ |
+ test("independently emit errors", () async { |
+ events = new StreamQueue(createErrorStream()); |
+ expect(await events.next, 1); |
+ transaction = events.startTransaction(); |
+ queue1 = transaction.newQueue(); |
+ queue2 = transaction.newQueue(); |
+ |
+ expect(queue1.next, completion(2)); |
+ expect(queue2.next, completion(2)); |
+ expect(queue2.next, throwsA("To err is divine!")); |
+ expect(queue1.next, throwsA("To err is divine!")); |
+ expect(queue1.next, completion(4)); |
+ expect(queue2.next, completion(4)); |
+ expect(queue1.hasNext, completion(isFalse)); |
+ expect(queue2.hasNext, completion(isFalse)); |
+ }); |
+ }); |
+ |
+ group("when rejected", () { |
+ test("further original requests use the previous state", () async { |
+ expect(await queue1.next, 2); |
+ expect(await queue2.next, 2); |
+ expect(await queue2.next, 3); |
+ |
+ await flushMicrotasks(); |
+ transaction.reject(); |
+ |
+ expect(await events.next, 2); |
+ expect(await events.next, 3); |
+ expect(await events.next, 4); |
+ expect(await events.hasNext, isFalse); |
+ }); |
+ |
+ test("pending original requests use the previous state", () async { |
+ expect(await queue1.next, 2); |
+ expect(await queue2.next, 2); |
+ expect(await queue2.next, 3); |
+ expect(events.next, completion(2)); |
+ expect(events.next, completion(3)); |
+ expect(events.next, completion(4)); |
+ expect(events.hasNext, completion(isFalse)); |
+ |
+ await flushMicrotasks(); |
+ transaction.reject(); |
+ }); |
+ |
+ test("further child requests act as though the stream was closed", |
+ () async { |
+ expect(await queue1.next, 2); |
+ transaction.reject(); |
+ |
+ expect(await queue1.hasNext, isFalse); |
+ expect(queue1.next, throwsStateError); |
+ }); |
+ |
+ test("pending child requests act as though the stream was closed", |
+ () async { |
+ expect(await queue1.next, 2); |
+ expect(queue1.hasNext, completion(isFalse)); |
+ expect(queue1.next, throwsStateError); |
+ transaction.reject(); |
+ }); |
+ |
+ // Regression test. |
+ test("pending child rest requests emit no more events", () async { |
+ var controller = new StreamController(); |
+ var events = new StreamQueue(controller.stream); |
+ var transaction = events.startTransaction(); |
+ var queue = transaction.newQueue(); |
+ |
+ // This should emit no more events after the transaction is rejected. |
+ queue.rest.listen(expectAsync1((_) {}, count: 3), |
+ onDone: expectAsync0(() {}, count: 0)); |
+ |
+ controller.add(1); |
+ controller.add(2); |
+ controller.add(3); |
+ await flushMicrotasks(); |
+ |
+ transaction.reject(); |
+ await flushMicrotasks(); |
+ |
+ // These shouldn't affect the result of `queue.rest.toList()`. |
+ controller.add(4); |
+ controller.add(5); |
+ }); |
+ |
+ test("child requests' cancel() may still be called explicitly", () async { |
+ transaction.reject(); |
+ await queue1.cancel(); |
+ }); |
+ |
+ test("calls to commit() or reject() fail", () async { |
+ transaction.reject(); |
+ expect(transaction.reject, throwsStateError); |
+ expect(() => transaction.commit(queue1), throwsStateError); |
+ }); |
+ }); |
+ |
+ group("when committed,", () { |
+ test("further original requests use the committed state", () async { |
+ expect(await queue1.next, 2); |
+ await flushMicrotasks(); |
+ transaction.commit(queue1); |
+ expect(await events.next, 3); |
+ }); |
+ |
+ test("pending original requests use the committed state", () async { |
+ expect(await queue1.next, 2); |
+ expect(events.next, completion(3)); |
+ await flushMicrotasks(); |
+ transaction.commit(queue1); |
+ }); |
+ |
+ test("further child requests act as though the stream was closed", |
+ () async { |
+ expect(await queue2.next, 2); |
+ transaction.commit(queue2); |
+ |
+ expect(await queue1.hasNext, isFalse); |
+ expect(queue1.next, throwsStateError); |
+ }); |
+ |
+ test("pending child requests act as though the stream was closed", |
+ () async { |
+ expect(await queue2.next, 2); |
+ expect(queue1.hasNext, completion(isFalse)); |
+ expect(queue1.next, throwsStateError); |
+ transaction.commit(queue2); |
+ }); |
+ |
+ test("further requests act as though the stream was closed", () async { |
+ expect(await queue1.next, 2); |
+ transaction.commit(queue1); |
+ |
+ expect(await queue1.hasNext, isFalse); |
+ expect(queue1.next, throwsStateError); |
+ }); |
+ |
+ test("cancel() may still be called explicitly", () async { |
+ expect(await queue1.next, 2); |
+ transaction.commit(queue1); |
+ await queue1.cancel(); |
+ }); |
+ |
+ test("throws if there are pending requests", () async { |
+ expect(await queue1.next, 2); |
+ expect(queue1.hasNext, completion(isTrue)); |
+ expect(() => transaction.commit(queue1), throwsStateError); |
+ }); |
+ |
+ test("calls to commit() or reject() fail", () async { |
+ transaction.commit(queue1); |
+ expect(transaction.reject, throwsStateError); |
+ expect(() => transaction.commit(queue1), throwsStateError); |
+ }); |
+ }); |
+ }); |
+ |
+ group("withTransaction operation", () { |
+ StreamQueue<int> events; |
+ setUp(() async { |
+ events = new StreamQueue(createStream()); |
+ expect(await events.next, 1); |
+ }); |
+ |
+ test("passes a copy of the parent queue", () async { |
+ await events.withTransaction(expectAsync1((queue) async { |
+ expect(await queue.next, 2); |
+ expect(await queue.next, 3); |
+ expect(await queue.next, 4); |
+ expect(await queue.hasNext, isFalse); |
+ return true; |
+ })); |
+ }); |
+ |
+ test( |
+ "the parent queue continues from the child position if it returns " |
+ "true", () async { |
+ await events.withTransaction(expectAsync1((queue) async { |
+ expect(await queue.next, 2); |
+ return true; |
+ })); |
+ |
+ expect(await events.next, 3); |
+ }); |
+ |
+ test( |
+ "the parent queue continues from its original position if it returns " |
+ "false", () async { |
+ await events.withTransaction(expectAsync1((queue) async { |
+ expect(await queue.next, 2); |
+ return false; |
+ })); |
+ |
+ expect(await events.next, 2); |
+ }); |
+ |
+ test("the parent queue continues from the child position if it throws", () { |
+ expect(events.withTransaction(expectAsync1((queue) async { |
+ expect(await queue.next, 2); |
+ throw "oh no"; |
+ })), throwsA("oh no")); |
+ |
+ expect(events.next, completion(3)); |
+ }); |
+ |
+ test("returns whether the transaction succeeded", () { |
+ expect(events.withTransaction((_) async => true), completion(isTrue)); |
+ expect(events.withTransaction((_) async => false), completion(isFalse)); |
+ }); |
+ }); |
+ |
+ group("cancelable operation", () { |
+ StreamQueue<int> events; |
+ setUp(() async { |
+ events = new StreamQueue(createStream()); |
+ expect(await events.next, 1); |
+ }); |
+ |
+ test("passes a copy of the parent queue", () async { |
+ await events.cancelable(expectAsync1((queue) async { |
+ expect(await queue.next, 2); |
+ expect(await queue.next, 3); |
+ expect(await queue.next, 4); |
+ expect(await queue.hasNext, isFalse); |
+ })).value; |
+ }); |
+ |
+ test("the parent queue continues from the child position by default", |
+ () async { |
+ await events.cancelable(expectAsync1((queue) async { |
+ expect(await queue.next, 2); |
+ })).value; |
+ |
+ expect(await events.next, 3); |
+ }); |
+ |
+ test( |
+ "the parent queue continues from the child position if an error is " |
+ "thrown", () async { |
+ expect( |
+ events.cancelable(expectAsync1((queue) async { |
+ expect(await queue.next, 2); |
+ throw "oh no"; |
+ })).value, |
+ throwsA("oh no")); |
+ |
+ expect(events.next, completion(3)); |
+ }); |
+ |
+ test("the parent queue continues from the original position if canceled", |
+ () async { |
+ var operation = events.cancelable(expectAsync1((queue) async { |
+ expect(await queue.next, 2); |
+ })); |
+ operation.cancel(); |
+ |
+ expect(await events.next, 2); |
+ }); |
+ |
+ test("forwards the value from the callback", () async { |
+ expect( |
+ await events.cancelable(expectAsync1((queue) async { |
+ expect(await queue.next, 2); |
+ return "value"; |
+ })).value, |
+ "value"); |
+ }); |
+ }); |
+ |
test("all combinations sequential skip/next/take operations", () async { |
// Takes all combinations of two of next, skip and take, then ends with |
// doing rest. Each of the first rounds do 10 events of each type, |
@@ -653,8 +1102,9 @@ main() { |
// `take(10)`. |
takeTest(startIndex) { |
expect(events.take(10), |
- completion(new List.generate(10, (i) => startIndex + i))); |
+ completion(new List.generate(10, (i) => startIndex + i))); |
} |
+ |
var tests = [nextTest, skipTest, takeTest]; |
int counter = 0; |
@@ -668,10 +1118,12 @@ main() { |
} |
// Then expect 20 more events as a `rest` call. |
expect(events.rest.toList(), |
- completion(new List.generate(20, (i) => counter + i))); |
+ completion(new List.generate(20, (i) => counter + i))); |
}); |
} |
+typedef T Func1Required<T>(T value); |
+ |
Stream<int> createStream() async* { |
yield 1; |
await flushMicrotasks(); |
@@ -683,7 +1135,7 @@ Stream<int> createStream() async* { |
} |
Stream<int> createErrorStream() { |
- StreamController controller = new StreamController<int>(); |
+ var controller = new StreamController<int>(); |
() async { |
controller.add(1); |
await flushMicrotasks(); |