Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(306)

Side by Side Diff: test/util/stream_queue_test.dart

Issue 1262623006: Temporarily bring in code from the async package. (Closed) Base URL: git@github.com:dart-lang/test@master
Patch Set: Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE filevents.
4
5 // TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/
6 // lands.
7
8 import "dart:async";
9
10 import "package:test/src/util/stream_queue.dart";
11 import "package:test/test.dart";
12
13 main() {
14 group("source stream", () {
15 test("is listened to on first request, paused between requests", () async {
16 var controller = new StreamController();
17 var events = new StreamQueue<int>(controller.stream);
18 await flushMicrotasks();
19 expect(controller.hasListener, isFalse);
20
21 var next = events.next;
22 expect(controller.hasListener, isTrue);
23 expect(controller.isPaused, isFalse);
24
25 controller.add(1);
26
27 expect(await next, 1);
28 expect(controller.hasListener, isTrue);
29 expect(controller.isPaused, isTrue);
30
31 next = events.next;
32 expect(controller.hasListener, isTrue);
33 expect(controller.isPaused, isFalse);
34
35 controller.add(2);
36
37 expect(await next, 2);
38 expect(controller.hasListener, isTrue);
39 expect(controller.isPaused, isTrue);
40
41 events.cancel();
42 expect(controller.hasListener, isFalse);
43 });
44 });
45
46 group("next operation", () {
47 test("simple sequence of requests", () async {
48 var events = new StreamQueue<int>(createStream());
49 for (int i = 1; i <= 4; i++) {
50 expect(await events.next, i);
51 }
52 expect(events.next, throwsStateError);
53 });
54
55 test("multiple requests at the same time", () async {
56 var events = new StreamQueue<int>(createStream());
57 var result = await Future.wait(
58 [events.next, events.next, events.next, events.next]);
59 expect(result, [1, 2, 3, 4]);
60 await events.cancel();
61 });
62
63 test("sequence of requests with error", () async {
64 var events = new StreamQueue<int>(createErrorStream());
65 expect(await events.next, 1);
66 expect(await events.next, 2);
67 expect(events.next, throwsA("To err is divine!"));
68 expect(await events.next, 4);
69 await events.cancel();
70 });
71 });
72
73 group("skip operation", () {
74 test("of two elements in the middle of sequence", () async {
75 var events = new StreamQueue<int>(createStream());
76 expect(await events.next, 1);
77 expect(await events.skip(2), 0);
78 expect(await events.next, 4);
79 await events.cancel();
80 });
81
82 test("with negative/bad arguments throws", () async {
83 var events = new StreamQueue<int>(createStream());
84 expect(() => events.skip(-1), throwsArgumentError);
85 // A non-int throws either a type error or an argument error,
86 // depending on whether it's checked mode or not.
87 expect(await events.next, 1); // Did not consume event.
88 expect(() => events.skip(-1), throwsArgumentError);
89 expect(await events.next, 2); // Did not consume event.
90 await events.cancel();
91 });
92
93 test("of 0 elements works", () async {
94 var events = new StreamQueue<int>(createStream());
95 expect(events.skip(0), completion(0));
96 expect(events.next, completion(1));
97 expect(events.skip(0), completion(0));
98 expect(events.next, completion(2));
99 expect(events.skip(0), completion(0));
100 expect(events.next, completion(3));
101 expect(events.skip(0), completion(0));
102 expect(events.next, completion(4));
103 expect(events.skip(0), completion(0));
104 expect(events.skip(5), completion(5));
105 expect(events.next, throwsStateError);
106 await events.cancel();
107 });
108
109 test("of too many events ends at stream start", () async {
110 var events = new StreamQueue<int>(createStream());
111 expect(await events.skip(6), 2);
112 await events.cancel();
113 });
114
115 test("of too many events after some events", () async {
116 var events = new StreamQueue<int>(createStream());
117 expect(await events.next, 1);
118 expect(await events.next, 2);
119 expect(await events.skip(6), 4);
120 await events.cancel();
121 });
122
123 test("of too many events ends at stream end", () async {
124 var events = new StreamQueue<int>(createStream());
125 expect(await events.next, 1);
126 expect(await events.next, 2);
127 expect(await events.next, 3);
128 expect(await events.next, 4);
129 expect(await events.skip(2), 2);
130 await events.cancel();
131 });
132
133 test("of events with error", () async {
134 var events = new StreamQueue<int>(createErrorStream());
135 expect(events.skip(4), throwsA("To err is divine!"));
136 expect(await events.next, 4);
137 await events.cancel();
138 });
139
140 test("of events with error, and skip again after", () async {
141 var events = new StreamQueue<int>(createErrorStream());
142 expect(events.skip(4), throwsA("To err is divine!"));
143 expect(events.skip(2), completion(1));
144 await events.cancel();
145 });
146 test("multiple skips at same time complete in order.", () async {
147 var events = new StreamQueue<int>(createStream());
148 var skip1 = events.skip(1);
149 var skip2 = events.skip(0);
150 var skip3 = events.skip(4);
151 var skip4 = events.skip(1);
152 var index = 0;
153 // Check that futures complete in order.
154 sequence(expectedValue, sequenceIndex) => (value) {
155 expect(value, expectedValue);
156 expect(index, sequenceIndex);
157 index++;
158 };
159 await Future.wait([skip1.then(sequence(0, 0)),
160 skip2.then(sequence(0, 1)),
161 skip3.then(sequence(1, 2)),
162 skip4.then(sequence(1, 3))]);
163 await events.cancel();
164 });
165 });
166
167 group("take operation", () {
168 test("as simple take of events", () async {
169 var events = new StreamQueue<int>(createStream());
170 expect(await events.next, 1);
171 expect(await events.take(2), [2, 3]);
172 expect(await events.next, 4);
173 await events.cancel();
174 });
175
176 test("of 0 events", () async {
177 var events = new StreamQueue<int>(createStream());
178 expect(events.take(0), completion([]));
179 expect(events.next, completion(1));
180 expect(events.take(0), completion([]));
181 expect(events.next, completion(2));
182 expect(events.take(0), completion([]));
183 expect(events.next, completion(3));
184 expect(events.take(0), completion([]));
185 expect(events.next, completion(4));
186 expect(events.take(0), completion([]));
187 expect(events.take(5), completion([]));
188 expect(events.next, throwsStateError);
189 await events.cancel();
190 });
191
192 test("with bad arguments throws", () async {
193 var events = new StreamQueue<int>(createStream());
194 expect(() => events.take(-1), throwsArgumentError);
195 expect(await events.next, 1); // Did not consume event.
196 expect(() => events.take(-1), throwsArgumentError);
197 expect(await events.next, 2); // Did not consume event.
198 await events.cancel();
199 });
200
201 test("of too many arguments", () async {
202 var events = new StreamQueue<int>(createStream());
203 expect(await events.take(6), [1, 2, 3, 4]);
204 await events.cancel();
205 });
206
207 test("too large later", () async {
208 var events = new StreamQueue<int>(createStream());
209 expect(await events.next, 1);
210 expect(await events.next, 2);
211 expect(await events.take(6), [3, 4]);
212 await events.cancel();
213 });
214
215 test("error", () async {
216 var events = new StreamQueue<int>(createErrorStream());
217 expect(events.take(4), throwsA("To err is divine!"));
218 expect(await events.next, 4);
219 await events.cancel();
220 });
221 });
222
223 group("rest operation", () {
224 test("after single next", () async {
225 var events = new StreamQueue<int>(createStream());
226 expect(await events.next, 1);
227 expect(await events.rest.toList(), [2, 3, 4]);
228 });
229
230 test("at start", () async {
231 var events = new StreamQueue<int>(createStream());
232 expect(await events.rest.toList(), [1, 2, 3, 4]);
233 });
234
235 test("at end", () async {
236 var events = new StreamQueue<int>(createStream());
237 expect(await events.next, 1);
238 expect(await events.next, 2);
239 expect(await events.next, 3);
240 expect(await events.next, 4);
241 expect(await events.rest.toList(), isEmpty);
242 });
243
244 test("after end", () async {
245 var events = new StreamQueue<int>(createStream());
246 expect(await events.next, 1);
247 expect(await events.next, 2);
248 expect(await events.next, 3);
249 expect(await events.next, 4);
250 expect(events.next, throwsStateError);
251 expect(await events.rest.toList(), isEmpty);
252 });
253
254 test("after receiving done requested before", () async {
255 var events = new StreamQueue<int>(createStream());
256 var next1 = events.next;
257 var next2 = events.next;
258 var next3 = events.next;
259 var rest = events.rest;
260 for (int i = 0; i < 10; i++) {
261 await flushMicrotasks();
262 }
263 expect(await next1, 1);
264 expect(await next2, 2);
265 expect(await next3, 3);
266 expect(await rest.toList(), [4]);
267 });
268
269 test("with an error event error", () async {
270 var events = new StreamQueue<int>(createErrorStream());
271 expect(await events.next, 1);
272 var rest = events.rest;
273 var events2 = new StreamQueue(rest);
274 expect(await events2.next, 2);
275 expect(events2.next, throwsA("To err is divine!"));
276 expect(await events2.next, 4);
277 });
278
279 test("closes the events, prevents other operations", () async {
280 var events = new StreamQueue<int>(createStream());
281 var stream = events.rest;
282 expect(() => events.next, throwsStateError);
283 expect(() => events.skip(1), throwsStateError);
284 expect(() => events.take(1), throwsStateError);
285 expect(() => events.rest, throwsStateError);
286 expect(() => events.cancel(), throwsStateError);
287 expect(stream.toList(), completion([1, 2, 3, 4]));
288 });
289
290 test("forwards to underlying stream", () async {
291 var cancel = new Completer();
292 var controller = new StreamController(onCancel: () => cancel.future);
293 var events = new StreamQueue<int>(controller.stream);
294 expect(controller.hasListener, isFalse);
295 var next = events.next;
296 expect(controller.hasListener, isTrue);
297 expect(controller.isPaused, isFalse);
298
299 controller.add(1);
300 expect(await next, 1);
301 expect(controller.isPaused, isTrue);
302
303 var rest = events.rest;
304 var subscription = rest.listen(null);
305 expect(controller.hasListener, isTrue);
306 expect(controller.isPaused, isFalse);
307
308 var lastEvent;
309 subscription.onData((value) => lastEvent = value);
310
311 controller.add(2);
312
313 await flushMicrotasks();
314 expect(lastEvent, 2);
315 expect(controller.hasListener, isTrue);
316 expect(controller.isPaused, isFalse);
317
318 subscription.pause();
319 expect(controller.isPaused, isTrue);
320
321 controller.add(3);
322
323 await flushMicrotasks();
324 expect(lastEvent, 2);
325 subscription.resume();
326
327 await flushMicrotasks();
328 expect(lastEvent, 3);
329
330 var cancelFuture = subscription.cancel();
331 expect(controller.hasListener, isFalse);
332 cancel.complete(42);
333 expect(cancelFuture, completion(42));
334 });
335 });
336
337 group("cancel operation", () {
338 test("closes the events, prevents any other operation", () async {
339 var events = new StreamQueue<int>(createStream());
340 await events.cancel();
341 expect(() => events.next, throwsStateError);
342 expect(() => events.skip(1), throwsStateError);
343 expect(() => events.take(1), throwsStateError);
344 expect(() => events.rest, throwsStateError);
345 expect(() => events.cancel(), throwsStateError);
346 });
347
348 test("cancels underlying subscription when called before any event",
349 () async {
350 var cancelFuture = new Future.value(42);
351 var controller = new StreamController(onCancel: () => cancelFuture);
352 var events = new StreamQueue<int>(controller.stream);
353 expect(await events.cancel(), 42);
354 });
355
356 test("cancels underlying subscription, returns result", () async {
357 var cancelFuture = new Future.value(42);
358 var controller = new StreamController(onCancel: () => cancelFuture);
359 var events = new StreamQueue<int>(controller.stream);
360 controller.add(1);
361 expect(await events.next, 1);
362 expect(await events.cancel(), 42);
363 });
364
365 group("with immediate: true", () {
366 test("closes the events, prevents any other operation", () async {
367 var events = new StreamQueue<int>(createStream());
368 await events.cancel(immediate: true);
369 expect(() => events.next, throwsStateError);
370 expect(() => events.skip(1), throwsStateError);
371 expect(() => events.take(1), throwsStateError);
372 expect(() => events.rest, throwsStateError);
373 expect(() => events.cancel(), throwsStateError);
374 });
375
376 test("cancels the underlying subscription immediately", () async {
377 var controller = new StreamController();
378 controller.add(1);
379
380 var events = new StreamQueue<int>(controller.stream);
381 expect(await events.next, 1);
382 expect(controller.hasListener, isTrue);
383
384 events.cancel(immediate: true);
385 await expect(controller.hasListener, isFalse);
386 });
387
388 test("cancels the underlying subscription when called before any event",
389 () async {
390 var cancelFuture = new Future.value(42);
391 var controller = new StreamController(onCancel: () => cancelFuture);
392
393 var events = new StreamQueue<int>(controller.stream);
394 expect(await events.cancel(immediate: true), 42);
395 });
396
397 test("closes pending requests", () async {
398 var events = new StreamQueue<int>(createStream());
399 expect(await events.next, 1);
400 expect(events.next, throwsStateError);
401 expect(events.hasNext, completion(isFalse));
402
403 await events.cancel(immediate: true);
404 });
405
406 test("returns the result of closing the underlying subscription",
407 () async {
408 var controller = new StreamController(
409 onCancel: () => new Future.value(42));
410 var events = new StreamQueue<int>(controller.stream);
411 expect(await events.cancel(immediate: true), 42);
412 });
413
414 test("listens and then cancels a stream that hasn't been listened to yet",
415 () async {
416 var wasListened = false;
417 var controller = new StreamController(
418 onListen: () => wasListened = true);
419 var events = new StreamQueue<int>(controller.stream);
420 expect(wasListened, isFalse);
421 expect(controller.hasListener, isFalse);
422
423 await events.cancel(immediate: true);
424 expect(wasListened, isTrue);
425 expect(controller.hasListener, isFalse);
426 });
427 });
428 });
429
430 group("hasNext operation", () {
431 test("true at start", () async {
432 var events = new StreamQueue<int>(createStream());
433 expect(await events.hasNext, isTrue);
434 });
435
436 test("true after start", () async {
437 var events = new StreamQueue<int>(createStream());
438 expect(await events.next, 1);
439 expect(await events.hasNext, isTrue);
440 });
441
442 test("true at end", () async {
443 var events = new StreamQueue<int>(createStream());
444 for (int i = 1; i <= 4; i++) {
445 expect(await events.next, i);
446 }
447 expect(await events.hasNext, isFalse);
448 });
449
450 test("true when enqueued", () async {
451 var events = new StreamQueue<int>(createStream());
452 var values = [];
453 for (int i = 1; i <= 3; i++) {
454 events.next.then(values.add);
455 }
456 expect(values, isEmpty);
457 expect(await events.hasNext, isTrue);
458 expect(values, [1, 2, 3]);
459 });
460
461 test("false when enqueued", () async {
462 var events = new StreamQueue<int>(createStream());
463 var values = [];
464 for (int i = 1; i <= 4; i++) {
465 events.next.then(values.add);
466 }
467 expect(values, isEmpty);
468 expect(await events.hasNext, isFalse);
469 expect(values, [1, 2, 3, 4]);
470 });
471
472 test("true when data event", () async {
473 var controller = new StreamController();
474 var events = new StreamQueue<int>(controller.stream);
475
476 var hasNext;
477 events.hasNext.then((result) { hasNext = result; });
478 await flushMicrotasks();
479 expect(hasNext, isNull);
480 controller.add(42);
481 expect(hasNext, isNull);
482 await flushMicrotasks();
483 expect(hasNext, isTrue);
484 });
485
486 test("true when error event", () async {
487 var controller = new StreamController();
488 var events = new StreamQueue<int>(controller.stream);
489
490 var hasNext;
491 events.hasNext.then((result) { hasNext = result; });
492 await flushMicrotasks();
493 expect(hasNext, isNull);
494 controller.addError("BAD");
495 expect(hasNext, isNull);
496 await flushMicrotasks();
497 expect(hasNext, isTrue);
498 expect(events.next, throwsA("BAD"));
499 });
500
501 test("- hasNext after hasNext", () async {
502 var events = new StreamQueue<int>(createStream());
503 expect(await events.hasNext, true);
504 expect(await events.hasNext, true);
505 expect(await events.next, 1);
506 expect(await events.hasNext, true);
507 expect(await events.hasNext, true);
508 expect(await events.next, 2);
509 expect(await events.hasNext, true);
510 expect(await events.hasNext, true);
511 expect(await events.next, 3);
512 expect(await events.hasNext, true);
513 expect(await events.hasNext, true);
514 expect(await events.next, 4);
515 expect(await events.hasNext, false);
516 expect(await events.hasNext, false);
517 });
518
519 test("- next after true", () async {
520 var events = new StreamQueue<int>(createStream());
521 expect(await events.next, 1);
522 expect(await events.hasNext, true);
523 expect(await events.next, 2);
524 expect(await events.next, 3);
525 });
526
527 test("- next after true, enqueued", () async {
528 var events = new StreamQueue<int>(createStream());
529 var responses = [];
530 events.next.then(responses.add);
531 events.hasNext.then(responses.add);
532 events.next.then(responses.add);
533 do {
534 await flushMicrotasks();
535 } while (responses.length < 3);
536 expect(responses, [1, true, 2]);
537 });
538
539 test("- skip 0 after true", () async {
540 var events = new StreamQueue<int>(createStream());
541 expect(await events.next, 1);
542 expect(await events.hasNext, true);
543 expect(await events.skip(0), 0);
544 expect(await events.next, 2);
545 });
546
547 test("- skip 1 after true", () async {
548 var events = new StreamQueue<int>(createStream());
549 expect(await events.next, 1);
550 expect(await events.hasNext, true);
551 expect(await events.skip(1), 0);
552 expect(await events.next, 3);
553 });
554
555 test("- skip 2 after true", () async {
556 var events = new StreamQueue<int>(createStream());
557 expect(await events.next, 1);
558 expect(await events.hasNext, true);
559 expect(await events.skip(2), 0);
560 expect(await events.next, 4);
561 });
562
563 test("- take 0 after true", () async {
564 var events = new StreamQueue<int>(createStream());
565 expect(await events.next, 1);
566 expect(await events.hasNext, true);
567 expect(await events.take(0), isEmpty);
568 expect(await events.next, 2);
569 });
570
571 test("- take 1 after true", () async {
572 var events = new StreamQueue<int>(createStream());
573 expect(await events.next, 1);
574 expect(await events.hasNext, true);
575 expect(await events.take(1), [2]);
576 expect(await events.next, 3);
577 });
578
579 test("- take 2 after true", () async {
580 var events = new StreamQueue<int>(createStream());
581 expect(await events.next, 1);
582 expect(await events.hasNext, true);
583 expect(await events.take(2), [2, 3]);
584 expect(await events.next, 4);
585 });
586
587 test("- rest after true", () async {
588 var events = new StreamQueue<int>(createStream());
589 expect(await events.next, 1);
590 expect(await events.hasNext, true);
591 var stream = events.rest;
592 expect(await stream.toList(), [2, 3, 4]);
593 });
594
595 test("- rest after true, at last", () async {
596 var events = new StreamQueue<int>(createStream());
597 expect(await events.next, 1);
598 expect(await events.next, 2);
599 expect(await events.next, 3);
600 expect(await events.hasNext, true);
601 var stream = events.rest;
602 expect(await stream.toList(), [4]);
603 });
604
605 test("- rest after false", () async {
606 var events = new StreamQueue<int>(createStream());
607 expect(await events.next, 1);
608 expect(await events.next, 2);
609 expect(await events.next, 3);
610 expect(await events.next, 4);
611 expect(await events.hasNext, false);
612 var stream = events.rest;
613 expect(await stream.toList(), isEmpty);
614 });
615
616 test("- cancel after true on data", () async {
617 var events = new StreamQueue<int>(createStream());
618 expect(await events.next, 1);
619 expect(await events.next, 2);
620 expect(await events.hasNext, true);
621 expect(await events.cancel(), null);
622 });
623
624 test("- cancel after true on error", () async {
625 var events = new StreamQueue<int>(createErrorStream());
626 expect(await events.next, 1);
627 expect(await events.next, 2);
628 expect(await events.hasNext, true);
629 expect(await events.cancel(), null);
630 });
631 });
632
633 group("fork operation", () {
634 test("produces a stream queue with the same events", () async {
635 var queue1 = new StreamQueue<int>(createStream());
636 var queue2 = queue1.fork();
637
638 expect(await queue1.next, 1);
639 expect(await queue1.next, 2);
640 expect(await queue1.next, 3);
641 expect(await queue1.next, 4);
642 expect(await queue1.hasNext, isFalse);
643
644 expect(await queue2.next, 1);
645 expect(await queue2.next, 2);
646 expect(await queue2.next, 3);
647 expect(await queue2.next, 4);
648 expect(await queue2.hasNext, isFalse);
649 });
650
651 test("produces a stream queue with the same errors", () async {
652 var queue1 = new StreamQueue<int>(createErrorStream());
653 var queue2 = queue1.fork();
654
655 expect(await queue1.next, 1);
656 expect(await queue1.next, 2);
657 expect(queue1.next, throwsA("To err is divine!"));
658 expect(await queue1.next, 4);
659 expect(await queue1.hasNext, isFalse);
660
661 expect(await queue2.next, 1);
662 expect(await queue2.next, 2);
663 expect(queue2.next, throwsA("To err is divine!"));
664 expect(await queue2.next, 4);
665 expect(await queue2.hasNext, isFalse);
666 });
667
668 test("forks at the current point in the source queue", () {
669 var queue1 = new StreamQueue<int>(createStream());
670
671 expect(queue1.next, completion(1));
672 expect(queue1.next, completion(2));
673
674 var queue2 = queue1.fork();
675
676 expect(queue1.next, completion(3));
677 expect(queue1.next, completion(4));
678 expect(queue1.hasNext, completion(isFalse));
679
680 expect(queue2.next, completion(3));
681 expect(queue2.next, completion(4));
682 expect(queue2.hasNext, completion(isFalse));
683 });
684
685 test("can be created after there are pending values", () async {
686 var queue1 = new StreamQueue<int>(createStream());
687 await flushMicrotasks();
688
689 var queue2 = queue1.fork();
690 expect(await queue2.next, 1);
691 expect(await queue2.next, 2);
692 expect(await queue2.next, 3);
693 expect(await queue2.next, 4);
694 expect(await queue2.hasNext, isFalse);
695 });
696
697 test("multiple forks can be created at different points", () async {
698 var queue1 = new StreamQueue<int>(createStream());
699
700 var queue2 = queue1.fork();
701 expect(await queue1.next, 1);
702 expect(await queue2.next, 1);
703
704 var queue3 = queue1.fork();
705 expect(await queue1.next, 2);
706 expect(await queue2.next, 2);
707 expect(await queue3.next, 2);
708
709 var queue4 = queue1.fork();
710 expect(await queue1.next, 3);
711 expect(await queue2.next, 3);
712 expect(await queue3.next, 3);
713 expect(await queue4.next, 3);
714
715 var queue5 = queue1.fork();
716 expect(await queue1.next, 4);
717 expect(await queue2.next, 4);
718 expect(await queue3.next, 4);
719 expect(await queue4.next, 4);
720 expect(await queue5.next, 4);
721
722 var queue6 = queue1.fork();
723 expect(await queue1.hasNext, isFalse);
724 expect(await queue2.hasNext, isFalse);
725 expect(await queue3.hasNext, isFalse);
726 expect(await queue4.hasNext, isFalse);
727 expect(await queue5.hasNext, isFalse);
728 expect(await queue6.hasNext, isFalse);
729 });
730
731 test("same-level forks receive data in the order they were created",
732 () async {
733 var queue1 = new StreamQueue<int>(createStream());
734 var queue2 = queue1.fork();
735 var queue3 = queue1.fork();
736 var queue4 = queue1.fork();
737 var queue5 = queue1.fork();
738
739 for (var i = 0; i < 4; i++) {
740 var queue1Fired = false;
741 var queue2Fired = false;
742 var queue3Fired = false;
743 var queue4Fired = false;
744 var queue5Fired = false;
745
746 queue5.next.then(expectAsync((_) {
747 queue5Fired = true;
748 expect(queue1Fired, isTrue);
749 expect(queue2Fired, isTrue);
750 expect(queue3Fired, isTrue);
751 expect(queue4Fired, isTrue);
752 }));
753
754 queue1.next.then(expectAsync((_) {
755 queue1Fired = true;
756 expect(queue2Fired, isFalse);
757 expect(queue3Fired, isFalse);
758 expect(queue4Fired, isFalse);
759 expect(queue5Fired, isFalse);
760 }));
761
762 queue4.next.then(expectAsync((_) {
763 queue4Fired = true;
764 expect(queue1Fired, isTrue);
765 expect(queue2Fired, isTrue);
766 expect(queue3Fired, isTrue);
767 expect(queue5Fired, isFalse);
768 }));
769
770 queue2.next.then(expectAsync((_) {
771 queue2Fired = true;
772 expect(queue1Fired, isTrue);
773 expect(queue3Fired, isFalse);
774 expect(queue4Fired, isFalse);
775 expect(queue5Fired, isFalse);
776 }));
777
778 queue3.next.then(expectAsync((_) {
779 queue3Fired = true;
780 expect(queue1Fired, isTrue);
781 expect(queue2Fired, isTrue);
782 expect(queue4Fired, isFalse);
783 expect(queue5Fired, isFalse);
784 }));
785 }
786 });
787
788 test("forks can be created from forks", () async {
789 var queue1 = new StreamQueue<int>(createStream());
790
791 var queue2 = queue1.fork();
792 expect(await queue1.next, 1);
793 expect(await queue2.next, 1);
794
795 var queue3 = queue2.fork();
796 expect(await queue1.next, 2);
797 expect(await queue2.next, 2);
798 expect(await queue3.next, 2);
799
800 var queue4 = queue3.fork();
801 expect(await queue1.next, 3);
802 expect(await queue2.next, 3);
803 expect(await queue3.next, 3);
804 expect(await queue4.next, 3);
805
806 var queue5 = queue4.fork();
807 expect(await queue1.next, 4);
808 expect(await queue2.next, 4);
809 expect(await queue3.next, 4);
810 expect(await queue4.next, 4);
811 expect(await queue5.next, 4);
812
813 var queue6 = queue5.fork();
814 expect(await queue1.hasNext, isFalse);
815 expect(await queue2.hasNext, isFalse);
816 expect(await queue3.hasNext, isFalse);
817 expect(await queue4.hasNext, isFalse);
818 expect(await queue5.hasNext, isFalse);
819 expect(await queue6.hasNext, isFalse);
820 });
821
822 group("canceling:", () {
823 test("cancelling a fork doesn't cancel its source", () async {
824 var queue1 = new StreamQueue<int>(createStream());
825 var queue2 = queue1.fork();
826
827 queue2.cancel();
828 expect(() => queue2.next, throwsStateError);
829
830 expect(await queue1.next, 1);
831 expect(await queue1.next, 2);
832 expect(await queue1.next, 3);
833 expect(await queue1.next, 4);
834 expect(await queue1.hasNext, isFalse);
835 });
836
837 test("cancelling a source doesn't cancel its unmaterialized fork",
838 () async {
839 var queue1 = new StreamQueue<int>(createStream());
840 var queue2 = queue1.fork();
841
842 queue1.cancel();
843 expect(() => queue1.next, throwsStateError);
844
845 expect(await queue2.next, 1);
846 expect(await queue2.next, 2);
847 expect(await queue2.next, 3);
848 expect(await queue2.next, 4);
849 expect(await queue2.hasNext, isFalse);
850 });
851
852 test("cancelling a source doesn't cancel its materialized fork",
853 () async {
854 var queue1 = new StreamQueue<int>(createStream());
855 var queue2 = queue1.fork();
856
857 expect(await queue1.next, 1);
858
859 queue1.cancel();
860 expect(() => queue1.next, throwsStateError);
861
862 expect(await queue2.next, 1);
863 expect(await queue2.next, 2);
864 expect(await queue2.next, 3);
865 expect(await queue2.next, 4);
866 expect(await queue2.hasNext, isFalse);
867 });
868
869 test("the underlying stream is only canceled once all forks are canceled",
870 () async {
871 var controller = new StreamController();
872 var queue1 = new StreamQueue<int>(controller.stream);
873 var queue2 = queue1.fork();
874
875 await flushMicrotasks();
876 expect(controller.hasListener, isFalse);
877
878 expect(queue1.next, completion(1));
879 await flushMicrotasks();
880 expect(controller.hasListener, isTrue);
881
882 queue2.cancel();
883 await flushMicrotasks();
884 expect(controller.hasListener, isTrue);
885
886 controller.add(1);
887 queue1.cancel();
888 await flushMicrotasks();
889 expect(controller.hasListener, isFalse);
890 });
891
892 group("with immediate,", () {
893 test("cancelling a fork doesn't cancel its source", () async {
894 var queue1 = new StreamQueue<int>(createStream());
895 var queue2 = queue1.fork();
896
897 queue2.cancel(immediate: true);
898 expect(() => queue2.next, throwsStateError);
899
900 expect(await queue1.next, 1);
901 expect(await queue1.next, 2);
902 expect(await queue1.next, 3);
903 expect(await queue1.next, 4);
904 expect(await queue1.hasNext, isFalse);
905 });
906
907 test("cancelling a source doesn't cancel its unmaterialized fork",
908 () async {
909 var queue1 = new StreamQueue<int>(createStream());
910 var queue2 = queue1.fork();
911
912 queue1.cancel(immediate: true);
913 expect(() => queue1.next, throwsStateError);
914
915 expect(await queue2.next, 1);
916 expect(await queue2.next, 2);
917 expect(await queue2.next, 3);
918 expect(await queue2.next, 4);
919 expect(await queue2.hasNext, isFalse);
920 });
921
922 test("cancelling a source doesn't cancel its materialized fork",
923 () async {
924 var queue1 = new StreamQueue<int>(createStream());
925 var queue2 = queue1.fork();
926
927 expect(await queue1.next, 1);
928
929 queue1.cancel(immediate: true);
930 expect(() => queue1.next, throwsStateError);
931
932 expect(await queue2.next, 1);
933 expect(await queue2.next, 2);
934 expect(await queue2.next, 3);
935 expect(await queue2.next, 4);
936 expect(await queue2.hasNext, isFalse);
937 });
938
939 test("the underlying stream is only canceled once all forks are "
940 "canceled", () async {
941 var controller = new StreamController();
942 var queue1 = new StreamQueue<int>(controller.stream);
943 var queue2 = queue1.fork();
944
945 await flushMicrotasks();
946 expect(controller.hasListener, isFalse);
947
948 expect(queue1.next, throwsStateError);
949 await flushMicrotasks();
950 expect(controller.hasListener, isTrue);
951
952 queue2.cancel(immediate: true);
953 await flushMicrotasks();
954 expect(controller.hasListener, isTrue);
955
956 queue1.cancel(immediate: true);
957 await flushMicrotasks();
958 expect(controller.hasListener, isFalse);
959 });
960 });
961 });
962
963 group("pausing:", () {
964 test("the underlying stream is only implicitly paused when no forks are "
965 "awaiting input", () async {
966 var controller = new StreamController();
967 var queue1 = new StreamQueue<int>(controller.stream);
968 var queue2 = queue1.fork();
969
970 controller.add(1);
971 expect(await queue1.next, 1);
972 expect(controller.hasListener, isTrue);
973 expect(controller.isPaused, isTrue);
974
975 expect(queue1.next, completion(2));
976 await flushMicrotasks();
977 expect(controller.isPaused, isFalse);
978
979 controller.add(2);
980 await flushMicrotasks();
981 expect(controller.isPaused, isTrue);
982
983 expect(queue2.next, completion(1));
984 expect(queue2.next, completion(2));
985 expect(queue2.next, completion(3));
986 await flushMicrotasks();
987 expect(controller.isPaused, isFalse);
988
989 controller.add(3);
990 await flushMicrotasks();
991 expect(controller.isPaused, isTrue);
992 });
993
994 test("pausing a fork doesn't pause its source", () async {
995 var queue1 = new StreamQueue<int>(createStream());
996 var queue2 = queue1.fork();
997
998 queue2.rest.listen(expectAsync((_) {}, count: 0)).pause();
999
1000 expect(await queue1.next, 1);
1001 expect(await queue1.next, 2);
1002 expect(await queue1.next, 3);
1003 expect(await queue1.next, 4);
1004 expect(await queue1.hasNext, isFalse);
1005 });
1006
1007 test("pausing a source doesn't pause its fork", () async {
1008 var queue1 = new StreamQueue<int>(createStream());
1009 var queue2 = queue1.fork();
1010
1011 queue1.rest.listen(expectAsync((_) {}, count: 0)).pause();
1012
1013 expect(await queue2.next, 1);
1014 expect(await queue2.next, 2);
1015 expect(await queue2.next, 3);
1016 expect(await queue2.next, 4);
1017 expect(await queue2.hasNext, isFalse);
1018 });
1019
1020 test("the underlying stream is only paused when all forks are paused",
1021 () async {
1022 var controller = new StreamController();
1023 var queue1 = new StreamQueue<int>(controller.stream);
1024 var queue2 = queue1.fork();
1025
1026 await flushMicrotasks();
1027 expect(controller.hasListener, isFalse);
1028
1029 var sub1 = queue1.rest.listen(null);
1030 await flushMicrotasks();
1031 expect(controller.hasListener, isTrue);
1032 expect(controller.isPaused, isFalse);
1033
1034 sub1.pause();
1035 await flushMicrotasks();
1036 expect(controller.isPaused, isTrue);
1037
1038 expect(queue2.next, completion(1));
1039 await flushMicrotasks();
1040 expect(controller.isPaused, isFalse);
1041
1042 controller.add(1);
1043 await flushMicrotasks();
1044 expect(controller.isPaused, isTrue);
1045
1046 var sub2 = queue2.rest.listen(null);
1047 await flushMicrotasks();
1048 expect(controller.isPaused, isFalse);
1049
1050 sub2.pause();
1051 await flushMicrotasks();
1052 expect(controller.isPaused, isTrue);
1053
1054 sub1.resume();
1055 await flushMicrotasks();
1056 expect(controller.isPaused, isFalse);
1057 });
1058 });
1059 });
1060
1061 test("all combinations sequential skip/next/take operations", () async {
1062 // Takes all combinations of two of next, skip and take, then ends with
1063 // doing rest. Each of the first rounds do 10 events of each type,
1064 // the rest does 20 elements.
1065 var eventCount = 20 * (3 * 3 + 1);
1066 var events = new StreamQueue<int>(createLongStream(eventCount));
1067
1068 // Test expecting [startIndex .. startIndex + 9] as events using
1069 // `next`.
1070 nextTest(startIndex) {
1071 for (int i = 0; i < 10; i++) {
1072 expect(events.next, completion(startIndex + i));
1073 }
1074 }
1075
1076 // Test expecting 10 events to be skipped.
1077 skipTest(startIndex) {
1078 expect(events.skip(10), completion(0));
1079 }
1080
1081 // Test expecting [startIndex .. startIndex + 9] as events using
1082 // `take(10)`.
1083 takeTest(startIndex) {
1084 expect(events.take(10),
1085 completion(new List.generate(10, (i) => startIndex + i)));
1086 }
1087 var tests = [nextTest, skipTest, takeTest];
1088
1089 int counter = 0;
1090 // Run through all pairs of two tests and run them.
1091 for (int i = 0; i < tests.length; i++) {
1092 for (int j = 0; j < tests.length; j++) {
1093 tests[i](counter);
1094 tests[j](counter + 10);
1095 counter += 20;
1096 }
1097 }
1098 // Then expect 20 more events as a `rest` call.
1099 expect(events.rest.toList(),
1100 completion(new List.generate(20, (i) => counter + i)));
1101 });
1102 }
1103
1104 Stream<int> createStream() async* {
1105 yield 1;
1106 await flushMicrotasks();
1107 yield 2;
1108 await flushMicrotasks();
1109 yield 3;
1110 await flushMicrotasks();
1111 yield 4;
1112 }
1113
1114 Stream<int> createErrorStream() {
1115 StreamController controller = new StreamController<int>();
1116 () async {
1117 controller.add(1);
1118 await flushMicrotasks();
1119 controller.add(2);
1120 await flushMicrotasks();
1121 controller.addError("To err is divine!");
1122 await flushMicrotasks();
1123 controller.add(4);
1124 await flushMicrotasks();
1125 controller.close();
1126 }();
1127 return controller.stream;
1128 }
1129
1130 Stream<int> createLongStream(int eventCount) async* {
1131 for (int i = 0; i < eventCount; i++) yield i;
1132 }
1133
1134 Future flushMicrotasks() => new Future.delayed(Duration.ZERO);
OLDNEW
« lib/src/util/cancelable_future.dart ('K') | « test/util/forkable_stream_test.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698