Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(24)

Side by Side Diff: packages/async/test/stream_group_test.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.test.stream_group_test;
6
7 import 'dart:async';
8
9 import 'package:async/async.dart';
10 import 'package:test/test.dart';
11
12 main() {
13 group("single-subscription", () {
14 var streamGroup;
15 setUp(() {
16 streamGroup = new StreamGroup<String>();
17 });
18
19 test("buffers events from multiple sources", () async {
20 var controller1 = new StreamController<String>();
21 streamGroup.add(controller1.stream);
22 controller1.add("first");
23 controller1.close();
24
25 var controller2 = new StreamController<String>();
26 streamGroup.add(controller2.stream);
27 controller2.add("second");
28 controller2.close();
29
30 await flushMicrotasks();
31
32 expect(streamGroup.close(), completes);
33
34 expect(streamGroup.stream.toList(),
35 completion(unorderedEquals(["first", "second"])));
36 });
37
38 test("buffers errors from multiple sources", () async {
39 var controller1 = new StreamController<String>();
40 streamGroup.add(controller1.stream);
41 controller1.addError("first");
42 controller1.close();
43
44 var controller2 = new StreamController<String>();
45 streamGroup.add(controller2.stream);
46 controller2.addError("second");
47 controller2.close();
48
49 await flushMicrotasks();
50
51 expect(streamGroup.close(), completes);
52
53 var transformed = streamGroup.stream.transform(
54 new StreamTransformer.fromHandlers(
55 handleError: (error, _, sink) => sink.add("error: $error")));
56 expect(transformed.toList(),
57 completion(equals(["error: first", "error: second"])));
58 });
59
60 test("buffers events and errors together", () async {
61 var controller = new StreamController<String>();
62 streamGroup.add(controller.stream);
63
64 controller.add("first");
65 controller.addError("second");
66 controller.add("third");
67 controller.addError("fourth");
68 controller.addError("fifth");
69 controller.add("sixth");
70 controller.close();
71
72 await flushMicrotasks();
73
74 expect(streamGroup.close(), completes);
75
76 var transformed = streamGroup.stream.transform(
77 new StreamTransformer.fromHandlers(
78 handleData: (data, sink) => sink.add("data: $data"),
79 handleError: (error, _, sink) => sink.add("error: $error")));
80 expect(transformed.toList(), completion(equals([
81 "data: first",
82 "error: second",
83 "data: third",
84 "error: fourth",
85 "error: fifth",
86 "data: sixth"
87 ])));
88 });
89
90 test("emits events once there's a listener", () {
91 var controller = new StreamController<String>();
92 streamGroup.add(controller.stream);
93
94 expect(streamGroup.stream.toList(),
95 completion(equals(["first", "second"])));
96
97 controller.add("first");
98 controller.add("second");
99 controller.close();
100
101 expect(streamGroup.close(), completes);
102 });
103
104 test("doesn't buffer events from a broadcast stream", () async {
105 var controller = new StreamController<String>.broadcast();
106 streamGroup.add(controller.stream);
107
108 controller.add("first");
109 controller.add("second");
110 controller.close();
111
112 await flushMicrotasks();
113
114 expect(streamGroup.close(), completes);
115 expect(streamGroup.stream.toList(), completion(isEmpty));
116 });
117
118 test("when paused, buffers events from a broadcast stream", () async {
119 var controller = new StreamController<String>.broadcast();
120 streamGroup.add(controller.stream);
121
122 var events = [];
123 var subscription = streamGroup.stream.listen(events.add);
124 subscription.pause();
125
126 controller.add("first");
127 controller.add("second");
128 controller.close();
129 await flushMicrotasks();
130
131 subscription.resume();
132 expect(streamGroup.close(), completes);
133 await flushMicrotasks();
134
135 expect(events, equals(["first", "second"]));
136 });
137
138 test("emits events from a broadcast stream once there's a listener", () {
139 var controller = new StreamController<String>.broadcast();
140 streamGroup.add(controller.stream);
141
142 expect(streamGroup.stream.toList(),
143 completion(equals(["first", "second"])));
144
145 controller.add("first");
146 controller.add("second");
147 controller.close();
148
149 expect(streamGroup.close(), completes);
150 });
151
152 test("forwards cancel errors", () async {
153 var subscription = streamGroup.stream.listen(null);
154
155 var controller = new StreamController<String>(
156 onCancel: () => throw "error");
157 streamGroup.add(controller.stream);
158 await flushMicrotasks();
159
160 expect(subscription.cancel(), throwsA("error"));
161 });
162
163 test("forwards a cancel future", () async {
164 var subscription = streamGroup.stream.listen(null);
165
166 var completer = new Completer();
167 var controller = new StreamController<String>(
168 onCancel: () => completer.future);
169 streamGroup.add(controller.stream);
170 await flushMicrotasks();
171
172 var fired = false;
173 subscription.cancel().then((_) => fired = true);
174
175 await flushMicrotasks();
176 expect(fired, isFalse);
177
178 completer.complete();
179 await flushMicrotasks();
180 expect(fired, isTrue);
181 });
182
183 test("add() while active pauses the stream if the group is paused, then "
184 "resumes once the group resumes", () async {
185 var subscription = streamGroup.stream.listen(null);
186 await flushMicrotasks();
187
188 var paused = false;
189 var controller = new StreamController<String>(
190 onPause: () => paused = true,
191 onResume: () => paused = false);
192
193 subscription.pause();
194 await flushMicrotasks();
195
196 streamGroup.add(controller.stream);
197 await flushMicrotasks();
198 expect(paused, isTrue);
199
200 subscription.resume();
201 await flushMicrotasks();
202 expect(paused, isFalse);
203 });
204
205 group("add() while canceled", () {
206 setUp(() async {
207 streamGroup.stream.listen(null).cancel();
208 await flushMicrotasks();
209 });
210
211 test("immediately listens to and cancels the stream", () async {
212 var listened = false;
213 var canceled = false;
214 var controller = new StreamController<String>(onListen: () {
215 listened = true;
216 }, onCancel: expectAsync(() {
217 expect(listened, isTrue);
218 canceled = true;
219 }));
220
221 streamGroup.add(controller.stream);
222 await flushMicrotasks();
223 expect(listened, isTrue);
224 expect(canceled, isTrue);
225 });
226
227 test("forwards cancel errors", () {
228 var controller = new StreamController<String>(
229 onCancel: () => throw "error");
230
231 expect(streamGroup.add(controller.stream), throwsA("error"));
232 });
233
234 test("forwards a cancel future", () async {
235 var completer = new Completer();
236 var controller = new StreamController<String>(
237 onCancel: () => completer.future);
238
239 var fired = false;
240 streamGroup.add(controller.stream).then((_) => fired = true);
241
242 await flushMicrotasks();
243 expect(fired, isFalse);
244
245 completer.complete();
246 await flushMicrotasks();
247 expect(fired, isTrue);
248 });
249 });
250 });
251
252 group("broadcast", () {
253 var streamGroup;
254 setUp(() {
255 streamGroup = new StreamGroup<String>.broadcast();
256 });
257
258 test("buffers events from multiple sources", () async {
259 var controller1 = new StreamController<String>();
260 streamGroup.add(controller1.stream);
261 controller1.add("first");
262 controller1.close();
263
264 var controller2 = new StreamController<String>();
265 streamGroup.add(controller2.stream);
266 controller2.add("second");
267 controller2.close();
268
269 await flushMicrotasks();
270
271 expect(streamGroup.close(), completes);
272
273 expect(streamGroup.stream.toList(),
274 completion(equals(["first", "second"])));
275 });
276
277 test("emits events from multiple sources once there's a listener", () {
278 var controller1 = new StreamController<String>();
279 streamGroup.add(controller1.stream);
280
281 var controller2 = new StreamController<String>();
282 streamGroup.add(controller2.stream);
283
284 expect(streamGroup.stream.toList(),
285 completion(equals(["first", "second"])));
286
287 controller1.add("first");
288 controller2.add("second");
289 controller1.close();
290 controller2.close();
291
292 expect(streamGroup.close(), completes);
293 });
294
295 test("doesn't buffer events once a listener has been added and removed",
296 () async {
297 var controller = new StreamController<String>();
298 streamGroup.add(controller.stream);
299
300 streamGroup.stream.listen(null).cancel();
301 await flushMicrotasks();
302
303 controller.add("first");
304 controller.addError("second");
305 controller.close();
306
307 await flushMicrotasks();
308
309 expect(streamGroup.close(), completes);
310 expect(streamGroup.stream.toList(), completion(isEmpty));
311 });
312
313 test("doesn't buffer events from a broadcast stream", () async {
314 var controller = new StreamController<String>.broadcast();
315 streamGroup.add(controller.stream);
316 controller.add("first");
317 controller.addError("second");
318 controller.close();
319
320 await flushMicrotasks();
321
322 expect(streamGroup.close(), completes);
323 expect(streamGroup.stream.toList(), completion(isEmpty));
324 });
325
326 test("emits events from a broadcast stream once there's a listener", () {
327 var controller = new StreamController<String>.broadcast();
328 streamGroup.add(controller.stream);
329
330 expect(streamGroup.stream.toList(),
331 completion(equals(["first", "second"])));
332
333 controller.add("first");
334 controller.add("second");
335 controller.close();
336
337 expect(streamGroup.close(), completes);
338 });
339
340 test("cancels and re-listens broadcast streams", () async {
341 var subscription = streamGroup.stream.listen(null);
342
343 var controller = new StreamController<String>.broadcast();
344
345 streamGroup.add(controller.stream);
346 await flushMicrotasks();
347 expect(controller.hasListener, isTrue);
348
349 subscription.cancel();
350 await flushMicrotasks();
351 expect(controller.hasListener, isFalse);
352
353 streamGroup.stream.listen(null);
354 await flushMicrotasks();
355 expect(controller.hasListener, isTrue);
356 });
357
358 test("never cancels single-subscription streams", () async {
359 var subscription = streamGroup.stream.listen(null);
360
361 var controller = new StreamController<String>(
362 onCancel: expectAsync(() {}, count: 0));
363
364 streamGroup.add(controller.stream);
365 await flushMicrotasks();
366
367 subscription.cancel();
368 await flushMicrotasks();
369
370 streamGroup.stream.listen(null);
371 await flushMicrotasks();
372 });
373
374 test("drops events from a single-subscription stream while dormant",
375 () async {
376 var events = [];
377 var subscription = streamGroup.stream.listen(events.add);
378
379 var controller = new StreamController<String>();
380 streamGroup.add(controller.stream);
381 await flushMicrotasks();
382
383 controller.add("first");
384 await flushMicrotasks();
385 expect(events, equals(["first"]));
386
387 subscription.cancel();
388 controller.add("second");
389 await flushMicrotasks();
390 expect(events, equals(["first"]));
391
392 streamGroup.stream.listen(events.add);
393 controller.add("third");
394 await flushMicrotasks();
395 expect(events, equals(["first", "third"]));
396 });
397
398 test("a single-subscription stream can be removed while dormant", () async {
399 var controller = new StreamController<String>();
400 streamGroup.add(controller.stream);
401 await flushMicrotasks();
402
403 streamGroup.stream.listen(null).cancel();
404 await flushMicrotasks();
405
406 streamGroup.remove(controller.stream);
407 expect(controller.hasListener, isFalse);
408 await flushMicrotasks();
409
410 expect(streamGroup.stream.toList(), completion(isEmpty));
411 controller.add("first");
412 expect(streamGroup.close(), completes);
413 });
414 });
415
416 group("regardless of type", () {
417 group("single-subscription", () {
418 regardlessOfType(() => new StreamGroup<String>());
419 });
420
421 group("broadcast", () {
422 regardlessOfType(() => new StreamGroup<String>.broadcast());
423 });
424 });
425
426 test("merge() emits events from all components streams", () {
427 var controller1 = new StreamController<String>();
428 var controller2 = new StreamController<String>();
429
430 var merged = StreamGroup.merge([controller1.stream, controller2.stream]);
431
432 controller1.add("first");
433 controller1.close();
434 controller2.add("second");
435 controller2.close();
436
437 expect(merged.toList(), completion(unorderedEquals(["first", "second"])));
438 });
439 }
440
441 void regardlessOfType(StreamGroup<String> newStreamGroup()) {
442 var streamGroup;
443 setUp(() {
444 streamGroup = newStreamGroup();
445 });
446
447 group("add()", () {
448 group("while dormant", () {
449 test("doesn't listen to the stream until the group is listened to",
450 () async {
451 var controller = new StreamController<String>();
452
453 expect(streamGroup.add(controller.stream), isNull);
454 await flushMicrotasks();
455 expect(controller.hasListener, isFalse);
456
457 streamGroup.stream.listen(null);
458 await flushMicrotasks();
459 expect(controller.hasListener, isTrue);
460 });
461
462 test("is a no-op if the stream is already in the group", () {
463 var controller = new StreamController<String>();
464 streamGroup.add(controller.stream);
465 streamGroup.add(controller.stream);
466 streamGroup.add(controller.stream);
467
468 // If the stream was actually listened to multiple times, this would
469 // throw a StateError.
470 streamGroup.stream.listen(null);
471 });
472 });
473
474 group("while active", () {
475 setUp(() async {
476 streamGroup.stream.listen(null);
477 await flushMicrotasks();
478 });
479
480 test("listens to the stream immediately", () async {
481 var controller = new StreamController<String>();
482
483 expect(streamGroup.add(controller.stream), isNull);
484 await flushMicrotasks();
485 expect(controller.hasListener, isTrue);
486 });
487
488 test("is a no-op if the stream is already in the group", () async {
489 var controller = new StreamController<String>();
490
491 // If the stream were actually listened to more than once, future
492 // calls to [add] would throw [StateError]s.
493 streamGroup.add(controller.stream);
494 streamGroup.add(controller.stream);
495 streamGroup.add(controller.stream);
496 });
497 });
498 });
499
500 group("remove()", () {
501 group("while dormant", () {
502 test("stops emitting events for a stream that's removed", () async {
503 var controller = new StreamController<String>();
504 streamGroup.add(controller.stream);
505
506 expect(streamGroup.stream.toList(), completion(equals(["first"])));
507
508 controller.add("first");
509 await flushMicrotasks();
510 controller.add("second");
511
512 expect(streamGroup.remove(controller.stream), isNull);
513 expect(streamGroup.close(), completes);
514 });
515
516 test("is a no-op for an unknown stream", () {
517 var controller = new StreamController<String>();
518 expect(streamGroup.remove(controller.stream), isNull);
519 });
520
521 test("and closed closes the group when the last stream is removed",
522 () async {
523 var controller1 = new StreamController<String>();
524 var controller2 = new StreamController<String>();
525
526 streamGroup.add(controller1.stream);
527 streamGroup.add(controller2.stream);
528 await flushMicrotasks();
529
530 streamGroup.close();
531
532 streamGroup.remove(controller1.stream);
533 await flushMicrotasks();
534
535 streamGroup.remove(controller2.stream);
536 await flushMicrotasks();
537
538 expect(streamGroup.stream.toList(), completion(isEmpty));
539 });
540 });
541
542 group("while listening", () {
543 test("doesn't emit events from a removed stream", () {
544 var controller = new StreamController<String>();
545 streamGroup.add(controller.stream);
546
547 // The subscription to [controller.stream] is canceled synchronously, so
548 // the first event is dropped even though it was added before the
549 // removal. This is documented in [StreamGroup.remove].
550 expect(streamGroup.stream.toList(), completion(isEmpty));
551
552 controller.add("first");
553 expect(streamGroup.remove(controller.stream), isNull);
554 controller.add("second");
555
556 expect(streamGroup.close(), completes);
557 });
558
559 test("cancels the stream's subscription", () async {
560 var controller = new StreamController<String>();
561 streamGroup.add(controller.stream);
562
563 streamGroup.stream.listen(null);
564 await flushMicrotasks();
565 expect(controller.hasListener, isTrue);
566
567 streamGroup.remove(controller.stream);
568 await flushMicrotasks();
569 expect(controller.hasListener, isFalse);
570 });
571
572 test("forwards cancel errors", () async {
573 var controller = new StreamController<String>(
574 onCancel: () => throw "error");
575 streamGroup.add(controller.stream);
576
577 streamGroup.stream.listen(null);
578 await flushMicrotasks();
579
580 expect(streamGroup.remove(controller.stream), throwsA("error"));
581 });
582
583 test("forwards cancel futures", () async {
584 var completer = new Completer();
585 var controller = new StreamController<String>(
586 onCancel: () => completer.future);
587
588 streamGroup.stream.listen(null);
589 await flushMicrotasks();
590
591 streamGroup.add(controller.stream);
592 await flushMicrotasks();
593
594 var fired = false;
595 streamGroup.remove(controller.stream).then((_) => fired = true);
596
597 await flushMicrotasks();
598 expect(fired, isFalse);
599
600 completer.complete();
601 await flushMicrotasks();
602 expect(fired, isTrue);
603 });
604
605 test("is a no-op for an unknown stream", () async {
606 var controller = new StreamController<String>();
607 streamGroup.stream.listen(null);
608 await flushMicrotasks();
609
610 expect(streamGroup.remove(controller.stream), isNull);
611 });
612
613 test("and closed closes the group when the last stream is removed",
614 () async {
615 var done = false;
616 streamGroup.stream.listen(null, onDone: () => done = true);
617 await flushMicrotasks();
618
619 var controller1 = new StreamController<String>();
620 var controller2 = new StreamController<String>();
621
622 streamGroup.add(controller1.stream);
623 streamGroup.add(controller2.stream);
624 await flushMicrotasks();
625
626 streamGroup.close();
627
628 streamGroup.remove(controller1.stream);
629 await flushMicrotasks();
630 expect(done, isFalse);
631
632 streamGroup.remove(controller2.stream);
633 await flushMicrotasks();
634 expect(done, isTrue);
635 });
636 });
637 });
638
639 group("close()", () {
640 group("while dormant", () {
641 test("if there are no streams, closes the group", () {
642 expect(streamGroup.close(), completes);
643 expect(streamGroup.stream.toList(), completion(isEmpty));
644 });
645
646 test("if there are streams, closes the group once those streams close "
647 "and there's a listener", () async {
648 var controller1 = new StreamController<String>();
649 var controller2 = new StreamController<String>();
650
651 streamGroup.add(controller1.stream);
652 streamGroup.add(controller2.stream);
653 await flushMicrotasks();
654
655 streamGroup.close();
656
657 controller1.close();
658 controller2.close();
659 expect(streamGroup.stream.toList(), completion(isEmpty));
660 });
661 });
662
663 group("while active", () {
664 test("if there are no streams, closes the group", () {
665 expect(streamGroup.stream.toList(), completion(isEmpty));
666 expect(streamGroup.close(), completes);
667 });
668
669 test("if there are streams, closes the group once those streams close",
670 () async {
671 var done = false;
672 streamGroup.stream.listen(null, onDone: () => done = true);
673 await flushMicrotasks();
674
675 var controller1 = new StreamController<String>();
676 var controller2 = new StreamController<String>();
677
678 streamGroup.add(controller1.stream);
679 streamGroup.add(controller2.stream);
680 await flushMicrotasks();
681
682 streamGroup.close();
683 await flushMicrotasks();
684 expect(done, isFalse);
685
686 controller1.close();
687 await flushMicrotasks();
688 expect(done, isFalse);
689
690 controller2.close();
691 await flushMicrotasks();
692 expect(done, isTrue);
693 });
694 });
695
696 test("returns a Future that completes once all events are dispatched",
697 () async {
698 var events = [];
699 streamGroup.stream.listen(events.add);
700
701 var controller = new StreamController<String>();
702 streamGroup.add(controller.stream);
703 await flushMicrotasks();
704
705 // Add a bunch of events. Each of these will get dispatched in a
706 // separate microtask, so we can test that [close] only completes once
707 // all of them have dispatched.
708 controller.add("one");
709 controller.add("two");
710 controller.add("three");
711 controller.add("four");
712 controller.add("five");
713 controller.add("six");
714 controller.close();
715
716 await streamGroup.close();
717 expect(events, equals(["one", "two", "three", "four", "five", "six"]));
718 });
719 });
720 }
721
722 /// Wait for all microtasks to complete.
723 Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
OLDNEW
« no previous file with comments | « packages/async/test/stream_completer_test.dart ('k') | packages/async/test/stream_queue_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698