OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.test.stream_group_test; | |
6 | |
7 import 'dart:async'; | 5 import 'dart:async'; |
8 | 6 |
9 import 'package:async/async.dart'; | 7 import 'package:async/async.dart'; |
10 import 'package:test/test.dart'; | 8 import 'package:test/test.dart'; |
11 | 9 |
12 main() { | 10 main() { |
13 group("single-subscription", () { | 11 group("single-subscription", () { |
14 var streamGroup; | 12 var streamGroup; |
15 setUp(() { | 13 setUp(() { |
16 streamGroup = new StreamGroup<String>(); | 14 streamGroup = new StreamGroup<String>(); |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
70 controller.close(); | 68 controller.close(); |
71 | 69 |
72 await flushMicrotasks(); | 70 await flushMicrotasks(); |
73 | 71 |
74 expect(streamGroup.close(), completes); | 72 expect(streamGroup.close(), completes); |
75 | 73 |
76 var transformed = streamGroup.stream.transform( | 74 var transformed = streamGroup.stream.transform( |
77 new StreamTransformer.fromHandlers( | 75 new StreamTransformer.fromHandlers( |
78 handleData: (data, sink) => sink.add("data: $data"), | 76 handleData: (data, sink) => sink.add("data: $data"), |
79 handleError: (error, _, sink) => sink.add("error: $error"))); | 77 handleError: (error, _, sink) => sink.add("error: $error"))); |
80 expect(transformed.toList(), completion(equals([ | 78 expect( |
81 "data: first", | 79 transformed.toList(), |
82 "error: second", | 80 completion(equals([ |
83 "data: third", | 81 "data: first", |
84 "error: fourth", | 82 "error: second", |
85 "error: fifth", | 83 "data: third", |
86 "data: sixth" | 84 "error: fourth", |
87 ]))); | 85 "error: fifth", |
| 86 "data: sixth" |
| 87 ]))); |
88 }); | 88 }); |
89 | 89 |
90 test("emits events once there's a listener", () { | 90 test("emits events once there's a listener", () { |
91 var controller = new StreamController<String>(); | 91 var controller = new StreamController<String>(); |
92 streamGroup.add(controller.stream); | 92 streamGroup.add(controller.stream); |
93 | 93 |
94 expect(streamGroup.stream.toList(), | 94 expect( |
95 completion(equals(["first", "second"]))); | 95 streamGroup.stream.toList(), completion(equals(["first", "second"]))); |
96 | 96 |
97 controller.add("first"); | 97 controller.add("first"); |
98 controller.add("second"); | 98 controller.add("second"); |
99 controller.close(); | 99 controller.close(); |
100 | 100 |
101 expect(streamGroup.close(), completes); | 101 expect(streamGroup.close(), completes); |
102 }); | 102 }); |
103 | 103 |
104 test("doesn't buffer events from a broadcast stream", () async { | 104 test("doesn't buffer events from a broadcast stream", () async { |
105 var controller = new StreamController<String>.broadcast(); | 105 var controller = new StreamController<String>.broadcast(); |
(...skipping 26 matching lines...) Expand all Loading... |
132 expect(streamGroup.close(), completes); | 132 expect(streamGroup.close(), completes); |
133 await flushMicrotasks(); | 133 await flushMicrotasks(); |
134 | 134 |
135 expect(events, equals(["first", "second"])); | 135 expect(events, equals(["first", "second"])); |
136 }); | 136 }); |
137 | 137 |
138 test("emits events from a broadcast stream once there's a listener", () { | 138 test("emits events from a broadcast stream once there's a listener", () { |
139 var controller = new StreamController<String>.broadcast(); | 139 var controller = new StreamController<String>.broadcast(); |
140 streamGroup.add(controller.stream); | 140 streamGroup.add(controller.stream); |
141 | 141 |
142 expect(streamGroup.stream.toList(), | 142 expect( |
143 completion(equals(["first", "second"]))); | 143 streamGroup.stream.toList(), completion(equals(["first", "second"]))); |
144 | 144 |
145 controller.add("first"); | 145 controller.add("first"); |
146 controller.add("second"); | 146 controller.add("second"); |
147 controller.close(); | 147 controller.close(); |
148 | 148 |
149 expect(streamGroup.close(), completes); | 149 expect(streamGroup.close(), completes); |
150 }); | 150 }); |
151 | 151 |
152 test("forwards cancel errors", () async { | 152 test("forwards cancel errors", () async { |
153 var subscription = streamGroup.stream.listen(null); | 153 var subscription = streamGroup.stream.listen(null); |
154 | 154 |
155 var controller = new StreamController<String>( | 155 var controller = |
156 onCancel: () => throw "error"); | 156 new StreamController<String>(onCancel: () => throw "error"); |
157 streamGroup.add(controller.stream); | 157 streamGroup.add(controller.stream); |
158 await flushMicrotasks(); | 158 await flushMicrotasks(); |
159 | 159 |
160 expect(subscription.cancel(), throwsA("error")); | 160 expect(subscription.cancel(), throwsA("error")); |
161 }); | 161 }); |
162 | 162 |
163 test("forwards a cancel future", () async { | 163 test("forwards a cancel future", () async { |
164 var subscription = streamGroup.stream.listen(null); | 164 var subscription = streamGroup.stream.listen(null); |
165 | 165 |
166 var completer = new Completer(); | 166 var completer = new Completer(); |
167 var controller = new StreamController<String>( | 167 var controller = |
168 onCancel: () => completer.future); | 168 new StreamController<String>(onCancel: () => completer.future); |
169 streamGroup.add(controller.stream); | 169 streamGroup.add(controller.stream); |
170 await flushMicrotasks(); | 170 await flushMicrotasks(); |
171 | 171 |
172 var fired = false; | 172 var fired = false; |
173 subscription.cancel().then((_) => fired = true); | 173 subscription.cancel().then((_) => fired = true); |
174 | 174 |
175 await flushMicrotasks(); | 175 await flushMicrotasks(); |
176 expect(fired, isFalse); | 176 expect(fired, isFalse); |
177 | 177 |
178 completer.complete(); | 178 completer.complete(); |
179 await flushMicrotasks(); | 179 await flushMicrotasks(); |
180 expect(fired, isTrue); | 180 expect(fired, isTrue); |
181 }); | 181 }); |
182 | 182 |
183 test("add() while active pauses the stream if the group is paused, then " | 183 test( |
| 184 "add() while active pauses the stream if the group is paused, then " |
184 "resumes once the group resumes", () async { | 185 "resumes once the group resumes", () async { |
185 var subscription = streamGroup.stream.listen(null); | 186 var subscription = streamGroup.stream.listen(null); |
186 await flushMicrotasks(); | 187 await flushMicrotasks(); |
187 | 188 |
188 var paused = false; | 189 var paused = false; |
189 var controller = new StreamController<String>( | 190 var controller = new StreamController<String>( |
190 onPause: () => paused = true, | 191 onPause: () => paused = true, onResume: () => paused = false); |
191 onResume: () => paused = false); | |
192 | 192 |
193 subscription.pause(); | 193 subscription.pause(); |
194 await flushMicrotasks(); | 194 await flushMicrotasks(); |
195 | 195 |
196 streamGroup.add(controller.stream); | 196 streamGroup.add(controller.stream); |
197 await flushMicrotasks(); | 197 await flushMicrotasks(); |
198 expect(paused, isTrue); | 198 expect(paused, isTrue); |
199 | 199 |
200 subscription.resume(); | 200 subscription.resume(); |
201 await flushMicrotasks(); | 201 await flushMicrotasks(); |
202 expect(paused, isFalse); | 202 expect(paused, isFalse); |
203 }); | 203 }); |
204 | 204 |
205 group("add() while canceled", () { | 205 group("add() while canceled", () { |
206 setUp(() async { | 206 setUp(() async { |
207 streamGroup.stream.listen(null).cancel(); | 207 streamGroup.stream.listen(null).cancel(); |
208 await flushMicrotasks(); | 208 await flushMicrotasks(); |
209 }); | 209 }); |
210 | 210 |
211 test("immediately listens to and cancels the stream", () async { | 211 test("immediately listens to and cancels the stream", () async { |
212 var listened = false; | 212 var listened = false; |
213 var canceled = false; | 213 var canceled = false; |
214 var controller = new StreamController<String>(onListen: () { | 214 var controller = new StreamController<String>(onListen: () { |
215 listened = true; | 215 listened = true; |
216 }, onCancel: expectAsync(() { | 216 }, onCancel: expectAsync0(() { |
217 expect(listened, isTrue); | 217 expect(listened, isTrue); |
218 canceled = true; | 218 canceled = true; |
219 })); | 219 })); |
220 | 220 |
221 streamGroup.add(controller.stream); | 221 streamGroup.add(controller.stream); |
222 await flushMicrotasks(); | 222 await flushMicrotasks(); |
223 expect(listened, isTrue); | 223 expect(listened, isTrue); |
224 expect(canceled, isTrue); | 224 expect(canceled, isTrue); |
225 }); | 225 }); |
226 | 226 |
227 test("forwards cancel errors", () { | 227 test("forwards cancel errors", () { |
228 var controller = new StreamController<String>( | 228 var controller = |
229 onCancel: () => throw "error"); | 229 new StreamController<String>(onCancel: () => throw "error"); |
230 | 230 |
231 expect(streamGroup.add(controller.stream), throwsA("error")); | 231 expect(streamGroup.add(controller.stream), throwsA("error")); |
232 }); | 232 }); |
233 | 233 |
234 test("forwards a cancel future", () async { | 234 test("forwards a cancel future", () async { |
235 var completer = new Completer(); | 235 var completer = new Completer(); |
236 var controller = new StreamController<String>( | 236 var controller = |
237 onCancel: () => completer.future); | 237 new StreamController<String>(onCancel: () => completer.future); |
238 | 238 |
239 var fired = false; | 239 var fired = false; |
240 streamGroup.add(controller.stream).then((_) => fired = true); | 240 streamGroup.add(controller.stream).then((_) => fired = true); |
241 | 241 |
242 await flushMicrotasks(); | 242 await flushMicrotasks(); |
243 expect(fired, isFalse); | 243 expect(fired, isFalse); |
244 | 244 |
245 completer.complete(); | 245 completer.complete(); |
246 await flushMicrotasks(); | 246 await flushMicrotasks(); |
247 expect(fired, isTrue); | 247 expect(fired, isTrue); |
(...skipping 15 matching lines...) Expand all Loading... |
263 | 263 |
264 var controller2 = new StreamController<String>(); | 264 var controller2 = new StreamController<String>(); |
265 streamGroup.add(controller2.stream); | 265 streamGroup.add(controller2.stream); |
266 controller2.add("second"); | 266 controller2.add("second"); |
267 controller2.close(); | 267 controller2.close(); |
268 | 268 |
269 await flushMicrotasks(); | 269 await flushMicrotasks(); |
270 | 270 |
271 expect(streamGroup.close(), completes); | 271 expect(streamGroup.close(), completes); |
272 | 272 |
273 expect(streamGroup.stream.toList(), | 273 expect( |
274 completion(equals(["first", "second"]))); | 274 streamGroup.stream.toList(), completion(equals(["first", "second"]))); |
275 }); | 275 }); |
276 | 276 |
277 test("emits events from multiple sources once there's a listener", () { | 277 test("emits events from multiple sources once there's a listener", () { |
278 var controller1 = new StreamController<String>(); | 278 var controller1 = new StreamController<String>(); |
279 streamGroup.add(controller1.stream); | 279 streamGroup.add(controller1.stream); |
280 | 280 |
281 var controller2 = new StreamController<String>(); | 281 var controller2 = new StreamController<String>(); |
282 streamGroup.add(controller2.stream); | 282 streamGroup.add(controller2.stream); |
283 | 283 |
284 expect(streamGroup.stream.toList(), | 284 expect( |
285 completion(equals(["first", "second"]))); | 285 streamGroup.stream.toList(), completion(equals(["first", "second"]))); |
286 | 286 |
287 controller1.add("first"); | 287 controller1.add("first"); |
288 controller2.add("second"); | 288 controller2.add("second"); |
289 controller1.close(); | 289 controller1.close(); |
290 controller2.close(); | 290 controller2.close(); |
291 | 291 |
292 expect(streamGroup.close(), completes); | 292 expect(streamGroup.close(), completes); |
293 }); | 293 }); |
294 | 294 |
295 test("doesn't buffer events once a listener has been added and removed", | 295 test("doesn't buffer events once a listener has been added and removed", |
(...skipping 24 matching lines...) Expand all Loading... |
320 await flushMicrotasks(); | 320 await flushMicrotasks(); |
321 | 321 |
322 expect(streamGroup.close(), completes); | 322 expect(streamGroup.close(), completes); |
323 expect(streamGroup.stream.toList(), completion(isEmpty)); | 323 expect(streamGroup.stream.toList(), completion(isEmpty)); |
324 }); | 324 }); |
325 | 325 |
326 test("emits events from a broadcast stream once there's a listener", () { | 326 test("emits events from a broadcast stream once there's a listener", () { |
327 var controller = new StreamController<String>.broadcast(); | 327 var controller = new StreamController<String>.broadcast(); |
328 streamGroup.add(controller.stream); | 328 streamGroup.add(controller.stream); |
329 | 329 |
330 expect(streamGroup.stream.toList(), | 330 expect( |
331 completion(equals(["first", "second"]))); | 331 streamGroup.stream.toList(), completion(equals(["first", "second"]))); |
332 | 332 |
333 controller.add("first"); | 333 controller.add("first"); |
334 controller.add("second"); | 334 controller.add("second"); |
335 controller.close(); | 335 controller.close(); |
336 | 336 |
337 expect(streamGroup.close(), completes); | 337 expect(streamGroup.close(), completes); |
338 }); | 338 }); |
339 | 339 |
340 test("cancels and re-listens broadcast streams", () async { | 340 test("cancels and re-listens broadcast streams", () async { |
341 var subscription = streamGroup.stream.listen(null); | 341 var subscription = streamGroup.stream.listen(null); |
342 | 342 |
343 var controller = new StreamController<String>.broadcast(); | 343 var controller = new StreamController<String>.broadcast(); |
344 | 344 |
345 streamGroup.add(controller.stream); | 345 streamGroup.add(controller.stream); |
346 await flushMicrotasks(); | 346 await flushMicrotasks(); |
347 expect(controller.hasListener, isTrue); | 347 expect(controller.hasListener, isTrue); |
348 | 348 |
349 subscription.cancel(); | 349 subscription.cancel(); |
350 await flushMicrotasks(); | 350 await flushMicrotasks(); |
351 expect(controller.hasListener, isFalse); | 351 expect(controller.hasListener, isFalse); |
352 | 352 |
353 streamGroup.stream.listen(null); | 353 streamGroup.stream.listen(null); |
354 await flushMicrotasks(); | 354 await flushMicrotasks(); |
355 expect(controller.hasListener, isTrue); | 355 expect(controller.hasListener, isTrue); |
356 }); | 356 }); |
357 | 357 |
358 test("never cancels single-subscription streams", () async { | 358 test("never cancels single-subscription streams", () async { |
359 var subscription = streamGroup.stream.listen(null); | 359 var subscription = streamGroup.stream.listen(null); |
360 | 360 |
361 var controller = new StreamController<String>( | 361 var controller = |
362 onCancel: expectAsync(() {}, count: 0)); | 362 new StreamController<String>(onCancel: expectAsync0(() {}, count: 0)); |
363 | 363 |
364 streamGroup.add(controller.stream); | 364 streamGroup.add(controller.stream); |
365 await flushMicrotasks(); | 365 await flushMicrotasks(); |
366 | 366 |
367 subscription.cancel(); | 367 subscription.cancel(); |
368 await flushMicrotasks(); | 368 await flushMicrotasks(); |
369 | 369 |
370 streamGroup.stream.listen(null); | 370 streamGroup.stream.listen(null); |
371 await flushMicrotasks(); | 371 await flushMicrotasks(); |
372 }); | 372 }); |
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
502 test("stops emitting events for a stream that's removed", () async { | 502 test("stops emitting events for a stream that's removed", () async { |
503 var controller = new StreamController<String>(); | 503 var controller = new StreamController<String>(); |
504 streamGroup.add(controller.stream); | 504 streamGroup.add(controller.stream); |
505 | 505 |
506 expect(streamGroup.stream.toList(), completion(equals(["first"]))); | 506 expect(streamGroup.stream.toList(), completion(equals(["first"]))); |
507 | 507 |
508 controller.add("first"); | 508 controller.add("first"); |
509 await flushMicrotasks(); | 509 await flushMicrotasks(); |
510 controller.add("second"); | 510 controller.add("second"); |
511 | 511 |
512 expect(streamGroup.remove(controller.stream), isNull); | 512 expect(streamGroup.remove(controller.stream), completion(null)); |
513 expect(streamGroup.close(), completes); | 513 expect(streamGroup.close(), completes); |
514 }); | 514 }); |
515 | 515 |
516 test("is a no-op for an unknown stream", () { | 516 test("is a no-op for an unknown stream", () { |
517 var controller = new StreamController<String>(); | 517 var controller = new StreamController<String>(); |
518 expect(streamGroup.remove(controller.stream), isNull); | 518 expect(streamGroup.remove(controller.stream), isNull); |
519 }); | 519 }); |
520 | 520 |
521 test("and closed closes the group when the last stream is removed", | 521 test("and closed closes the group when the last stream is removed", |
522 () async { | 522 () async { |
(...skipping 20 matching lines...) Expand all Loading... |
543 test("doesn't emit events from a removed stream", () { | 543 test("doesn't emit events from a removed stream", () { |
544 var controller = new StreamController<String>(); | 544 var controller = new StreamController<String>(); |
545 streamGroup.add(controller.stream); | 545 streamGroup.add(controller.stream); |
546 | 546 |
547 // The subscription to [controller.stream] is canceled synchronously, so | 547 // The subscription to [controller.stream] is canceled synchronously, so |
548 // the first event is dropped even though it was added before the | 548 // the first event is dropped even though it was added before the |
549 // removal. This is documented in [StreamGroup.remove]. | 549 // removal. This is documented in [StreamGroup.remove]. |
550 expect(streamGroup.stream.toList(), completion(isEmpty)); | 550 expect(streamGroup.stream.toList(), completion(isEmpty)); |
551 | 551 |
552 controller.add("first"); | 552 controller.add("first"); |
553 expect(streamGroup.remove(controller.stream), isNull); | 553 expect(streamGroup.remove(controller.stream), completion(null)); |
554 controller.add("second"); | 554 controller.add("second"); |
555 | 555 |
556 expect(streamGroup.close(), completes); | 556 expect(streamGroup.close(), completes); |
557 }); | 557 }); |
558 | 558 |
559 test("cancels the stream's subscription", () async { | 559 test("cancels the stream's subscription", () async { |
560 var controller = new StreamController<String>(); | 560 var controller = new StreamController<String>(); |
561 streamGroup.add(controller.stream); | 561 streamGroup.add(controller.stream); |
562 | 562 |
563 streamGroup.stream.listen(null); | 563 streamGroup.stream.listen(null); |
564 await flushMicrotasks(); | 564 await flushMicrotasks(); |
565 expect(controller.hasListener, isTrue); | 565 expect(controller.hasListener, isTrue); |
566 | 566 |
567 streamGroup.remove(controller.stream); | 567 streamGroup.remove(controller.stream); |
568 await flushMicrotasks(); | 568 await flushMicrotasks(); |
569 expect(controller.hasListener, isFalse); | 569 expect(controller.hasListener, isFalse); |
570 }); | 570 }); |
571 | 571 |
572 test("forwards cancel errors", () async { | 572 test("forwards cancel errors", () async { |
573 var controller = new StreamController<String>( | 573 var controller = |
574 onCancel: () => throw "error"); | 574 new StreamController<String>(onCancel: () => throw "error"); |
575 streamGroup.add(controller.stream); | 575 streamGroup.add(controller.stream); |
576 | 576 |
577 streamGroup.stream.listen(null); | 577 streamGroup.stream.listen(null); |
578 await flushMicrotasks(); | 578 await flushMicrotasks(); |
579 | 579 |
580 expect(streamGroup.remove(controller.stream), throwsA("error")); | 580 expect(streamGroup.remove(controller.stream), throwsA("error")); |
581 }); | 581 }); |
582 | 582 |
583 test("forwards cancel futures", () async { | 583 test("forwards cancel futures", () async { |
584 var completer = new Completer(); | 584 var completer = new Completer(); |
585 var controller = new StreamController<String>( | 585 var controller = |
586 onCancel: () => completer.future); | 586 new StreamController<String>(onCancel: () => completer.future); |
587 | 587 |
588 streamGroup.stream.listen(null); | 588 streamGroup.stream.listen(null); |
589 await flushMicrotasks(); | 589 await flushMicrotasks(); |
590 | 590 |
591 streamGroup.add(controller.stream); | 591 streamGroup.add(controller.stream); |
592 await flushMicrotasks(); | 592 await flushMicrotasks(); |
593 | 593 |
594 var fired = false; | 594 var fired = false; |
595 streamGroup.remove(controller.stream).then((_) => fired = true); | 595 streamGroup.remove(controller.stream).then((_) => fired = true); |
596 | 596 |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
636 }); | 636 }); |
637 }); | 637 }); |
638 | 638 |
639 group("close()", () { | 639 group("close()", () { |
640 group("while dormant", () { | 640 group("while dormant", () { |
641 test("if there are no streams, closes the group", () { | 641 test("if there are no streams, closes the group", () { |
642 expect(streamGroup.close(), completes); | 642 expect(streamGroup.close(), completes); |
643 expect(streamGroup.stream.toList(), completion(isEmpty)); | 643 expect(streamGroup.stream.toList(), completion(isEmpty)); |
644 }); | 644 }); |
645 | 645 |
646 test("if there are streams, closes the group once those streams close " | 646 test( |
| 647 "if there are streams, closes the group once those streams close " |
647 "and there's a listener", () async { | 648 "and there's a listener", () async { |
648 var controller1 = new StreamController<String>(); | 649 var controller1 = new StreamController<String>(); |
649 var controller2 = new StreamController<String>(); | 650 var controller2 = new StreamController<String>(); |
650 | 651 |
651 streamGroup.add(controller1.stream); | 652 streamGroup.add(controller1.stream); |
652 streamGroup.add(controller2.stream); | 653 streamGroup.add(controller2.stream); |
653 await flushMicrotasks(); | 654 await flushMicrotasks(); |
654 | 655 |
655 streamGroup.close(); | 656 streamGroup.close(); |
656 | 657 |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
714 controller.close(); | 715 controller.close(); |
715 | 716 |
716 await streamGroup.close(); | 717 await streamGroup.close(); |
717 expect(events, equals(["one", "two", "three", "four", "five", "six"])); | 718 expect(events, equals(["one", "two", "three", "four", "five", "six"])); |
718 }); | 719 }); |
719 }); | 720 }); |
720 } | 721 } |
721 | 722 |
722 /// Wait for all microtasks to complete. | 723 /// Wait for all microtasks to complete. |
723 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); | 724 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); |
OLD | NEW |