Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1799)

Side by Side Diff: test/stream_queue_test.dart

Issue 1241723003: Add StreamQueue.fork and ForkableStream. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« lib/src/stream_queue.dart ('K') | « test/forkable_stream_test.dart ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE filevents. 3 // BSD-style license that can be found in the LICENSE filevents.
4 4
5 import "dart:async"; 5 import "dart:async";
6 6
7 import "package:async/async.dart" show StreamQueue; 7 import "package:async/async.dart" show StreamQueue;
8 import "package:test/test.dart"; 8 import "package:test/test.dart";
9 9
10 import "utils.dart"; 10 import "utils.dart";
(...skipping 326 matching lines...) Expand 10 before | Expand all | Expand 10 after
337 test("closes the events, prevents any other operation", () async { 337 test("closes the events, prevents any other operation", () async {
338 var events = new StreamQueue<int>(createStream()); 338 var events = new StreamQueue<int>(createStream());
339 await events.cancel(); 339 await events.cancel();
340 expect(() => events.next, throwsStateError); 340 expect(() => events.next, throwsStateError);
341 expect(() => events.skip(1), throwsStateError); 341 expect(() => events.skip(1), throwsStateError);
342 expect(() => events.take(1), throwsStateError); 342 expect(() => events.take(1), throwsStateError);
343 expect(() => events.rest, throwsStateError); 343 expect(() => events.rest, throwsStateError);
344 expect(() => events.cancel(), throwsStateError); 344 expect(() => events.cancel(), throwsStateError);
345 }); 345 });
346 346
347 test("cancels underlying subscription when called before any event",
348 () async {
349 var cancelFuture = new Future.value(42);
350 var controller = new StreamController(onCancel: () => cancelFuture);
351 var events = new StreamQueue<int>(controller.stream);
352 expect(await events.cancel(), 42);
353 });
354
347 test("cancels underlying subscription, returns result", () async { 355 test("cancels underlying subscription, returns result", () async {
348 var cancelFuture = new Future.value(42); 356 var cancelFuture = new Future.value(42);
349 var controller = new StreamController(onCancel: () => cancelFuture); 357 var controller = new StreamController(onCancel: () => cancelFuture);
350 var events = new StreamQueue<int>(controller.stream); 358 var events = new StreamQueue<int>(controller.stream);
351 controller.add(1); 359 controller.add(1);
352 expect(await events.next, 1); 360 expect(await events.next, 1);
353 expect(await events.cancel(), 42); 361 expect(await events.cancel(), 42);
354 }); 362 });
355 363
356 group("with immediate: true", () async { 364 group("with immediate: true", () {
357 test("closes the events, prevents any other operation", () async { 365 test("closes the events, prevents any other operation", () async {
358 var events = new StreamQueue<int>(createStream()); 366 var events = new StreamQueue<int>(createStream());
359 await events.cancel(immediate: true); 367 await events.cancel(immediate: true);
360 expect(() => events.next, throwsStateError); 368 expect(() => events.next, throwsStateError);
361 expect(() => events.skip(1), throwsStateError); 369 expect(() => events.skip(1), throwsStateError);
362 expect(() => events.take(1), throwsStateError); 370 expect(() => events.take(1), throwsStateError);
363 expect(() => events.rest, throwsStateError); 371 expect(() => events.rest, throwsStateError);
364 expect(() => events.cancel(), throwsStateError); 372 expect(() => events.cancel(), throwsStateError);
365 }); 373 });
366 374
367 test("cancels the underlying subscription immediately", () async { 375 test("cancels the underlying subscription immediately", () async {
368 var controller = new StreamController(); 376 var controller = new StreamController();
369 controller.add(1); 377 controller.add(1);
370 378
371 var events = new StreamQueue<int>(controller.stream); 379 var events = new StreamQueue<int>(controller.stream);
372 expect(await events.next, 1); 380 expect(await events.next, 1);
373 expect(controller.hasListener, isTrue); 381 expect(controller.hasListener, isTrue);
374 382
375 events.cancel(immediate: true); 383 events.cancel(immediate: true);
376 await expect(controller.hasListener, isFalse); 384 await expect(controller.hasListener, isFalse);
377 }); 385 });
378 386
387 test("cancels the underlying subscription when called before any event",
388 () async {
389 var cancelFuture = new Future.value(42);
390 var controller = new StreamController(onCancel: () => cancelFuture);
391
392 var events = new StreamQueue<int>(controller.stream);
393 expect(await events.cancel(immediate: true), 42);
394 });
395
379 test("closes pending requests", () async { 396 test("closes pending requests", () async {
380 var events = new StreamQueue<int>(createStream()); 397 var events = new StreamQueue<int>(createStream());
381 expect(await events.next, 1); 398 expect(await events.next, 1);
382 expect(events.next, throwsStateError); 399 expect(events.next, throwsStateError);
383 expect(events.hasNext, completion(isFalse)); 400 expect(events.hasNext, completion(isFalse));
384 401
385 await events.cancel(immediate: true); 402 await events.cancel(immediate: true);
386 }); 403 });
387 404
388 test("returns the result of closing the underlying subscription", 405 test("returns the result of closing the underlying subscription",
(...skipping 216 matching lines...) Expand 10 before | Expand all | Expand 10 after
605 622
606 test("- cancel after true on error", () async { 623 test("- cancel after true on error", () async {
607 var events = new StreamQueue<int>(createErrorStream()); 624 var events = new StreamQueue<int>(createErrorStream());
608 expect(await events.next, 1); 625 expect(await events.next, 1);
609 expect(await events.next, 2); 626 expect(await events.next, 2);
610 expect(await events.hasNext, true); 627 expect(await events.hasNext, true);
611 expect(await events.cancel(), null); 628 expect(await events.cancel(), null);
612 }); 629 });
613 }); 630 });
614 631
632 group("fork operation", () {
633 test("produces a stream queue with the same events", () async {
634 var queue1 = new StreamQueue<int>(createStream());
635 var queue2 = queue1.fork();
636
637 expect(await queue1.next, 1);
638 expect(await queue1.next, 2);
639 expect(await queue1.next, 3);
640 expect(await queue1.next, 4);
641 expect(await queue1.hasNext, isFalse);
642
643 expect(await queue2.next, 1);
644 expect(await queue2.next, 2);
645 expect(await queue2.next, 3);
646 expect(await queue2.next, 4);
647 expect(await queue2.hasNext, isFalse);
648 });
649
650 test("produces a stream queue with the same errors", () async {
651 var queue1 = new StreamQueue<int>(createErrorStream());
652 var queue2 = queue1.fork();
653
654 expect(await queue1.next, 1);
655 expect(await queue1.next, 2);
656 expect(queue1.next, throwsA("To err is divine!"));
657 expect(await queue1.next, 4);
658 expect(await queue1.hasNext, isFalse);
659
660 expect(await queue2.next, 1);
661 expect(await queue2.next, 2);
662 expect(queue2.next, throwsA("To err is divine!"));
663 expect(await queue2.next, 4);
664 expect(await queue2.hasNext, isFalse);
665 });
666
667 test("forks at the current point in the source queue", () {
668 var queue1 = new StreamQueue<int>(createStream());
669
670 expect(queue1.next, completion(1));
671 expect(queue1.next, completion(2));
672
673 var queue2 = queue1.fork();
674
675 expect(queue1.next, completion(3));
676 expect(queue1.next, completion(4));
677 expect(queue1.hasNext, completion(isFalse));
678
679 expect(queue2.next, completion(3));
680 expect(queue2.next, completion(4));
681 expect(queue2.hasNext, completion(isFalse));
682 });
683
684 test("can be created after there are pending values", () async {
685 var queue1 = new StreamQueue<int>(createStream());
686 await flushMicrotasks();
687
688 var queue2 = queue1.fork();
689 expect(await queue2.next, 1);
690 expect(await queue2.next, 2);
691 expect(await queue2.next, 3);
692 expect(await queue2.next, 4);
693 expect(await queue2.hasNext, isFalse);
694 });
695
696 test("multiple forks can be created at different points", () async {
697 var queue1 = new StreamQueue<int>(createStream());
698
699 var queue2 = queue1.fork();
700 expect(await queue1.next, 1);
701 expect(await queue2.next, 1);
702
703 var queue3 = queue1.fork();
704 expect(await queue1.next, 2);
705 expect(await queue2.next, 2);
706 expect(await queue3.next, 2);
707
708 var queue4 = queue1.fork();
709 expect(await queue1.next, 3);
710 expect(await queue2.next, 3);
711 expect(await queue3.next, 3);
712 expect(await queue4.next, 3);
713
714 var queue5 = queue1.fork();
715 expect(await queue1.next, 4);
716 expect(await queue2.next, 4);
717 expect(await queue3.next, 4);
718 expect(await queue4.next, 4);
719 expect(await queue5.next, 4);
720
721 var queue6 = queue1.fork();
722 expect(await queue1.hasNext, isFalse);
723 expect(await queue2.hasNext, isFalse);
724 expect(await queue3.hasNext, isFalse);
725 expect(await queue4.hasNext, isFalse);
726 expect(await queue5.hasNext, isFalse);
727 expect(await queue6.hasNext, isFalse);
728 });
729
730 test("same-level forks receive data in the order they were created",
731 () async {
732 var queue1 = new StreamQueue<int>(createStream());
733 var queue2 = queue1.fork();
734 var queue3 = queue1.fork();
735 var queue4 = queue1.fork();
736 var queue5 = queue1.fork();
737
738 for (var i = 0; i < 4; i++) {
739 var queue1Fired = false;
740 var queue2Fired = false;
741 var queue3Fired = false;
742 var queue4Fired = false;
743 var queue5Fired = false;
744
745 queue5.next.then(expectAsync((_) {
746 queue5Fired = true;
747 expect(queue1Fired, isTrue);
748 expect(queue2Fired, isTrue);
749 expect(queue3Fired, isTrue);
750 expect(queue4Fired, isTrue);
751 }));
752
753 queue1.next.then(expectAsync((_) {
754 queue1Fired = true;
755 expect(queue2Fired, isFalse);
756 expect(queue3Fired, isFalse);
757 expect(queue4Fired, isFalse);
758 expect(queue5Fired, isFalse);
759 }));
760
761 queue4.next.then(expectAsync((_) {
762 queue4Fired = true;
763 expect(queue1Fired, isTrue);
764 expect(queue2Fired, isTrue);
765 expect(queue3Fired, isTrue);
766 expect(queue5Fired, isFalse);
767 }));
768
769 queue2.next.then(expectAsync((_) {
770 queue2Fired = true;
771 expect(queue1Fired, isTrue);
772 expect(queue3Fired, isFalse);
773 expect(queue4Fired, isFalse);
774 expect(queue5Fired, isFalse);
775 }));
776
777 queue3.next.then(expectAsync((_) {
778 queue3Fired = true;
779 expect(queue1Fired, isTrue);
780 expect(queue2Fired, isTrue);
781 expect(queue4Fired, isFalse);
782 expect(queue5Fired, isFalse);
783 }));
784 }
785 });
786
787 test("forks can be created from forks", () async {
788 var queue1 = new StreamQueue<int>(createStream());
789
790 var queue2 = queue1.fork();
791 expect(await queue1.next, 1);
792 expect(await queue2.next, 1);
793
794 var queue3 = queue2.fork();
795 expect(await queue1.next, 2);
796 expect(await queue2.next, 2);
797 expect(await queue3.next, 2);
798
799 var queue4 = queue3.fork();
800 expect(await queue1.next, 3);
801 expect(await queue2.next, 3);
802 expect(await queue3.next, 3);
803 expect(await queue4.next, 3);
804
805 var queue5 = queue4.fork();
806 expect(await queue1.next, 4);
807 expect(await queue2.next, 4);
808 expect(await queue3.next, 4);
809 expect(await queue4.next, 4);
810 expect(await queue5.next, 4);
811
812 var queue6 = queue5.fork();
813 expect(await queue1.hasNext, isFalse);
814 expect(await queue2.hasNext, isFalse);
815 expect(await queue3.hasNext, isFalse);
816 expect(await queue4.hasNext, isFalse);
817 expect(await queue5.hasNext, isFalse);
818 expect(await queue6.hasNext, isFalse);
819 });
820
821 group("canceling:", () {
822 test("cancelling a fork doesn't cancel its source", () async {
823 var queue1 = new StreamQueue<int>(createStream());
824 var queue2 = queue1.fork();
825
826 queue2.cancel();
827 expect(() => queue2.next, throwsStateError);
828
829 expect(await queue1.next, 1);
830 expect(await queue1.next, 2);
831 expect(await queue1.next, 3);
832 expect(await queue1.next, 4);
833 expect(await queue1.hasNext, isFalse);
834 });
835
836 test("cancelling a source doesn't cancel its unmaterialized fork",
837 () async {
838 var queue1 = new StreamQueue<int>(createStream());
839 var queue2 = queue1.fork();
840
841 queue1.cancel();
842 expect(() => queue1.next, throwsStateError);
843
844 expect(await queue2.next, 1);
845 expect(await queue2.next, 2);
846 expect(await queue2.next, 3);
847 expect(await queue2.next, 4);
848 expect(await queue2.hasNext, isFalse);
849 });
850
851 test("cancelling a source doesn't cancel its materialized fork",
852 () async {
853 var queue1 = new StreamQueue<int>(createStream());
854 var queue2 = queue1.fork();
855
856 expect(await queue1.next, 1);
857
858 queue1.cancel();
859 expect(() => queue1.next, throwsStateError);
860
861 expect(await queue2.next, 1);
862 expect(await queue2.next, 2);
863 expect(await queue2.next, 3);
864 expect(await queue2.next, 4);
865 expect(await queue2.hasNext, isFalse);
866 });
867
868 test("the underlying stream is only canceled once all forks are canceled",
869 () async {
870 var controller = new StreamController();
871 var queue1 = new StreamQueue<int>(controller.stream);
872 var queue2 = queue1.fork();
873
874 await flushMicrotasks();
875 expect(controller.hasListener, isFalse);
876
877 expect(queue1.next, completion(1));
878 await flushMicrotasks();
879 expect(controller.hasListener, isTrue);
880
881 queue2.cancel();
882 await flushMicrotasks();
883 expect(controller.hasListener, isTrue);
884
885 controller.add(1);
886 queue1.cancel();
887 await flushMicrotasks();
888 expect(controller.hasListener, isFalse);
889 });
890
891 group("with immediate,", () {
892 test("cancelling a fork doesn't cancel its source", () async {
893 var queue1 = new StreamQueue<int>(createStream());
894 var queue2 = queue1.fork();
895
896 queue2.cancel(immediate: true);
897 expect(() => queue2.next, throwsStateError);
898
899 expect(await queue1.next, 1);
900 expect(await queue1.next, 2);
901 expect(await queue1.next, 3);
902 expect(await queue1.next, 4);
903 expect(await queue1.hasNext, isFalse);
904 });
905
906 test("cancelling a source doesn't cancel its unmaterialized fork",
907 () async {
908 var queue1 = new StreamQueue<int>(createStream());
909 var queue2 = queue1.fork();
910
911 queue1.cancel(immediate: true);
912 expect(() => queue1.next, throwsStateError);
913
914 expect(await queue2.next, 1);
915 expect(await queue2.next, 2);
916 expect(await queue2.next, 3);
917 expect(await queue2.next, 4);
918 expect(await queue2.hasNext, isFalse);
919 });
920
921 test("cancelling a source doesn't cancel its materialized fork",
922 () async {
923 var queue1 = new StreamQueue<int>(createStream());
924 var queue2 = queue1.fork();
925
926 expect(await queue1.next, 1);
927
928 queue1.cancel(immediate: true);
929 expect(() => queue1.next, throwsStateError);
930
931 expect(await queue2.next, 1);
932 expect(await queue2.next, 2);
933 expect(await queue2.next, 3);
934 expect(await queue2.next, 4);
935 expect(await queue2.hasNext, isFalse);
936 });
937
938 test("the underlying stream is only canceled once all forks are "
939 "canceled", () async {
940 var controller = new StreamController();
941 var queue1 = new StreamQueue<int>(controller.stream);
942 var queue2 = queue1.fork();
943
944 await flushMicrotasks();
945 expect(controller.hasListener, isFalse);
946
947 expect(queue1.next, throwsStateError);
948 await flushMicrotasks();
949 expect(controller.hasListener, isTrue);
950
951 queue2.cancel(immediate: true);
952 await flushMicrotasks();
953 expect(controller.hasListener, isTrue);
954
955 queue1.cancel(immediate: true);
956 await flushMicrotasks();
957 expect(controller.hasListener, isFalse);
958 });
959 });
960 });
961
962 group("pausing:", () {
963 test("the underlying stream is only implicitly paused when no forks are "
964 "awaiting input", () async {
965 var controller = new StreamController();
966 var queue1 = new StreamQueue<int>(controller.stream);
967 var queue2 = queue1.fork();
968
969 controller.add(1);
970 expect(await queue1.next, 1);
971 expect(controller.hasListener, isTrue);
972 expect(controller.isPaused, isTrue);
973
974 expect(queue1.next, completion(2));
975 await flushMicrotasks();
976 expect(controller.isPaused, isFalse);
977
978 controller.add(2);
979 await flushMicrotasks();
980 expect(controller.isPaused, isTrue);
981
982 expect(queue2.next, completion(1));
983 expect(queue2.next, completion(2));
984 expect(queue2.next, completion(3));
985 await flushMicrotasks();
986 expect(controller.isPaused, isFalse);
987
988 controller.add(3);
989 await flushMicrotasks();
990 expect(controller.isPaused, isTrue);
991 });
992
993 test("pausing a fork doesn't pause its source", () async {
994 var queue1 = new StreamQueue<int>(createStream());
995 var queue2 = queue1.fork();
996
997 queue2.rest.listen(expectAsync((_) {}, count: 0)).pause();
998
999 expect(await queue1.next, 1);
1000 expect(await queue1.next, 2);
1001 expect(await queue1.next, 3);
1002 expect(await queue1.next, 4);
1003 expect(await queue1.hasNext, isFalse);
1004 });
1005
1006 test("pausing a source doesn't pause its fork", () async {
1007 var queue1 = new StreamQueue<int>(createStream());
1008 var queue2 = queue1.fork();
1009
1010 queue1.rest.listen(expectAsync((_) {}, count: 0)).pause();
1011
1012 expect(await queue2.next, 1);
1013 expect(await queue2.next, 2);
1014 expect(await queue2.next, 3);
1015 expect(await queue2.next, 4);
1016 expect(await queue2.hasNext, isFalse);
1017 });
1018
1019 test("the underlying stream is only paused when all forks are paused",
1020 () async {
1021 var controller = new StreamController();
1022 var queue1 = new StreamQueue<int>(controller.stream);
1023 var queue2 = queue1.fork();
1024
1025 await flushMicrotasks();
1026 expect(controller.hasListener, isFalse);
1027
1028 var sub1 = queue1.rest.listen(null);
1029 await flushMicrotasks();
1030 expect(controller.hasListener, isTrue);
1031 expect(controller.isPaused, isFalse);
1032
1033 sub1.pause();
1034 await flushMicrotasks();
1035 expect(controller.isPaused, isTrue);
1036
1037 expect(queue2.next, completion(1));
1038 await flushMicrotasks();
1039 expect(controller.isPaused, isFalse);
1040
1041 controller.add(1);
1042 await flushMicrotasks();
1043 expect(controller.isPaused, isTrue);
1044
1045 var sub2 = queue2.rest.listen(null);
1046 await flushMicrotasks();
1047 expect(controller.isPaused, isFalse);
1048
1049 sub2.pause();
1050 await flushMicrotasks();
1051 expect(controller.isPaused, isTrue);
1052
1053 sub1.resume();
1054 await flushMicrotasks();
1055 expect(controller.isPaused, isFalse);
1056 });
1057 });
1058 });
1059
615 test("all combinations sequential skip/next/take operations", () async { 1060 test("all combinations sequential skip/next/take operations", () async {
616 // Takes all combinations of two of next, skip and take, then ends with 1061 // Takes all combinations of two of next, skip and take, then ends with
617 // doing rest. Each of the first rounds do 10 events of each type, 1062 // doing rest. Each of the first rounds do 10 events of each type,
618 // the rest does 20 elements. 1063 // the rest does 20 elements.
619 var eventCount = 20 * (3 * 3 + 1); 1064 var eventCount = 20 * (3 * 3 + 1);
620 var events = new StreamQueue<int>(createLongStream(eventCount)); 1065 var events = new StreamQueue<int>(createLongStream(eventCount));
621 1066
622 // Test expecting [startIndex .. startIndex + 9] as events using 1067 // Test expecting [startIndex .. startIndex + 9] as events using
623 // `next`. 1068 // `next`.
624 nextTest(startIndex) { 1069 nextTest(startIndex) {
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
677 controller.add(4); 1122 controller.add(4);
678 await flushMicrotasks(); 1123 await flushMicrotasks();
679 controller.close(); 1124 controller.close();
680 }(); 1125 }();
681 return controller.stream; 1126 return controller.stream;
682 } 1127 }
683 1128
684 Stream<int> createLongStream(int eventCount) async* { 1129 Stream<int> createLongStream(int eventCount) async* {
685 for (int i = 0; i < eventCount; i++) yield i; 1130 for (int i = 0; i < eventCount; i++) yield i;
686 } 1131 }
OLDNEW
« lib/src/stream_queue.dart ('K') | « test/forkable_stream_test.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698