Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(76)

Side by Side Diff: test/stream_group_test.dart

Issue 1178793006: Add a StreamGroup class for merging streams. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« lib/src/stream_group.dart ('K') | « lib/src/stream_group.dart ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 library async.test.stream_group_test;
6
7 import 'dart:async';
8
9 import 'package:async/async.dart';
10 import 'package:test/test.dart';
11
12 main() {
13 group("single-subscription", () {
14 var streamGroup;
15 setUp(() {
16 streamGroup = new StreamGroup<String>();
17 });
18
19 test("buffers events from multiple sources", () async {
20 var controller1 = new StreamController<String>();
21 streamGroup.add(controller1.stream);
22 controller1.add("first");
23 controller1.close();
24
25 var controller2 = new StreamController<String>();
26 streamGroup.add(controller2.stream);
27 controller2.add("second");
28 controller2.close();
29
30 await new Future.delayed(Duration.ZERO);
Lasse Reichstein Nielsen 2015/06/18 10:29:32 I like having a helper function: flushMicrotasks
nweiz 2015/06/19 00:44:18 Done.
31
32 expect(streamGroup.close(), completes);
33
34 expect(streamGroup.stream.toList(),
35 completion(equals(["first", "second"])));
Lasse Reichstein Nielsen 2015/06/18 10:29:32 This expects the events in a particular order - wh
nweiz 2015/06/19 00:44:17 Done.
36 });
37
38 test("buffers errors from multiple sources", () async {
39 var controller1 = new StreamController<String>();
40 streamGroup.add(controller1.stream);
41 controller1.addError("first");
42 controller1.close();
43
44 var controller2 = new StreamController<String>();
45 streamGroup.add(controller2.stream);
46 controller2.addError("second");
47 controller2.close();
48
49 await new Future.delayed(Duration.ZERO);
50
51 expect(streamGroup.close(), completes);
52
53 var transformed = streamGroup.stream.transform(
54 new StreamTransformer.fromHandlers(
55 handleError: (error, _, sink) => sink.add("error: $error")));
56 expect(transformed.toList(),
57 completion(equals(["error: first", "error: second"])));
58 });
59
60 test("buffers events and errors together", () async {
61 var controller = new StreamController<String>();
62 streamGroup.add(controller.stream);
63
64 controller.add("first");
65 controller.addError("second");
66 controller.add("third");
67 controller.addError("fourth");
68 controller.addError("fifth");
69 controller.add("sixth");
70 controller.close();
71
72 await new Future.delayed(Duration.ZERO);
73
74 expect(streamGroup.close(), completes);
75
76 var transformed = streamGroup.stream.transform(
77 new StreamTransformer.fromHandlers(
78 handleData: (data, sink) => sink.add("data: $data"),
79 handleError: (error, _, sink) => sink.add("error: $error")));
80 expect(transformed.toList(), completion(equals([
81 "data: first",
82 "error: second",
83 "data: third",
84 "error: fourth",
85 "error: fifth",
86 "data: sixth"
87 ])));
88 });
89
90 test("emits events once there's a listener", () {
91 var controller = new StreamController<String>();
92 streamGroup.add(controller.stream);
93
94 expect(streamGroup.stream.toList(),
95 completion(equals(["first", "second"])));
96
97 controller.add("first");
98 controller.add("second");
99 controller.close();
100
101 expect(streamGroup.close(), completes);
102 });
103
104 test("doesn't buffer events from a broadcast stream", () async {
105 var controller = new StreamController<String>.broadcast();
106 streamGroup.add(controller.stream);
107
108 controller.add("first");
109 controller.add("second");
110 controller.close();
111
112 await new Future.delayed(Duration.ZERO);
113
114 expect(streamGroup.close(), completes);
115 expect(streamGroup.stream.toList(), completion(isEmpty));
116 });
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Try this test again after listening to the group a
nweiz 2015/06/19 00:44:18 I've added the test, but the actual behavior is th
117
118 test("emits events from a broadcast stream once there's a listener", () {
119 var controller = new StreamController<String>.broadcast();
120 streamGroup.add(controller.stream);
121
122 expect(streamGroup.stream.toList(),
123 completion(equals(["first", "second"])));
124
125 controller.add("first");
126 controller.add("second");
127 controller.close();
128
129 expect(streamGroup.close(), completes);
130 });
131
132 test("forwards cancel errors", () async {
133 var subscription = streamGroup.stream.listen(null);
134
135 var controller = new StreamController<String>(
136 onCancel: () => throw "error");
137 streamGroup.add(controller.stream);
138 await new Future.delayed(Duration.ZERO);
139
140 expect(subscription.cancel(), throwsA("error"));
141 });
142
143 test("forwards a cancel futures", () async {
Lasse Reichstein Nielsen 2015/06/18 10:29:32 a cancel futures -> a cancel future/cancel futures
nweiz 2015/06/19 00:44:17 Done.
144 var subscription = streamGroup.stream.listen(null);
145
146 var completer = new Completer();
147 var controller = new StreamController<String>(
148 onCancel: () => completer.future);
149 streamGroup.add(controller.stream);
150 await new Future.delayed(Duration.ZERO);
151
152 var fired = false;
153 subscription.cancel().then((_) => fired = true);
154
155 await new Future.delayed(Duration.ZERO);
156 expect(fired, isFalse);
157
158 completer.complete();
159 await new Future.delayed(Duration.ZERO);
160 expect(fired, isTrue);
161 });
162 });
163
164 group("broadcast", () {
165 var streamGroup;
166 setUp(() {
167 streamGroup = new StreamGroup<String>.broadcast();
168 });
169
170 test("buffers events from multiple sources", () async {
171 var controller1 = new StreamController<String>();
172 streamGroup.add(controller1.stream);
173 controller1.add("first");
174 controller1.close();
175
176 var controller2 = new StreamController<String>();
177 streamGroup.add(controller2.stream);
178 controller2.add("second");
179 controller2.close();
180
181 await new Future.delayed(Duration.ZERO);
182
183 expect(streamGroup.close(), completes);
184
185 expect(streamGroup.stream.toList(),
186 completion(equals(["first", "second"])));
187 });
188
189 test("emits events from multiple sources once there's a listener", () {
190 var controller1 = new StreamController<String>();
191 streamGroup.add(controller1.stream);
192
193 var controller2 = new StreamController<String>();
194 streamGroup.add(controller2.stream);
195
196 expect(streamGroup.stream.toList(),
197 completion(equals(["first", "second"])));
198
199 controller1.add("first");
200 controller2.add("second");
201 controller1.close();
202 controller2.close();
203
204 expect(streamGroup.close(), completes);
205 });
206
207 test("doesn't buffer events once a listener has been added and removed",
208 () async {
209 var controller = new StreamController<String>();
210 streamGroup.add(controller.stream);
211
212 streamGroup.stream.listen(null).cancel();
213 await new Future.delayed(Duration.ZERO);
214
215 controller.add("first");
216 controller.addError("second");
217 controller.close();
218
219 await new Future.delayed(Duration.ZERO);
220
221 expect(streamGroup.close(), completes);
222 expect(streamGroup.stream.toList(), completion(isEmpty));
223 });
224
225 test("doesn't buffer events from a broadcast stream", () async {
226 var controller = new StreamController<String>.broadcast();
227 streamGroup.add(controller.stream);
228 controller.add("first");
229 controller.addError("second");
230 controller.close();
231
232 await new Future.delayed(Duration.ZERO);
233
234 expect(streamGroup.close(), completes);
235 expect(streamGroup.stream.toList(), completion(isEmpty));
236 });
237
238 test("emits events from a broadcast stream once there's a listener", () {
239 var controller = new StreamController<String>.broadcast();
240 streamGroup.add(controller.stream);
241
242 expect(streamGroup.stream.toList(),
243 completion(equals(["first", "second"])));
244
245 controller.add("first");
246 controller.add("second");
247 controller.close();
248
249 expect(streamGroup.close(), completes);
250 });
251
252 test("cancels and re-listens broadcast streams", () async {
253 var subscription = streamGroup.stream.listen(null);
254
255 var listened = false;
256 var controller = new StreamController<String>.broadcast(onListen: () {
257 listened = true;
258 }, onCancel: () {
259 listened = false;
260 });
261
262 streamGroup.add(controller.stream);
263 await new Future.delayed(Duration.ZERO);
264 expect(listened, isTrue);
265
266 subscription.cancel();
267 await new Future.delayed(Duration.ZERO);
268 expect(listened, isFalse);
269
270 streamGroup.stream.listen(null);
271 await new Future.delayed(Duration.ZERO);
272 expect(listened, isTrue);
273 });
274
275 test("never cancels single-subscription streams", () async {
276 var subscription = streamGroup.stream.listen(null);
277
278 var controller = new StreamController<String>(
279 onCancel: expectAsync(() {}, count: 0));
280
281 streamGroup.add(controller.stream);
282 await new Future.delayed(Duration.ZERO);
283
284 subscription.cancel();
285 await new Future.delayed(Duration.ZERO);
286
287 streamGroup.stream.listen(null);
288 await new Future.delayed(Duration.ZERO);
289 });
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Add a test where you: add single-sub stream (cre
nweiz 2015/06/19 00:44:17 Done.
290 });
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Also test: create broadcast group add single-s
nweiz 2015/06/19 00:44:18 Done.
291
292 group("regardless of type", () {
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Could these tests be run for both broadcast and si
nweiz 2015/06/19 00:44:17 Done.
293 var streamGroup;
294 setUp(() {
295 streamGroup = new StreamGroup<String>();
296 });
297
298 group("add()", () {
299 group("while dormant", () {
300 test("doesn't listen to the stream until the group is listened to",
301 () async {
302 var listened = false;
303 var controller = new StreamController<String>(
304 onListen: () => listened = true);
305
306 expect(streamGroup.add(controller.stream), isNull);
307 await new Future.delayed(Duration.ZERO);
308 expect(listened, isFalse);
309
310 streamGroup.stream.listen(null);
311 await new Future.delayed(Duration.ZERO);
312 expect(listened, isTrue);
313 });
314
315 test("is a no-op if the stream is already in the group", () {
316 var controller = new StreamController<String>();
317 streamGroup.add(controller.stream);
318 streamGroup.add(controller.stream);
319 streamGroup.add(controller.stream);
320
321 // If the stream was actually listened to multiple times, this would
322 // throw a StateError.
323 streamGroup.stream.listen(null);
324 });
325 });
326
327 group("while active", () {
328 var subscription;
329 setUp(() async {
330 subscription = streamGroup.stream.listen(null);
331 await new Future.delayed(Duration.ZERO);
332 });
333
334 test("listens to the stream immediately", () async {
335 var listened = false;
336 var controller = new StreamController<String>(
337 onListen: () => listened = true);
338
339 expect(streamGroup.add(controller.stream), isNull);
340 await new Future.delayed(Duration.ZERO);
341 expect(listened, isTrue);
342 });
343
344 test("pauses the stream if the group is paused, then resumes once the "
345 "group resumes", () async {
346 var paused = false;
347 var controller = new StreamController<String>(
348 onPause: () => paused = true,
349 onResume: () => paused = false);
350
351 subscription.pause();
352 await new Future.delayed(Duration.ZERO);
353
354 streamGroup.add(controller.stream);
355 await new Future.delayed(Duration.ZERO);
356 expect(paused, isTrue);
357
358 subscription.resume();
359 await new Future.delayed(Duration.ZERO);
360 expect(paused, isFalse);
361 });
362
363 test("is a no-op if the stream is already in the group", () async {
364 var controller = new StreamController<String>();
365
366 // If the stream were actually listened to more than once, future
367 // calls to [add] would throw [StateError]s.
368 streamGroup.add(controller.stream);
369 streamGroup.add(controller.stream);
370 streamGroup.add(controller.stream);
371 });
372 });
373
374 group("while canceled", () {
375 setUp(() async {
376 streamGroup.stream.listen(null).cancel();
377 await new Future.delayed(Duration.ZERO);
378 });
379
380 test("immediately listens to and cancels the stream", () async {
381 var listened = false;
382 var canceled = false;
383 var controller = new StreamController<String>(onListen: () {
384 listened = true;
385 }, onCancel: expectAsync(() {
386 expect(listened, isTrue);
387 canceled = true;
388 }));
389
390 streamGroup.add(controller.stream);
391 await new Future.delayed(Duration.ZERO);
392 expect(listened, isTrue);
393 expect(canceled, isTrue);
394 });
395
396 test("forwards cancel errors", () {
397 var controller = new StreamController<String>(
398 onCancel: () => throw "error");
399
400 expect(streamGroup.add(controller.stream), throwsA("error"));
401 });
402
403 test("forwards a cancel futures", () async {
Lasse Reichstein Nielsen 2015/06/18 10:29:32 a .. futures
nweiz 2015/06/19 00:44:17 Done.
404 var completer = new Completer();
405 var controller = new StreamController<String>(
406 onCancel: () => completer.future);
407
408 var fired = false;
409 streamGroup.add(controller.stream).then((_) => fired = true);
410
411 await new Future.delayed(Duration.ZERO);
412 expect(fired, isFalse);
413
414 completer.complete();
415 await new Future.delayed(Duration.ZERO);
416 expect(fired, isTrue);
417 });
418 });
419 });
420
421 group("remove()", () {
422 group("while dormant", () {
423 test("stops emitting events for a stream that's removed", () {
424 var controller = new StreamController<String>();
425 streamGroup.add(controller.stream);
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Check that prior events do get through. So co
426
427 controller.add("first");
428 expect(streamGroup.remove(controller.stream), isNull);
429
430 expect(streamGroup.close(), completes);
431 expect(streamGroup.stream.toList(), completion(equals(isEmpty)));
432 });
433
434 test("is a no-op for an unknown stream", () {
435 var controller = new StreamController<String>();
436 expect(streamGroup.remove(controller.stream), isNull);
437 });
438
439 test("and closed closes the group when the last stream is removed",
440 () async {
441 var controller1 = new StreamController<String>();
442 var controller2 = new StreamController<String>();
443
444 streamGroup.add(controller1.stream);
445 streamGroup.add(controller2.stream);
446 await new Future.delayed(Duration.ZERO);
447
448 streamGroup.close();
449
450 streamGroup.remove(controller1.stream);
451 await new Future.delayed(Duration.ZERO);
452
453 streamGroup.remove(controller2.stream);
454 await new Future.delayed(Duration.ZERO);
455
456 expect(streamGroup.stream.toList(), completion(isEmpty));
457 });
458 });
459
460 group("while listening", () {
461 test("doesn't emit events from a removed stream", () {
462 var controller = new StreamController<String>();
463 streamGroup.add(controller.stream);
464
465 // The subscription to [controller.stream] is canceled synchronously, so
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Long line :)
nweiz 2015/06/19 00:44:18 Done.
466 // the first event is dropped even though it was added before the
467 // removal. This is documented in [StreamGroup.remove].
468 expect(streamGroup.stream.toList(), completion(isEmpty));
469
470 controller.add("first");
471 expect(streamGroup.remove(controller.stream), isNull);
472 controller.add("second");
473
474 expect(streamGroup.close(), completes);
475 });
476
477 test("cancels the stream's subscription", () async {
478 var canceled = false;
479 var controller = new StreamController<String>(onCancel: () {
480 canceled = true;
481 });
482 streamGroup.add(controller.stream);
483
484 streamGroup.stream.listen(null);
485 await new Future.delayed(Duration.ZERO);
486 expect(canceled, isFalse);
487
488 streamGroup.remove(controller.stream);
489 await new Future.delayed(Duration.ZERO);
490 expect(canceled, isTrue);
491 });
492
493 test("forwards cancel errors", () async {
494 var controller = new StreamController<String>(
495 onCancel: () => throw "error");
496 streamGroup.add(controller.stream);
497
498 streamGroup.stream.listen(null);
499 await new Future.delayed(Duration.ZERO);
500
501 expect(streamGroup.remove(controller.stream), throwsA("error"));
502 });
503
504 test("forwards cancel futures", () async {
505 var completer = new Completer();
506 var controller = new StreamController<String>(
507 onCancel: () => completer.future);
508
509 streamGroup.stream.listen(null);
510 await new Future.delayed(Duration.ZERO);
511
512 streamGroup.add(controller.stream);
513 await new Future.delayed(Duration.ZERO);
514
515 var fired = false;
516 streamGroup.remove(controller.stream).then((_) => fired = true);
517
518 await new Future.delayed(Duration.ZERO);
519 expect(fired, isFalse);
520
521 completer.complete();
522 await new Future.delayed(Duration.ZERO);
523 expect(fired, isTrue);
524 });
525
526 test("is a no-op for an unknown stream", () async {
527 var controller = new StreamController<String>();
528 streamGroup.stream.listen(null);
529 await new Future.delayed(Duration.ZERO);
530
531 expect(streamGroup.remove(controller.stream), isNull);
532 });
533
534 test("and closed closes the group when the last stream is removed",
535 () async {
536 var done = false;
537 streamGroup.stream.listen(null, onDone: () => done = true);
538 await new Future.delayed(Duration.ZERO);
539
540 var controller1 = new StreamController<String>();
541 var controller2 = new StreamController<String>();
542
543 streamGroup.add(controller1.stream);
544 streamGroup.add(controller2.stream);
545 await new Future.delayed(Duration.ZERO);
546
547 streamGroup.close();
548
549 streamGroup.remove(controller1.stream);
550 await new Future.delayed(Duration.ZERO);
551 expect(done, isFalse);
552
553 streamGroup.remove(controller2.stream);
554 await new Future.delayed(Duration.ZERO);
555 expect(done, isTrue);
556 });
557 });
558 });
559
560 group("close()", () {
561 group("while dormant", () {
562 test("if there are no streams, closes the group", () {
563 expect(streamGroup.close(), completes);
564 expect(streamGroup.stream.toList(), completion(isEmpty));
565 });
566
567 test("if there are streams, closes the group once those streams close "
568 "and there's a listener", () async {
569 var controller1 = new StreamController<String>();
570 var controller2 = new StreamController<String>();
571
572 streamGroup.add(controller1.stream);
573 streamGroup.add(controller2.stream);
574 await new Future.delayed(Duration.ZERO);
575
576 streamGroup.close();
577
578 controller1.close();
579 controller2.close();
580 expect(streamGroup.stream.toList(), completion(isEmpty));
581 });
582 });
583
584 group("while active", () {
585 test("if there are no streams, closes the group", () {
586 expect(streamGroup.stream.toList(), completion(isEmpty));
587 expect(streamGroup.close(), completes);
588 });
589
590 test("if there are streams, closes the group once those streams close",
591 () async {
592 var done = false;
593 streamGroup.stream.listen(null, onDone: () => done = true);
594 await new Future.delayed(Duration.ZERO);
595
596 var controller1 = new StreamController<String>();
597 var controller2 = new StreamController<String>();
598
599 streamGroup.add(controller1.stream);
600 streamGroup.add(controller2.stream);
601 await new Future.delayed(Duration.ZERO);
602
603 streamGroup.close();
604 await new Future.delayed(Duration.ZERO);
605 expect(done, isFalse);
606
607 controller1.close();
608 await new Future.delayed(Duration.ZERO);
609 expect(done, isFalse);
610
611 controller2.close();
612 await new Future.delayed(Duration.ZERO);
613 expect(done, isTrue);
614 });
615 });
616
617 test("returns a Future that completes once all events are dispatched",
618 () async {
619 var events = [];
620 streamGroup.stream.listen(events.add);
621
622 var controller = new StreamController<String>();
623 streamGroup.add(controller.stream);
624 await new Future.delayed(Duration.ZERO);
625
626 // Add a bunch of events. Each of these will get dispatched in a
627 // separate microtask, so we can test that [close] only completes once
628 // all of them have dispatched.
629 controller.add("one");
630 controller.add("two");
631 controller.add("three");
632 controller.add("four");
633 controller.add("five");
634 controller.add("six");
635 controller.close();
636
637 await streamGroup.close();
638 expect(events, equals(["one", "two", "three", "four", "five", "six"]));
639 });
640 });
641 });
642
643 test("merge() emits events from all components streams", () {
644 var controller1 = new StreamController<String>();
645 var controller2 = new StreamController<String>();
646
647 var merged = StreamGroup.merge([controller1.stream, controller2.stream]);
648
649 controller1.add("first");
650 controller1.close();
651 controller2.add("second");
652 controller2.close();
653
654 expect(merged.toList(), completion(equals(["first", "second"])));
Lasse Reichstein Nielsen 2015/06/18 10:29:32 Again, maybe only compare unordered since the orde
nweiz 2015/06/19 00:44:17 Done.
655 });
656 }
OLDNEW
« lib/src/stream_group.dart ('K') | « lib/src/stream_group.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698