OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 import "dart:async"; | 5 import "dart:async"; |
6 | 6 |
7 import "package:async/async.dart" show StreamCompleter; | 7 import "package:async/async.dart" show StreamCompleter; |
8 import "package:test/test.dart"; | 8 import "package:test/test.dart"; |
9 | 9 |
10 import "utils.dart"; | 10 import "utils.dart"; |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
68 await flushMicrotasks(); | 68 await flushMicrotasks(); |
69 expect(events, []); | 69 expect(events, []); |
70 subscription.resume(); | 70 subscription.resume(); |
71 } | 71 } |
72 await done; | 72 await done; |
73 expect(events, [1, 2, 3, 4]); | 73 expect(events, [1, 2, 3, 4]); |
74 }); | 74 }); |
75 | 75 |
76 test("cancel new stream before source is done", () async { | 76 test("cancel new stream before source is done", () async { |
77 var completer = new StreamCompleter(); | 77 var completer = new StreamCompleter(); |
78 var listened = false; | |
79 var lastEvent = -1; | 78 var lastEvent = -1; |
80 var controller = new StreamController(); | 79 var controller = new StreamController(); |
81 var subscription; | 80 var subscription; |
82 subscription = completer.stream.listen( | 81 subscription = completer.stream.listen( |
83 (value) { | 82 (value) { |
84 expect(value, lessThan(3)); | 83 expect(value, lessThan(3)); |
85 lastEvent = value; | 84 lastEvent = value; |
86 if (value == 2) { | 85 if (value == 2) { |
87 subscription.cancel(); | 86 subscription.cancel(); |
88 } | 87 } |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
139 completer.setSourceStream(controller.stream); | 138 completer.setSourceStream(controller.stream); |
140 await flushMicrotasks(); | 139 await flushMicrotasks(); |
141 expect(controller.hasListener, isFalse); | 140 expect(controller.hasListener, isFalse); |
142 var subscription = completer.stream.listen(null); | 141 var subscription = completer.stream.listen(null); |
143 expect(controller.hasListener, isTrue); | 142 expect(controller.hasListener, isTrue); |
144 await subscription.asFuture(); | 143 await subscription.asFuture(); |
145 }); | 144 }); |
146 | 145 |
147 test("cancelOnError true when listening before linking stream", () async { | 146 test("cancelOnError true when listening before linking stream", () async { |
148 var completer = new StreamCompleter(); | 147 var completer = new StreamCompleter(); |
149 var listened = false; | |
150 var canceled = false; | |
151 var lastEvent = -1; | 148 var lastEvent = -1; |
152 var controller = new StreamController(); | 149 var controller = new StreamController(); |
153 var subscription = completer.stream.listen( | 150 completer.stream.listen( |
154 (value) { | 151 (value) { |
155 expect(value, lessThan(3)); | 152 expect(value, lessThan(3)); |
156 lastEvent = value; | 153 lastEvent = value; |
157 }, | 154 }, |
158 onError: (value) { | 155 onError: (value) { |
159 expect(value, "3"); | 156 expect(value, "3"); |
160 lastEvent = value; | 157 lastEvent = value; |
161 }, | 158 }, |
162 onDone: unreachable("done"), | 159 onDone: unreachable("done"), |
163 cancelOnError: true); | 160 cancelOnError: true); |
(...skipping 20 matching lines...) Expand all Loading... |
184 }); | 181 }); |
185 | 182 |
186 test("cancelOnError true when listening after linking stream", () async { | 183 test("cancelOnError true when listening after linking stream", () async { |
187 var completer = new StreamCompleter(); | 184 var completer = new StreamCompleter(); |
188 var lastEvent = -1; | 185 var lastEvent = -1; |
189 var controller = new StreamController(); | 186 var controller = new StreamController(); |
190 completer.setSourceStream(controller.stream); | 187 completer.setSourceStream(controller.stream); |
191 controller.add(1); | 188 controller.add(1); |
192 expect(controller.hasListener, isFalse); | 189 expect(controller.hasListener, isFalse); |
193 | 190 |
194 var subscription = completer.stream.listen( | 191 completer.stream.listen( |
195 (value) { | 192 (value) { |
196 expect(value, lessThan(3)); | 193 expect(value, lessThan(3)); |
197 lastEvent = value; | 194 lastEvent = value; |
198 }, | 195 }, |
199 onError: (value) { | 196 onError: (value) { |
200 expect(value, "3"); | 197 expect(value, "3"); |
201 lastEvent = value; | 198 lastEvent = value; |
202 }, | 199 }, |
203 onDone: unreachable("done"), | 200 onDone: unreachable("done"), |
204 cancelOnError: true); | 201 cancelOnError: true); |
(...skipping 10 matching lines...) Expand all Loading... |
215 expect(controller.hasListener, isTrue); | 212 expect(controller.hasListener, isTrue); |
216 controller.addError("3"); | 213 controller.addError("3"); |
217 | 214 |
218 await flushMicrotasks(); | 215 await flushMicrotasks(); |
219 expect(controller.hasListener, isFalse); | 216 expect(controller.hasListener, isFalse); |
220 }); | 217 }); |
221 | 218 |
222 test("linking a stream after setSourceStream before listen", () async { | 219 test("linking a stream after setSourceStream before listen", () async { |
223 var completer = new StreamCompleter(); | 220 var completer = new StreamCompleter(); |
224 completer.setSourceStream(createStream()); | 221 completer.setSourceStream(createStream()); |
225 expect(() => completer.setSourceStream(createStream()), throws); | 222 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
226 expect(() => completer.setEmpty(createStream()), throws); | 223 expect(() => completer.setEmpty(), throwsStateError); |
227 await completer.stream.toList(); | 224 await completer.stream.toList(); |
228 // Still fails after source is done | 225 // Still fails after source is done |
229 expect(() => completer.setSourceStream(createStream()), throws); | 226 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
230 expect(() => completer.setEmpty(createStream()), throws); | 227 expect(() => completer.setEmpty(), throwsStateError); |
231 }); | 228 }); |
232 | 229 |
233 test("linking a stream after setSourceStream after listen", () async { | 230 test("linking a stream after setSourceStream after listen", () async { |
234 var completer = new StreamCompleter(); | 231 var completer = new StreamCompleter(); |
235 var list = completer.stream.toList(); | 232 var list = completer.stream.toList(); |
236 completer.setSourceStream(createStream()); | 233 completer.setSourceStream(createStream()); |
237 expect(() => completer.setSoureStream(createStream()), throws); | 234 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
238 expect(() => completer.stEmpty(createStream()), throws); | 235 expect(() => completer.setEmpty(), throwsStateError); |
239 await list; | 236 await list; |
240 // Still fails after source is done. | 237 // Still fails after source is done. |
241 expect(() => completer.setSoureStream(createStream()), throws); | 238 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
242 expect(() => completer.stEmpty(createStream()), throws); | 239 expect(() => completer.setEmpty(), throwsStateError); |
243 }); | 240 }); |
244 | 241 |
245 test("linking a stream after setEmpty before listen", () async { | 242 test("linking a stream after setEmpty before listen", () async { |
246 var completer = new StreamCompleter(); | 243 var completer = new StreamCompleter(); |
247 completer.setEmpty(); | 244 completer.setEmpty(); |
248 expect(() => completer.setSoureStream(createStream()), throws); | 245 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
249 expect(() => completer.stEmpty(createStream()), throws); | 246 expect(() => completer.setEmpty(), throwsStateError); |
250 await completer.stream.toList(); | 247 await completer.stream.toList(); |
251 // Still fails after source is done | 248 // Still fails after source is done |
252 expect(() => completer.setSoureStream(createStream()), throws); | 249 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
253 expect(() => completer.stEmpty(createStream()), throws); | 250 expect(() => completer.setEmpty(), throwsStateError); |
254 }); | 251 }); |
255 | 252 |
256 test("linking a stream after setEmpty() after listen", () async { | 253 test("linking a stream after setEmpty() after listen", () async { |
257 var completer = new StreamCompleter(); | 254 var completer = new StreamCompleter(); |
258 var list = completer.stream.toList(); | 255 var list = completer.stream.toList(); |
259 completer.setEmpty(); | 256 completer.setEmpty(); |
260 expect(() => completer.setSoureStream(createStream()), throws); | 257 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
261 expect(() => completer.stEmpty(createStream()), throws); | 258 expect(() => completer.setEmpty(), throwsStateError); |
262 await list; | 259 await list; |
263 // Still fails after source is done. | 260 // Still fails after source is done. |
264 expect(() => completer.setSoureStream(createStream()), throws); | 261 expect(() => completer.setSourceStream(createStream()), throwsStateError); |
265 expect(() => completer.stEmpty(createStream()), throws); | 262 expect(() => completer.setEmpty(), throwsStateError); |
266 }); | 263 }); |
267 | 264 |
268 test("listening more than once after setting stream", () async { | 265 test("listening more than once after setting stream", () async { |
269 var completer = new StreamCompleter(); | 266 var completer = new StreamCompleter(); |
270 completer.setSourceStream(createStream()); | 267 completer.setSourceStream(createStream()); |
271 var list = completer.stream.toList(); | 268 var list = completer.stream.toList(); |
272 expect(() => completer.stream.oList(), throws); | 269 expect(() => completer.stream.toList(), throwsStateError); |
273 await list; | 270 await list; |
274 expect(() => completer.stream.oList(), throws); | 271 expect(() => completer.stream.toList(), throwsStateError); |
275 }); | 272 }); |
276 | 273 |
277 test("listening more than once before setting stream", () async { | 274 test("listening more than once before setting stream", () async { |
278 var completer = new StreamCompleter(); | 275 var completer = new StreamCompleter(); |
279 var list = completer.stream.toList(); | 276 completer.stream.toList(); |
280 expect(() => completer.stream.oList(), throws); | 277 expect(() => completer.stream.toList(), throwsStateError); |
281 }); | 278 }); |
282 | 279 |
283 test("setting onData etc. before and after setting stream", () async { | 280 test("setting onData etc. before and after setting stream", () async { |
284 var completer = new StreamCompleter(); | 281 var completer = new StreamCompleter(); |
285 var controller = new StreamController(); | 282 var controller = new StreamController(); |
286 var subscription = completer.stream.listen(null); | 283 var subscription = completer.stream.listen(null); |
287 var lastEvent = 0; | 284 var lastEvent = 0; |
288 subscription.onData((value) => lastEvent = value); | 285 subscription.onData((value) => lastEvent = value); |
289 subscription.onError((value) => lastEvent = "$value"); | 286 subscription.onError((value) => lastEvent = "$value"); |
290 subscription.onDone(() => lastEvent = -1); | 287 subscription.onDone(() => lastEvent = -1); |
(...skipping 15 matching lines...) Expand all Loading... |
306 expect(lastEvent, "-2"); | 303 expect(lastEvent, "-2"); |
307 controller.close(); | 304 controller.close(); |
308 await flushMicrotasks(); | 305 await flushMicrotasks(); |
309 expect(lastEvent, -1); | 306 expect(lastEvent, -1); |
310 }); | 307 }); |
311 | 308 |
312 test("pause w/ resume future accross setting stream", () async { | 309 test("pause w/ resume future accross setting stream", () async { |
313 var completer = new StreamCompleter(); | 310 var completer = new StreamCompleter(); |
314 var resume = new Completer(); | 311 var resume = new Completer(); |
315 var subscription = completer.stream.listen(unreachable("data")); | 312 var subscription = completer.stream.listen(unreachable("data")); |
316 var lastEvent = 0; | |
317 subscription.pause(resume.future); | 313 subscription.pause(resume.future); |
318 await flushMicrotasks(); | 314 await flushMicrotasks(); |
319 completer.setSourceStream(createStream()); | 315 completer.setSourceStream(createStream()); |
320 await flushMicrotasks(); | 316 await flushMicrotasks(); |
321 resume.complete(); | 317 resume.complete(); |
322 var events = []; | 318 var events = []; |
323 subscription.onData(events.add); | 319 subscription.onData(events.add); |
324 await subscription.asFuture(); | 320 await subscription.asFuture(); |
325 expect(events, [1, 2, 3, 4]); | 321 expect(events, [1, 2, 3, 4]); |
326 }); | 322 }); |
(...skipping 18 matching lines...) Expand all Loading... |
345 | 341 |
346 Stream<int> createStream() async* { | 342 Stream<int> createStream() async* { |
347 yield 1; | 343 yield 1; |
348 await flushMicrotasks(); | 344 await flushMicrotasks(); |
349 yield 2; | 345 yield 2; |
350 await flushMicrotasks(); | 346 await flushMicrotasks(); |
351 yield 3; | 347 yield 3; |
352 await flushMicrotasks(); | 348 await flushMicrotasks(); |
353 yield 4; | 349 yield 4; |
354 } | 350 } |
355 | |
356 /// A zero-millisecond timer should wait until after all microtasks. | |
357 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); | |
358 | |
359 /// A generic unreachable callback function. | |
360 /// | |
361 /// Returns a function that fails the test if it is ever called. | |
362 unreachable(String name) => ([a, b]) => fail("Unreachable: $name"); | |
363 | |
364 /// A badly behaved stream which throws if it's ever listened to. | |
365 /// | |
366 /// Can be used to test cases where a stream should not be used. | |
367 class UnusableStream extends Stream { | |
368 listen(onData, {onError, onDone, cancelOnError}) { | |
369 throw new UnimplementedError("Gotcha!"); | |
370 } | |
371 } | |
OLD | NEW |