OLD | NEW |
---|---|
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import "dart:async"; | 5 import "dart:async"; |
6 | 6 |
7 import "package:async/async.dart" show SubscriptionStream; | 7 import "package:async/async.dart" show SubscriptionStream; |
8 import "package:test/test.dart"; | 8 import "package:test/test.dart"; |
9 | 9 |
10 import "utils.dart"; | 10 import "utils.dart"; |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
63 expect(controller.isPaused, isTrue); | 63 expect(controller.isPaused, isTrue); |
64 | 64 |
65 subscription.resume(); | 65 subscription.resume(); |
66 expect(controller.isPaused, isFalse); | 66 expect(controller.isPaused, isFalse); |
67 | 67 |
68 expect(await subscription.cancel(), 42); | 68 expect(await subscription.cancel(), 42); |
69 expect(controller.hasListener, isFalse); | 69 expect(controller.hasListener, isFalse); |
70 }); | 70 }); |
71 | 71 |
72 group("cancelOnError source:", () { | 72 group("cancelOnError source:", () { |
73 for (var sourceCancels in [false, true]) { | 73 for (var sourceCancels in [/*false,*/ true]) { |
floitsch
2015/10/12 13:18:24
debug comment?
Lasse Reichstein Nielsen
2015/10/13 10:38:21
Yes, removed.
| |
74 group("${sourceCancels ? "yes" : "no"}:", () { | 74 group("${sourceCancels ? "yes" : "no"}:", () { |
75 var subscriptionStream; | 75 var subscriptionStream; |
76 var onCancel; // Completes if source stream is canceled before done. | |
76 setUp(() { | 77 setUp(() { |
77 var source = createErrorStream(); | 78 var cancelCompleter = new Completer(); |
79 var source = createErrorStream(cancelCompleter); | |
80 onCancel = cancelCompleter.future; | |
78 var sourceSubscription = source.listen(null, | 81 var sourceSubscription = source.listen(null, |
79 cancelOnError: sourceCancels); | 82 cancelOnError: sourceCancels); |
80 subscriptionStream = new SubscriptionStream<int>(sourceSubscription); | 83 subscriptionStream = new SubscriptionStream<int>(sourceSubscription); |
81 }); | 84 }); |
82 | 85 |
83 test("- subscriptionStream: no", () async { | 86 test("- subscriptionStream: no", () async { |
84 var done = new Completer(); | 87 var done = new Completer(); |
85 var events = []; | 88 var events = []; |
86 subscriptionStream.listen(events.add, | 89 subscriptionStream.listen(events.add, |
87 onError: events.add, | 90 onError: events.add, |
88 onDone: done.complete, | 91 onDone: done.complete, |
89 cancelOnError: false); | 92 cancelOnError: false); |
90 var expected = [1, 2, "To err is divine!"]; | 93 var expected = [1, 2, "To err is divine!"]; |
91 if (sourceCancels) { | 94 if (sourceCancels) { |
92 var timeout = done.future.timeout(const Duration(milliseconds: 5), | 95 await onCancel; |
93 onTimeout: () => true); | 96 // And [done] won't complete at all. |
94 expect(await timeout, true); | 97 bool isDone = false; |
98 done.future.then((_) { isDone = true; }); | |
99 await new Future.delayed(const Duration(milliseconds: 5)); | |
100 expect(isDone, false); | |
95 } else { | 101 } else { |
96 expected.add(4); | 102 expected.add(4); |
97 await done.future; | 103 await done.future; |
98 } | 104 } |
99 expect(events, expected); | 105 expect(events, expected); |
100 }); | 106 }); |
101 | 107 |
102 test("- subscriptionStream: yes", () async { | 108 test("- subscriptionStream: yes", () async { |
103 var completer = new Completer(); | 109 var completer = new Completer(); |
104 var events = []; | 110 var events = []; |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
148 Stream<int> createStream() async* { | 154 Stream<int> createStream() async* { |
149 yield 1; | 155 yield 1; |
150 await flushMicrotasks(); | 156 await flushMicrotasks(); |
151 yield 2; | 157 yield 2; |
152 await flushMicrotasks(); | 158 await flushMicrotasks(); |
153 yield 3; | 159 yield 3; |
154 await flushMicrotasks(); | 160 await flushMicrotasks(); |
155 yield 4; | 161 yield 4; |
156 } | 162 } |
157 | 163 |
158 Stream<int> createErrorStream() { | 164 Stream<int> createErrorStream([Completer onCancel]) async* { |
159 StreamController controller = new StreamController<int>(); | 165 bool canceled = true; |
160 () async { | 166 try { |
161 controller.add(1); | 167 yield 1; |
162 await flushMicrotasks(); | 168 await flushMicrotasks(); |
163 controller.add(2); | 169 yield 2; |
164 await flushMicrotasks(); | 170 await flushMicrotasks(); |
165 controller.addError("To err is divine!"); | 171 yield* new Future.error("To err is divine!").asStream(); |
166 await flushMicrotasks(); | 172 await flushMicrotasks(); |
167 controller.add(4); | 173 yield 4; |
168 await flushMicrotasks(); | 174 await flushMicrotasks(); |
169 controller.close(); | 175 canceled = false; |
170 }(); | 176 } finally { |
171 return controller.stream; | 177 // Completes before the "done", but should be after all events. |
178 if (canceled && onCancel != null) { | |
179 await flushMicrotasks(); | |
180 onCancel.complete(); | |
181 } | |
182 } | |
172 } | 183 } |
173 | 184 |
174 Stream<int> createLongStream() async* { | 185 Stream<int> createLongStream() async* { |
175 for (int i = 0; i < 200; i++) yield i; | 186 for (int i = 0; i < 200; i++) yield i; |
176 } | 187 } |
OLD | NEW |