Index: test/subscription_stream_test.dart |
diff --git a/test/subscription_stream_test.dart b/test/subscription_stream_test.dart |
index 03f0dddec203723465beb5679edbed44f787adc9..0d1870b2cf52cbbb87d749e62f75b22f5e8419f2 100644 |
--- a/test/subscription_stream_test.dart |
+++ b/test/subscription_stream_test.dart |
@@ -73,8 +73,11 @@ main() { |
for (var sourceCancels in [false, true]) { |
group("${sourceCancels ? "yes" : "no"}:", () { |
var subscriptionStream; |
+ var onCancel; // Completes if source stream is canceled before done. |
setUp(() { |
- var source = createErrorStream(); |
+ var cancelCompleter = new Completer(); |
+ var source = createErrorStream(cancelCompleter); |
+ onCancel = cancelCompleter.future; |
var sourceSubscription = source.listen(null, |
cancelOnError: sourceCancels); |
subscriptionStream = new SubscriptionStream<int>(sourceSubscription); |
@@ -89,9 +92,12 @@ main() { |
cancelOnError: false); |
var expected = [1, 2, "To err is divine!"]; |
if (sourceCancels) { |
- var timeout = done.future.timeout(const Duration(milliseconds: 5), |
- onTimeout: () => true); |
- expect(await timeout, true); |
+ await onCancel; |
+ // And [done] won't complete at all. |
+ bool isDone = false; |
+ done.future.then((_) { isDone = true; }); |
+ await new Future.delayed(const Duration(milliseconds: 5)); |
+ expect(isDone, false); |
} else { |
expected.add(4); |
await done.future; |
@@ -155,20 +161,25 @@ Stream<int> createStream() async* { |
yield 4; |
} |
-Stream<int> createErrorStream() { |
- StreamController controller = new StreamController<int>(); |
- () async { |
- controller.add(1); |
+Stream<int> createErrorStream([Completer onCancel]) async* { |
+ bool canceled = true; |
+ try { |
+ yield 1; |
await flushMicrotasks(); |
- controller.add(2); |
+ yield 2; |
await flushMicrotasks(); |
- controller.addError("To err is divine!"); |
+ yield* new Future.error("To err is divine!").asStream(); |
await flushMicrotasks(); |
- controller.add(4); |
+ yield 4; |
await flushMicrotasks(); |
- controller.close(); |
- }(); |
- return controller.stream; |
+ canceled = false; |
+ } finally { |
+ // Completes before the "done", but should be after all events. |
+ if (canceled && onCancel != null) { |
+ await flushMicrotasks(); |
+ onCancel.complete(); |
+ } |
+ } |
} |
Stream<int> createLongStream() async* { |