Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(172)

Side by Side Diff: packages/async/test/stream_group_test.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library async.test.stream_group_test;
6
7 import 'dart:async'; 5 import 'dart:async';
8 6
9 import 'package:async/async.dart'; 7 import 'package:async/async.dart';
10 import 'package:test/test.dart'; 8 import 'package:test/test.dart';
11 9
12 main() { 10 main() {
13 group("single-subscription", () { 11 group("single-subscription", () {
14 var streamGroup; 12 var streamGroup;
15 setUp(() { 13 setUp(() {
16 streamGroup = new StreamGroup<String>(); 14 streamGroup = new StreamGroup<String>();
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
70 controller.close(); 68 controller.close();
71 69
72 await flushMicrotasks(); 70 await flushMicrotasks();
73 71
74 expect(streamGroup.close(), completes); 72 expect(streamGroup.close(), completes);
75 73
76 var transformed = streamGroup.stream.transform( 74 var transformed = streamGroup.stream.transform(
77 new StreamTransformer.fromHandlers( 75 new StreamTransformer.fromHandlers(
78 handleData: (data, sink) => sink.add("data: $data"), 76 handleData: (data, sink) => sink.add("data: $data"),
79 handleError: (error, _, sink) => sink.add("error: $error"))); 77 handleError: (error, _, sink) => sink.add("error: $error")));
80 expect(transformed.toList(), completion(equals([ 78 expect(
81 "data: first", 79 transformed.toList(),
82 "error: second", 80 completion(equals([
83 "data: third", 81 "data: first",
84 "error: fourth", 82 "error: second",
85 "error: fifth", 83 "data: third",
86 "data: sixth" 84 "error: fourth",
87 ]))); 85 "error: fifth",
86 "data: sixth"
87 ])));
88 }); 88 });
89 89
90 test("emits events once there's a listener", () { 90 test("emits events once there's a listener", () {
91 var controller = new StreamController<String>(); 91 var controller = new StreamController<String>();
92 streamGroup.add(controller.stream); 92 streamGroup.add(controller.stream);
93 93
94 expect(streamGroup.stream.toList(), 94 expect(
95 completion(equals(["first", "second"]))); 95 streamGroup.stream.toList(), completion(equals(["first", "second"])));
96 96
97 controller.add("first"); 97 controller.add("first");
98 controller.add("second"); 98 controller.add("second");
99 controller.close(); 99 controller.close();
100 100
101 expect(streamGroup.close(), completes); 101 expect(streamGroup.close(), completes);
102 }); 102 });
103 103
104 test("doesn't buffer events from a broadcast stream", () async { 104 test("doesn't buffer events from a broadcast stream", () async {
105 var controller = new StreamController<String>.broadcast(); 105 var controller = new StreamController<String>.broadcast();
(...skipping 26 matching lines...) Expand all
132 expect(streamGroup.close(), completes); 132 expect(streamGroup.close(), completes);
133 await flushMicrotasks(); 133 await flushMicrotasks();
134 134
135 expect(events, equals(["first", "second"])); 135 expect(events, equals(["first", "second"]));
136 }); 136 });
137 137
138 test("emits events from a broadcast stream once there's a listener", () { 138 test("emits events from a broadcast stream once there's a listener", () {
139 var controller = new StreamController<String>.broadcast(); 139 var controller = new StreamController<String>.broadcast();
140 streamGroup.add(controller.stream); 140 streamGroup.add(controller.stream);
141 141
142 expect(streamGroup.stream.toList(), 142 expect(
143 completion(equals(["first", "second"]))); 143 streamGroup.stream.toList(), completion(equals(["first", "second"])));
144 144
145 controller.add("first"); 145 controller.add("first");
146 controller.add("second"); 146 controller.add("second");
147 controller.close(); 147 controller.close();
148 148
149 expect(streamGroup.close(), completes); 149 expect(streamGroup.close(), completes);
150 }); 150 });
151 151
152 test("forwards cancel errors", () async { 152 test("forwards cancel errors", () async {
153 var subscription = streamGroup.stream.listen(null); 153 var subscription = streamGroup.stream.listen(null);
154 154
155 var controller = new StreamController<String>( 155 var controller =
156 onCancel: () => throw "error"); 156 new StreamController<String>(onCancel: () => throw "error");
157 streamGroup.add(controller.stream); 157 streamGroup.add(controller.stream);
158 await flushMicrotasks(); 158 await flushMicrotasks();
159 159
160 expect(subscription.cancel(), throwsA("error")); 160 expect(subscription.cancel(), throwsA("error"));
161 }); 161 });
162 162
163 test("forwards a cancel future", () async { 163 test("forwards a cancel future", () async {
164 var subscription = streamGroup.stream.listen(null); 164 var subscription = streamGroup.stream.listen(null);
165 165
166 var completer = new Completer(); 166 var completer = new Completer();
167 var controller = new StreamController<String>( 167 var controller =
168 onCancel: () => completer.future); 168 new StreamController<String>(onCancel: () => completer.future);
169 streamGroup.add(controller.stream); 169 streamGroup.add(controller.stream);
170 await flushMicrotasks(); 170 await flushMicrotasks();
171 171
172 var fired = false; 172 var fired = false;
173 subscription.cancel().then((_) => fired = true); 173 subscription.cancel().then((_) => fired = true);
174 174
175 await flushMicrotasks(); 175 await flushMicrotasks();
176 expect(fired, isFalse); 176 expect(fired, isFalse);
177 177
178 completer.complete(); 178 completer.complete();
179 await flushMicrotasks(); 179 await flushMicrotasks();
180 expect(fired, isTrue); 180 expect(fired, isTrue);
181 }); 181 });
182 182
183 test("add() while active pauses the stream if the group is paused, then " 183 test(
184 "add() while active pauses the stream if the group is paused, then "
184 "resumes once the group resumes", () async { 185 "resumes once the group resumes", () async {
185 var subscription = streamGroup.stream.listen(null); 186 var subscription = streamGroup.stream.listen(null);
186 await flushMicrotasks(); 187 await flushMicrotasks();
187 188
188 var paused = false; 189 var paused = false;
189 var controller = new StreamController<String>( 190 var controller = new StreamController<String>(
190 onPause: () => paused = true, 191 onPause: () => paused = true, onResume: () => paused = false);
191 onResume: () => paused = false);
192 192
193 subscription.pause(); 193 subscription.pause();
194 await flushMicrotasks(); 194 await flushMicrotasks();
195 195
196 streamGroup.add(controller.stream); 196 streamGroup.add(controller.stream);
197 await flushMicrotasks(); 197 await flushMicrotasks();
198 expect(paused, isTrue); 198 expect(paused, isTrue);
199 199
200 subscription.resume(); 200 subscription.resume();
201 await flushMicrotasks(); 201 await flushMicrotasks();
202 expect(paused, isFalse); 202 expect(paused, isFalse);
203 }); 203 });
204 204
205 group("add() while canceled", () { 205 group("add() while canceled", () {
206 setUp(() async { 206 setUp(() async {
207 streamGroup.stream.listen(null).cancel(); 207 streamGroup.stream.listen(null).cancel();
208 await flushMicrotasks(); 208 await flushMicrotasks();
209 }); 209 });
210 210
211 test("immediately listens to and cancels the stream", () async { 211 test("immediately listens to and cancels the stream", () async {
212 var listened = false; 212 var listened = false;
213 var canceled = false; 213 var canceled = false;
214 var controller = new StreamController<String>(onListen: () { 214 var controller = new StreamController<String>(onListen: () {
215 listened = true; 215 listened = true;
216 }, onCancel: expectAsync(() { 216 }, onCancel: expectAsync0(() {
217 expect(listened, isTrue); 217 expect(listened, isTrue);
218 canceled = true; 218 canceled = true;
219 })); 219 }));
220 220
221 streamGroup.add(controller.stream); 221 streamGroup.add(controller.stream);
222 await flushMicrotasks(); 222 await flushMicrotasks();
223 expect(listened, isTrue); 223 expect(listened, isTrue);
224 expect(canceled, isTrue); 224 expect(canceled, isTrue);
225 }); 225 });
226 226
227 test("forwards cancel errors", () { 227 test("forwards cancel errors", () {
228 var controller = new StreamController<String>( 228 var controller =
229 onCancel: () => throw "error"); 229 new StreamController<String>(onCancel: () => throw "error");
230 230
231 expect(streamGroup.add(controller.stream), throwsA("error")); 231 expect(streamGroup.add(controller.stream), throwsA("error"));
232 }); 232 });
233 233
234 test("forwards a cancel future", () async { 234 test("forwards a cancel future", () async {
235 var completer = new Completer(); 235 var completer = new Completer();
236 var controller = new StreamController<String>( 236 var controller =
237 onCancel: () => completer.future); 237 new StreamController<String>(onCancel: () => completer.future);
238 238
239 var fired = false; 239 var fired = false;
240 streamGroup.add(controller.stream).then((_) => fired = true); 240 streamGroup.add(controller.stream).then((_) => fired = true);
241 241
242 await flushMicrotasks(); 242 await flushMicrotasks();
243 expect(fired, isFalse); 243 expect(fired, isFalse);
244 244
245 completer.complete(); 245 completer.complete();
246 await flushMicrotasks(); 246 await flushMicrotasks();
247 expect(fired, isTrue); 247 expect(fired, isTrue);
(...skipping 15 matching lines...) Expand all
263 263
264 var controller2 = new StreamController<String>(); 264 var controller2 = new StreamController<String>();
265 streamGroup.add(controller2.stream); 265 streamGroup.add(controller2.stream);
266 controller2.add("second"); 266 controller2.add("second");
267 controller2.close(); 267 controller2.close();
268 268
269 await flushMicrotasks(); 269 await flushMicrotasks();
270 270
271 expect(streamGroup.close(), completes); 271 expect(streamGroup.close(), completes);
272 272
273 expect(streamGroup.stream.toList(), 273 expect(
274 completion(equals(["first", "second"]))); 274 streamGroup.stream.toList(), completion(equals(["first", "second"])));
275 }); 275 });
276 276
277 test("emits events from multiple sources once there's a listener", () { 277 test("emits events from multiple sources once there's a listener", () {
278 var controller1 = new StreamController<String>(); 278 var controller1 = new StreamController<String>();
279 streamGroup.add(controller1.stream); 279 streamGroup.add(controller1.stream);
280 280
281 var controller2 = new StreamController<String>(); 281 var controller2 = new StreamController<String>();
282 streamGroup.add(controller2.stream); 282 streamGroup.add(controller2.stream);
283 283
284 expect(streamGroup.stream.toList(), 284 expect(
285 completion(equals(["first", "second"]))); 285 streamGroup.stream.toList(), completion(equals(["first", "second"])));
286 286
287 controller1.add("first"); 287 controller1.add("first");
288 controller2.add("second"); 288 controller2.add("second");
289 controller1.close(); 289 controller1.close();
290 controller2.close(); 290 controller2.close();
291 291
292 expect(streamGroup.close(), completes); 292 expect(streamGroup.close(), completes);
293 }); 293 });
294 294
295 test("doesn't buffer events once a listener has been added and removed", 295 test("doesn't buffer events once a listener has been added and removed",
(...skipping 24 matching lines...) Expand all
320 await flushMicrotasks(); 320 await flushMicrotasks();
321 321
322 expect(streamGroup.close(), completes); 322 expect(streamGroup.close(), completes);
323 expect(streamGroup.stream.toList(), completion(isEmpty)); 323 expect(streamGroup.stream.toList(), completion(isEmpty));
324 }); 324 });
325 325
326 test("emits events from a broadcast stream once there's a listener", () { 326 test("emits events from a broadcast stream once there's a listener", () {
327 var controller = new StreamController<String>.broadcast(); 327 var controller = new StreamController<String>.broadcast();
328 streamGroup.add(controller.stream); 328 streamGroup.add(controller.stream);
329 329
330 expect(streamGroup.stream.toList(), 330 expect(
331 completion(equals(["first", "second"]))); 331 streamGroup.stream.toList(), completion(equals(["first", "second"])));
332 332
333 controller.add("first"); 333 controller.add("first");
334 controller.add("second"); 334 controller.add("second");
335 controller.close(); 335 controller.close();
336 336
337 expect(streamGroup.close(), completes); 337 expect(streamGroup.close(), completes);
338 }); 338 });
339 339
340 test("cancels and re-listens broadcast streams", () async { 340 test("cancels and re-listens broadcast streams", () async {
341 var subscription = streamGroup.stream.listen(null); 341 var subscription = streamGroup.stream.listen(null);
342 342
343 var controller = new StreamController<String>.broadcast(); 343 var controller = new StreamController<String>.broadcast();
344 344
345 streamGroup.add(controller.stream); 345 streamGroup.add(controller.stream);
346 await flushMicrotasks(); 346 await flushMicrotasks();
347 expect(controller.hasListener, isTrue); 347 expect(controller.hasListener, isTrue);
348 348
349 subscription.cancel(); 349 subscription.cancel();
350 await flushMicrotasks(); 350 await flushMicrotasks();
351 expect(controller.hasListener, isFalse); 351 expect(controller.hasListener, isFalse);
352 352
353 streamGroup.stream.listen(null); 353 streamGroup.stream.listen(null);
354 await flushMicrotasks(); 354 await flushMicrotasks();
355 expect(controller.hasListener, isTrue); 355 expect(controller.hasListener, isTrue);
356 }); 356 });
357 357
358 test("never cancels single-subscription streams", () async { 358 test("never cancels single-subscription streams", () async {
359 var subscription = streamGroup.stream.listen(null); 359 var subscription = streamGroup.stream.listen(null);
360 360
361 var controller = new StreamController<String>( 361 var controller =
362 onCancel: expectAsync(() {}, count: 0)); 362 new StreamController<String>(onCancel: expectAsync0(() {}, count: 0));
363 363
364 streamGroup.add(controller.stream); 364 streamGroup.add(controller.stream);
365 await flushMicrotasks(); 365 await flushMicrotasks();
366 366
367 subscription.cancel(); 367 subscription.cancel();
368 await flushMicrotasks(); 368 await flushMicrotasks();
369 369
370 streamGroup.stream.listen(null); 370 streamGroup.stream.listen(null);
371 await flushMicrotasks(); 371 await flushMicrotasks();
372 }); 372 });
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after
502 test("stops emitting events for a stream that's removed", () async { 502 test("stops emitting events for a stream that's removed", () async {
503 var controller = new StreamController<String>(); 503 var controller = new StreamController<String>();
504 streamGroup.add(controller.stream); 504 streamGroup.add(controller.stream);
505 505
506 expect(streamGroup.stream.toList(), completion(equals(["first"]))); 506 expect(streamGroup.stream.toList(), completion(equals(["first"])));
507 507
508 controller.add("first"); 508 controller.add("first");
509 await flushMicrotasks(); 509 await flushMicrotasks();
510 controller.add("second"); 510 controller.add("second");
511 511
512 expect(streamGroup.remove(controller.stream), isNull); 512 expect(streamGroup.remove(controller.stream), completion(null));
513 expect(streamGroup.close(), completes); 513 expect(streamGroup.close(), completes);
514 }); 514 });
515 515
516 test("is a no-op for an unknown stream", () { 516 test("is a no-op for an unknown stream", () {
517 var controller = new StreamController<String>(); 517 var controller = new StreamController<String>();
518 expect(streamGroup.remove(controller.stream), isNull); 518 expect(streamGroup.remove(controller.stream), isNull);
519 }); 519 });
520 520
521 test("and closed closes the group when the last stream is removed", 521 test("and closed closes the group when the last stream is removed",
522 () async { 522 () async {
(...skipping 20 matching lines...) Expand all
543 test("doesn't emit events from a removed stream", () { 543 test("doesn't emit events from a removed stream", () {
544 var controller = new StreamController<String>(); 544 var controller = new StreamController<String>();
545 streamGroup.add(controller.stream); 545 streamGroup.add(controller.stream);
546 546
547 // The subscription to [controller.stream] is canceled synchronously, so 547 // The subscription to [controller.stream] is canceled synchronously, so
548 // the first event is dropped even though it was added before the 548 // the first event is dropped even though it was added before the
549 // removal. This is documented in [StreamGroup.remove]. 549 // removal. This is documented in [StreamGroup.remove].
550 expect(streamGroup.stream.toList(), completion(isEmpty)); 550 expect(streamGroup.stream.toList(), completion(isEmpty));
551 551
552 controller.add("first"); 552 controller.add("first");
553 expect(streamGroup.remove(controller.stream), isNull); 553 expect(streamGroup.remove(controller.stream), completion(null));
554 controller.add("second"); 554 controller.add("second");
555 555
556 expect(streamGroup.close(), completes); 556 expect(streamGroup.close(), completes);
557 }); 557 });
558 558
559 test("cancels the stream's subscription", () async { 559 test("cancels the stream's subscription", () async {
560 var controller = new StreamController<String>(); 560 var controller = new StreamController<String>();
561 streamGroup.add(controller.stream); 561 streamGroup.add(controller.stream);
562 562
563 streamGroup.stream.listen(null); 563 streamGroup.stream.listen(null);
564 await flushMicrotasks(); 564 await flushMicrotasks();
565 expect(controller.hasListener, isTrue); 565 expect(controller.hasListener, isTrue);
566 566
567 streamGroup.remove(controller.stream); 567 streamGroup.remove(controller.stream);
568 await flushMicrotasks(); 568 await flushMicrotasks();
569 expect(controller.hasListener, isFalse); 569 expect(controller.hasListener, isFalse);
570 }); 570 });
571 571
572 test("forwards cancel errors", () async { 572 test("forwards cancel errors", () async {
573 var controller = new StreamController<String>( 573 var controller =
574 onCancel: () => throw "error"); 574 new StreamController<String>(onCancel: () => throw "error");
575 streamGroup.add(controller.stream); 575 streamGroup.add(controller.stream);
576 576
577 streamGroup.stream.listen(null); 577 streamGroup.stream.listen(null);
578 await flushMicrotasks(); 578 await flushMicrotasks();
579 579
580 expect(streamGroup.remove(controller.stream), throwsA("error")); 580 expect(streamGroup.remove(controller.stream), throwsA("error"));
581 }); 581 });
582 582
583 test("forwards cancel futures", () async { 583 test("forwards cancel futures", () async {
584 var completer = new Completer(); 584 var completer = new Completer();
585 var controller = new StreamController<String>( 585 var controller =
586 onCancel: () => completer.future); 586 new StreamController<String>(onCancel: () => completer.future);
587 587
588 streamGroup.stream.listen(null); 588 streamGroup.stream.listen(null);
589 await flushMicrotasks(); 589 await flushMicrotasks();
590 590
591 streamGroup.add(controller.stream); 591 streamGroup.add(controller.stream);
592 await flushMicrotasks(); 592 await flushMicrotasks();
593 593
594 var fired = false; 594 var fired = false;
595 streamGroup.remove(controller.stream).then((_) => fired = true); 595 streamGroup.remove(controller.stream).then((_) => fired = true);
596 596
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
636 }); 636 });
637 }); 637 });
638 638
639 group("close()", () { 639 group("close()", () {
640 group("while dormant", () { 640 group("while dormant", () {
641 test("if there are no streams, closes the group", () { 641 test("if there are no streams, closes the group", () {
642 expect(streamGroup.close(), completes); 642 expect(streamGroup.close(), completes);
643 expect(streamGroup.stream.toList(), completion(isEmpty)); 643 expect(streamGroup.stream.toList(), completion(isEmpty));
644 }); 644 });
645 645
646 test("if there are streams, closes the group once those streams close " 646 test(
647 "if there are streams, closes the group once those streams close "
647 "and there's a listener", () async { 648 "and there's a listener", () async {
648 var controller1 = new StreamController<String>(); 649 var controller1 = new StreamController<String>();
649 var controller2 = new StreamController<String>(); 650 var controller2 = new StreamController<String>();
650 651
651 streamGroup.add(controller1.stream); 652 streamGroup.add(controller1.stream);
652 streamGroup.add(controller2.stream); 653 streamGroup.add(controller2.stream);
653 await flushMicrotasks(); 654 await flushMicrotasks();
654 655
655 streamGroup.close(); 656 streamGroup.close();
656 657
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after
714 controller.close(); 715 controller.close();
715 716
716 await streamGroup.close(); 717 await streamGroup.close();
717 expect(events, equals(["one", "two", "three", "four", "five", "six"])); 718 expect(events, equals(["one", "two", "three", "four", "five", "six"]));
718 }); 719 });
719 }); 720 });
720 } 721 }
721 722
722 /// Wait for all microtasks to complete. 723 /// Wait for all microtasks to complete.
723 Future flushMicrotasks() => new Future.delayed(Duration.ZERO); 724 Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
OLDNEW
« no previous file with comments | « packages/async/test/stream_completer_test.dart ('k') | packages/async/test/stream_queue_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698