Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(179)

Side by Side Diff: test/stream_group_test.dart

Issue 1178793006: Add a StreamGroup class for merging streams. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Code review changes Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/stream_group.dart ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.test.stream_group_test;
6
7 import 'dart:async';
8
9 import 'package:async/async.dart';
10 import 'package:test/test.dart';
11
12 main() {
13 group("single-subscription", () {
14 var streamGroup;
15 setUp(() {
16 streamGroup = new StreamGroup<String>();
17 });
18
19 test("buffers events from multiple sources", () async {
20 var controller1 = new StreamController<String>();
21 streamGroup.add(controller1.stream);
22 controller1.add("first");
23 controller1.close();
24
25 var controller2 = new StreamController<String>();
26 streamGroup.add(controller2.stream);
27 controller2.add("second");
28 controller2.close();
29
30 await flushMicrotasks();
31
32 expect(streamGroup.close(), completes);
33
34 expect(streamGroup.stream.toList(),
35 completion(unorderedEquals(["first", "second"])));
36 });
37
38 test("buffers errors from multiple sources", () async {
39 var controller1 = new StreamController<String>();
40 streamGroup.add(controller1.stream);
41 controller1.addError("first");
42 controller1.close();
43
44 var controller2 = new StreamController<String>();
45 streamGroup.add(controller2.stream);
46 controller2.addError("second");
47 controller2.close();
48
49 await flushMicrotasks();
50
51 expect(streamGroup.close(), completes);
52
53 var transformed = streamGroup.stream.transform(
54 new StreamTransformer.fromHandlers(
55 handleError: (error, _, sink) => sink.add("error: $error")));
56 expect(transformed.toList(),
57 completion(equals(["error: first", "error: second"])));
58 });
59
60 test("buffers events and errors together", () async {
61 var controller = new StreamController<String>();
62 streamGroup.add(controller.stream);
63
64 controller.add("first");
65 controller.addError("second");
66 controller.add("third");
67 controller.addError("fourth");
68 controller.addError("fifth");
69 controller.add("sixth");
70 controller.close();
71
72 await flushMicrotasks();
73
74 expect(streamGroup.close(), completes);
75
76 var transformed = streamGroup.stream.transform(
77 new StreamTransformer.fromHandlers(
78 handleData: (data, sink) => sink.add("data: $data"),
79 handleError: (error, _, sink) => sink.add("error: $error")));
80 expect(transformed.toList(), completion(equals([
81 "data: first",
82 "error: second",
83 "data: third",
84 "error: fourth",
85 "error: fifth",
86 "data: sixth"
87 ])));
88 });
89
90 test("emits events once there's a listener", () {
91 var controller = new StreamController<String>();
92 streamGroup.add(controller.stream);
93
94 expect(streamGroup.stream.toList(),
95 completion(equals(["first", "second"])));
96
97 controller.add("first");
98 controller.add("second");
99 controller.close();
100
101 expect(streamGroup.close(), completes);
102 });
103
104 test("doesn't buffer events from a broadcast stream", () async {
105 var controller = new StreamController<String>.broadcast();
106 streamGroup.add(controller.stream);
107
108 controller.add("first");
109 controller.add("second");
110 controller.close();
111
112 await flushMicrotasks();
113
114 expect(streamGroup.close(), completes);
115 expect(streamGroup.stream.toList(), completion(isEmpty));
116 });
117
118 test("when paused, buffers events from a broadcast stream", () async {
119 var controller = new StreamController<String>.broadcast();
120 streamGroup.add(controller.stream);
121
122 var events = [];
123 var subscription = streamGroup.stream.listen(events.add);
124 subscription.pause();
125
126 controller.add("first");
127 controller.add("second");
128 controller.close();
129 await flushMicrotasks();
130
131 subscription.resume();
132 expect(streamGroup.close(), completes);
133 await flushMicrotasks();
134
135 expect(events, equals(["first", "second"]));
136 });
137
138 test("emits events from a broadcast stream once there's a listener", () {
139 var controller = new StreamController<String>.broadcast();
140 streamGroup.add(controller.stream);
141
142 expect(streamGroup.stream.toList(),
143 completion(equals(["first", "second"])));
144
145 controller.add("first");
146 controller.add("second");
147 controller.close();
148
149 expect(streamGroup.close(), completes);
150 });
151
152 test("forwards cancel errors", () async {
153 var subscription = streamGroup.stream.listen(null);
154
155 var controller = new StreamController<String>(
156 onCancel: () => throw "error");
157 streamGroup.add(controller.stream);
158 await flushMicrotasks();
159
160 expect(subscription.cancel(), throwsA("error"));
161 });
162
163 test("forwards a cancel future", () async {
164 var subscription = streamGroup.stream.listen(null);
165
166 var completer = new Completer();
167 var controller = new StreamController<String>(
168 onCancel: () => completer.future);
169 streamGroup.add(controller.stream);
170 await flushMicrotasks();
171
172 var fired = false;
173 subscription.cancel().then((_) => fired = true);
174
175 await flushMicrotasks();
176 expect(fired, isFalse);
177
178 completer.complete();
179 await flushMicrotasks();
180 expect(fired, isTrue);
181 });
182
183 test("add() while active pauses the stream if the group is paused, then "
184 "resumes once the group resumes", () async {
185 var subscription = streamGroup.stream.listen(null);
186 await flushMicrotasks();
187
188 var paused = false;
189 var controller = new StreamController<String>(
190 onPause: () => paused = true,
191 onResume: () => paused = false);
192
193 subscription.pause();
194 await flushMicrotasks();
195
196 streamGroup.add(controller.stream);
197 await flushMicrotasks();
198 expect(paused, isTrue);
199
200 subscription.resume();
201 await flushMicrotasks();
202 expect(paused, isFalse);
203 });
204
205 group("add() while canceled", () {
206 setUp(() async {
207 streamGroup.stream.listen(null).cancel();
208 await flushMicrotasks();
209 });
210
211 test("immediately listens to and cancels the stream", () async {
212 var listened = false;
213 var canceled = false;
214 var controller = new StreamController<String>(onListen: () {
215 listened = true;
216 }, onCancel: expectAsync(() {
217 expect(listened, isTrue);
218 canceled = true;
219 }));
220
221 streamGroup.add(controller.stream);
222 await flushMicrotasks();
223 expect(listened, isTrue);
224 expect(canceled, isTrue);
225 });
226
227 test("forwards cancel errors", () {
228 var controller = new StreamController<String>(
229 onCancel: () => throw "error");
230
231 expect(streamGroup.add(controller.stream), throwsA("error"));
232 });
233
234 test("forwards a cancel future", () async {
235 var completer = new Completer();
236 var controller = new StreamController<String>(
237 onCancel: () => completer.future);
238
239 var fired = false;
240 streamGroup.add(controller.stream).then((_) => fired = true);
241
242 await flushMicrotasks();
243 expect(fired, isFalse);
244
245 completer.complete();
246 await flushMicrotasks();
247 expect(fired, isTrue);
248 });
249 });
250 });
251
252 group("broadcast", () {
253 var streamGroup;
254 setUp(() {
255 streamGroup = new StreamGroup<String>.broadcast();
256 });
257
258 test("buffers events from multiple sources", () async {
259 var controller1 = new StreamController<String>();
260 streamGroup.add(controller1.stream);
261 controller1.add("first");
262 controller1.close();
263
264 var controller2 = new StreamController<String>();
265 streamGroup.add(controller2.stream);
266 controller2.add("second");
267 controller2.close();
268
269 await flushMicrotasks();
270
271 expect(streamGroup.close(), completes);
272
273 expect(streamGroup.stream.toList(),
274 completion(equals(["first", "second"])));
275 });
276
277 test("emits events from multiple sources once there's a listener", () {
278 var controller1 = new StreamController<String>();
279 streamGroup.add(controller1.stream);
280
281 var controller2 = new StreamController<String>();
282 streamGroup.add(controller2.stream);
283
284 expect(streamGroup.stream.toList(),
285 completion(equals(["first", "second"])));
286
287 controller1.add("first");
288 controller2.add("second");
289 controller1.close();
290 controller2.close();
291
292 expect(streamGroup.close(), completes);
293 });
294
295 test("doesn't buffer events once a listener has been added and removed",
296 () async {
297 var controller = new StreamController<String>();
298 streamGroup.add(controller.stream);
299
300 streamGroup.stream.listen(null).cancel();
301 await flushMicrotasks();
302
303 controller.add("first");
304 controller.addError("second");
305 controller.close();
306
307 await flushMicrotasks();
308
309 expect(streamGroup.close(), completes);
310 expect(streamGroup.stream.toList(), completion(isEmpty));
311 });
312
313 test("doesn't buffer events from a broadcast stream", () async {
314 var controller = new StreamController<String>.broadcast();
315 streamGroup.add(controller.stream);
316 controller.add("first");
317 controller.addError("second");
318 controller.close();
319
320 await flushMicrotasks();
321
322 expect(streamGroup.close(), completes);
323 expect(streamGroup.stream.toList(), completion(isEmpty));
324 });
325
326 test("emits events from a broadcast stream once there's a listener", () {
327 var controller = new StreamController<String>.broadcast();
328 streamGroup.add(controller.stream);
329
330 expect(streamGroup.stream.toList(),
331 completion(equals(["first", "second"])));
332
333 controller.add("first");
334 controller.add("second");
335 controller.close();
336
337 expect(streamGroup.close(), completes);
338 });
339
340 test("cancels and re-listens broadcast streams", () async {
341 var subscription = streamGroup.stream.listen(null);
342
343 var controller = new StreamController<String>.broadcast();
344
345 streamGroup.add(controller.stream);
346 await flushMicrotasks();
347 expect(controller.hasListener, isTrue);
348
349 subscription.cancel();
350 await flushMicrotasks();
351 expect(controller.hasListener, isFalse);
352
353 streamGroup.stream.listen(null);
354 await flushMicrotasks();
355 expect(controller.hasListener, isTrue);
356 });
357
358 test("never cancels single-subscription streams", () async {
359 var subscription = streamGroup.stream.listen(null);
360
361 var controller = new StreamController<String>(
362 onCancel: expectAsync(() {}, count: 0));
363
364 streamGroup.add(controller.stream);
365 await flushMicrotasks();
366
367 subscription.cancel();
368 await flushMicrotasks();
369
370 streamGroup.stream.listen(null);
371 await flushMicrotasks();
372 });
373
374 test("drops events from a single-subscription stream while dormant",
375 () async {
376 var events = [];
377 var subscription = streamGroup.stream.listen(events.add);
378
379 var controller = new StreamController<String>();
380 streamGroup.add(controller.stream);
381 await flushMicrotasks();
382
383 controller.add("first");
384 await flushMicrotasks();
385 expect(events, equals(["first"]));
386
387 subscription.cancel();
388 controller.add("second");
389 await flushMicrotasks();
390 expect(events, equals(["first"]));
391
392 streamGroup.stream.listen(events.add);
393 controller.add("third");
394 await flushMicrotasks();
395 expect(events, equals(["first", "third"]));
396 });
397
398 test("a single-subscription stream can be removed while dormant", () async {
399 var controller = new StreamController<String>();
400 streamGroup.add(controller.stream);
401 await flushMicrotasks();
402
403 streamGroup.stream.listen(null).cancel();
404 await flushMicrotasks();
405
406 streamGroup.remove(controller.stream);
407 expect(controller.hasListener, isFalse);
408 await flushMicrotasks();
409
410 expect(streamGroup.stream.toList(), completion(isEmpty));
411 controller.add("first");
412 expect(streamGroup.close(), completes);
413 });
414 });
415
416 group("regardless of type", () {
417 group("single-subscription", () {
418 regardlessOfType(() => new StreamGroup<String>());
419 });
420
421 group("broadcast", () {
422 regardlessOfType(() => new StreamGroup<String>.broadcast());
423 });
424 });
425
426 test("merge() emits events from all components streams", () {
427 var controller1 = new StreamController<String>();
428 var controller2 = new StreamController<String>();
429
430 var merged = StreamGroup.merge([controller1.stream, controller2.stream]);
431
432 controller1.add("first");
433 controller1.close();
434 controller2.add("second");
435 controller2.close();
436
437 expect(merged.toList(), completion(unorderedEquals(["first", "second"])));
438 });
439 }
440
441 void regardlessOfType(StreamGroup<String> newStreamGroup()) {
442 var streamGroup;
443 setUp(() {
444 streamGroup = newStreamGroup();
445 });
446
447 group("add()", () {
448 group("while dormant", () {
449 test("doesn't listen to the stream until the group is listened to",
450 () async {
451 var controller = new StreamController<String>();
452
453 expect(streamGroup.add(controller.stream), isNull);
454 await flushMicrotasks();
455 expect(controller.hasListener, isFalse);
456
457 streamGroup.stream.listen(null);
458 await flushMicrotasks();
459 expect(controller.hasListener, isTrue);
460 });
461
462 test("is a no-op if the stream is already in the group", () {
463 var controller = new StreamController<String>();
464 streamGroup.add(controller.stream);
465 streamGroup.add(controller.stream);
466 streamGroup.add(controller.stream);
467
468 // If the stream was actually listened to multiple times, this would
469 // throw a StateError.
470 streamGroup.stream.listen(null);
471 });
472 });
473
474 group("while active", () {
475 var subscription;
476 setUp(() async {
477 subscription = streamGroup.stream.listen(null);
478 await flushMicrotasks();
479 });
480
481 test("listens to the stream immediately", () async {
482 var controller = new StreamController<String>();
483
484 expect(streamGroup.add(controller.stream), isNull);
485 await flushMicrotasks();
486 expect(controller.hasListener, isTrue);
487 });
488
489 test("is a no-op if the stream is already in the group", () async {
490 var controller = new StreamController<String>();
491
492 // If the stream were actually listened to more than once, future
493 // calls to [add] would throw [StateError]s.
494 streamGroup.add(controller.stream);
495 streamGroup.add(controller.stream);
496 streamGroup.add(controller.stream);
497 });
498 });
499 });
500
501 group("remove()", () {
502 group("while dormant", () {
503 test("stops emitting events for a stream that's removed", () async {
504 var controller = new StreamController<String>();
505 streamGroup.add(controller.stream);
506
507 expect(streamGroup.stream.toList(), completion(equals(["first"])));
508
509 controller.add("first");
510 await flushMicrotasks();
511 controller.add("second");
512
513 expect(streamGroup.remove(controller.stream), isNull);
514 expect(streamGroup.close(), completes);
515 });
516
517 test("is a no-op for an unknown stream", () {
518 var controller = new StreamController<String>();
519 expect(streamGroup.remove(controller.stream), isNull);
520 });
521
522 test("and closed closes the group when the last stream is removed",
523 () async {
524 var controller1 = new StreamController<String>();
525 var controller2 = new StreamController<String>();
526
527 streamGroup.add(controller1.stream);
528 streamGroup.add(controller2.stream);
529 await flushMicrotasks();
530
531 streamGroup.close();
532
533 streamGroup.remove(controller1.stream);
534 await flushMicrotasks();
535
536 streamGroup.remove(controller2.stream);
537 await flushMicrotasks();
538
539 expect(streamGroup.stream.toList(), completion(isEmpty));
540 });
541 });
542
543 group("while listening", () {
544 test("doesn't emit events from a removed stream", () {
545 var controller = new StreamController<String>();
546 streamGroup.add(controller.stream);
547
548 // The subscription to [controller.stream] is canceled synchronously, so
549 // the first event is dropped even though it was added before the
550 // removal. This is documented in [StreamGroup.remove].
551 expect(streamGroup.stream.toList(), completion(isEmpty));
552
553 controller.add("first");
554 expect(streamGroup.remove(controller.stream), isNull);
555 controller.add("second");
556
557 expect(streamGroup.close(), completes);
558 });
559
560 test("cancels the stream's subscription", () async {
561 var controller = new StreamController<String>();
562 streamGroup.add(controller.stream);
563
564 streamGroup.stream.listen(null);
565 await flushMicrotasks();
566 expect(controller.hasListener, isTrue);
567
568 streamGroup.remove(controller.stream);
569 await flushMicrotasks();
570 expect(controller.hasListener, isFalse);
571 });
572
573 test("forwards cancel errors", () async {
574 var controller = new StreamController<String>(
575 onCancel: () => throw "error");
576 streamGroup.add(controller.stream);
577
578 streamGroup.stream.listen(null);
579 await flushMicrotasks();
580
581 expect(streamGroup.remove(controller.stream), throwsA("error"));
582 });
583
584 test("forwards cancel futures", () async {
585 var completer = new Completer();
586 var controller = new StreamController<String>(
587 onCancel: () => completer.future);
588
589 streamGroup.stream.listen(null);
590 await flushMicrotasks();
591
592 streamGroup.add(controller.stream);
593 await flushMicrotasks();
594
595 var fired = false;
596 streamGroup.remove(controller.stream).then((_) => fired = true);
597
598 await flushMicrotasks();
599 expect(fired, isFalse);
600
601 completer.complete();
602 await flushMicrotasks();
603 expect(fired, isTrue);
604 });
605
606 test("is a no-op for an unknown stream", () async {
607 var controller = new StreamController<String>();
608 streamGroup.stream.listen(null);
609 await flushMicrotasks();
610
611 expect(streamGroup.remove(controller.stream), isNull);
612 });
613
614 test("and closed closes the group when the last stream is removed",
615 () async {
616 var done = false;
617 streamGroup.stream.listen(null, onDone: () => done = true);
618 await flushMicrotasks();
619
620 var controller1 = new StreamController<String>();
621 var controller2 = new StreamController<String>();
622
623 streamGroup.add(controller1.stream);
624 streamGroup.add(controller2.stream);
625 await flushMicrotasks();
626
627 streamGroup.close();
628
629 streamGroup.remove(controller1.stream);
630 await flushMicrotasks();
631 expect(done, isFalse);
632
633 streamGroup.remove(controller2.stream);
634 await flushMicrotasks();
635 expect(done, isTrue);
636 });
637 });
638 });
639
640 group("close()", () {
641 group("while dormant", () {
642 test("if there are no streams, closes the group", () {
643 expect(streamGroup.close(), completes);
644 expect(streamGroup.stream.toList(), completion(isEmpty));
645 });
646
647 test("if there are streams, closes the group once those streams close "
648 "and there's a listener", () async {
649 var controller1 = new StreamController<String>();
650 var controller2 = new StreamController<String>();
651
652 streamGroup.add(controller1.stream);
653 streamGroup.add(controller2.stream);
654 await flushMicrotasks();
655
656 streamGroup.close();
657
658 controller1.close();
659 controller2.close();
660 expect(streamGroup.stream.toList(), completion(isEmpty));
661 });
662 });
663
664 group("while active", () {
665 test("if there are no streams, closes the group", () {
666 expect(streamGroup.stream.toList(), completion(isEmpty));
667 expect(streamGroup.close(), completes);
668 });
669
670 test("if there are streams, closes the group once those streams close",
671 () async {
672 var done = false;
673 streamGroup.stream.listen(null, onDone: () => done = true);
674 await flushMicrotasks();
675
676 var controller1 = new StreamController<String>();
677 var controller2 = new StreamController<String>();
678
679 streamGroup.add(controller1.stream);
680 streamGroup.add(controller2.stream);
681 await flushMicrotasks();
682
683 streamGroup.close();
684 await flushMicrotasks();
685 expect(done, isFalse);
686
687 controller1.close();
688 await flushMicrotasks();
689 expect(done, isFalse);
690
691 controller2.close();
692 await flushMicrotasks();
693 expect(done, isTrue);
694 });
695 });
696
697 test("returns a Future that completes once all events are dispatched",
698 () async {
699 var events = [];
700 streamGroup.stream.listen(events.add);
701
702 var controller = new StreamController<String>();
703 streamGroup.add(controller.stream);
704 await flushMicrotasks();
705
706 // Add a bunch of events. Each of these will get dispatched in a
707 // separate microtask, so we can test that [close] only completes once
708 // all of them have dispatched.
709 controller.add("one");
710 controller.add("two");
711 controller.add("three");
712 controller.add("four");
713 controller.add("five");
714 controller.add("six");
715 controller.close();
716
717 await streamGroup.close();
718 expect(events, equals(["one", "two", "three", "four", "five", "six"]));
719 });
720 });
721 }
722
723 /// Wait for all microtasks to complete.
724 Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
OLDNEW
« no previous file with comments | « lib/src/stream_group.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698