OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Throws the given error in the next cycle. */ | 7 /** Throws the given error in the next cycle. */ |
8 _throwDelayed(var error, [Object stackTrace]) { | 8 _throwDelayed(var error, [Object stackTrace]) { |
9 // We are going to reach the top-level here, but there might be a global | 9 // We are going to reach the top-level here, but there might be a global |
10 // exception handler. This means that we shouldn't print the stack trace. | 10 // exception handler. This means that we shouldn't print the stack trace. |
(...skipping 580 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
591 dispatch._sendDone(); | 591 dispatch._sendDone(); |
592 } | 592 } |
593 | 593 |
594 _DelayedEvent get next => null; | 594 _DelayedEvent get next => null; |
595 | 595 |
596 void set next(_DelayedEvent _) { | 596 void set next(_DelayedEvent _) { |
597 throw new StateError("No events after a done."); | 597 throw new StateError("No events after a done."); |
598 } | 598 } |
599 } | 599 } |
600 | 600 |
601 /** | |
602 * Simple internal doubly-linked list implementation. | |
603 * | |
604 * In an internal linked list, the links are in the data objects themselves, | |
605 * instead of in a separate object. That means each element can be in at most | |
606 * one list at a time. | |
607 * | |
608 * All links are always members of an element cycle. At creation it's a | |
609 * singleton cycle. | |
610 */ | |
611 abstract class _InternalLink { | |
612 _InternalLink _nextLink; | |
613 _InternalLink _previousLink; | |
614 | |
615 _InternalLink() { | |
616 this._previousLink = this._nextLink = this; | |
617 } | |
618 | |
619 /* Removes a link from any list it may be part of, and links it to itself. */ | |
620 static void unlink(_InternalLink element) { | |
621 _InternalLink next = element._nextLink; | |
622 _InternalLink previous = element._previousLink; | |
623 next._previousLink = previous; | |
624 previous._nextLink = next; | |
625 element._nextLink = element._previousLink = element; | |
626 } | |
627 | |
628 /** Check whether an element is unattached to other elements. */ | |
629 static bool isUnlinked(_InternalLink element) { | |
630 return identical(element, element._nextLink); | |
631 } | |
632 } | |
633 | |
634 /** | |
635 * Marker interface for "list" links. | |
636 * | |
637 * An "InternalLinkList" is an abstraction on top of a link cycle, where the | |
638 * "list" object itself is not considered an element (it's just a header link | |
639 * created to avoid edge cases). | |
640 * An element is considered part of a list if it is in the list's cycle. | |
641 * There should never be more than one "list" object in a cycle. | |
642 */ | |
643 abstract class _InternalLinkList extends _InternalLink { | |
644 /** | |
645 * Adds an element to a list, just before the header link. | |
646 * | |
647 * This effectively adds it at the end of the list. | |
648 */ | |
649 static void add(_InternalLinkList list, _InternalLink element) { | |
650 if (!_InternalLink.isUnlinked(element)) _InternalLink.unlink(element); | |
651 _InternalLink listEnd = list._previousLink; | |
652 listEnd._nextLink = element; | |
653 list._previousLink = element; | |
654 element._previousLink = listEnd; | |
655 element._nextLink = list; | |
656 } | |
657 | |
658 /** Removes an element from its list. */ | |
659 static void remove(_InternalLink element) { | |
660 _InternalLink.unlink(element); | |
661 } | |
662 | |
663 /** Check whether a list contains no elements, only the header link. */ | |
664 static bool isEmpty(_InternalLinkList list) => _InternalLink.isUnlinked(list); | |
665 | |
666 /** Moves all elements from the list [other] to [list]. */ | |
667 static void addAll(_InternalLinkList list, _InternalLinkList other) { | |
668 if (isEmpty(other)) return; | |
669 _InternalLink listLast = list._previousLink; | |
670 _InternalLink otherNext = other._nextLink; | |
671 listLast._nextLink = otherNext; | |
672 otherNext._previousLink = listLast; | |
673 _InternalLink otherLast = other._previousLink; | |
674 list._previousLink = otherLast; | |
675 otherLast._nextLink = list; | |
676 // Clean up [other]. | |
677 other._nextLink = other._previousLink = other; | |
678 } | |
679 } | |
680 | |
681 /** Superclass for provider of pending events. */ | 601 /** Superclass for provider of pending events. */ |
682 abstract class _PendingEvents { | 602 abstract class _PendingEvents { |
683 // No async event has been scheduled. | 603 // No async event has been scheduled. |
684 static const int _STATE_UNSCHEDULED = 0; | 604 static const int _STATE_UNSCHEDULED = 0; |
685 // An async event has been scheduled to run a function. | 605 // An async event has been scheduled to run a function. |
686 static const int _STATE_SCHEDULED = 1; | 606 static const int _STATE_SCHEDULED = 1; |
687 // An async event has been scheduled, but it will do nothing when it runs. | 607 // An async event has been scheduled, but it will do nothing when it runs. |
688 // Async events can't be preempted. | 608 // Async events can't be preempted. |
689 static const int _STATE_CANCELED = 3; | 609 static const int _STATE_CANCELED = 3; |
690 | 610 |
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
786 | 706 |
787 void _insertBefore(_MultiplexerLinkedList newNext) { | 707 void _insertBefore(_MultiplexerLinkedList newNext) { |
788 _MultiplexerLinkedList newPrevious = newNext._previous; | 708 _MultiplexerLinkedList newPrevious = newNext._previous; |
789 newPrevious._next = this; | 709 newPrevious._next = this; |
790 newNext._previous = _previous; | 710 newNext._previous = _previous; |
791 _previous._next = newNext; | 711 _previous._next = newNext; |
792 _previous = newPrevious; | 712 _previous = newPrevious; |
793 } | 713 } |
794 } | 714 } |
795 | 715 |
716 class _AsBroadcastStream<T> extends Stream<T> { | |
717 final Stream<T> _source; | |
718 _BufferingMultiplexStreamController<T> _controller; | |
719 StreamSubscription<T> _subscription; | |
720 | |
721 _AsBroadcastStream(this._source) { | |
722 _controller = new _BufferingMultiplexStreamController<T>(null, _close); | |
723 } | |
724 | |
725 bool get isBroadcast => true; | |
726 | |
727 StreamSubscription<T> listen(void onData(T data), | |
728 { void onError(Object error), | |
729 void onDone(), | |
730 bool cancelOnError}) { | |
731 if (_controller == null) { | |
732 throw new StateError("Source stream has been closed."); | |
733 } | |
734 if (_subscription == null) { | |
735 _subscription = _source.listen(_controller.add, | |
736 onError: _controller.addError, | |
737 onDone: _controller.close); | |
738 } | |
739 return _controller.stream.listen(onData, onError: onError, onDone: onDone, | |
740 cancelOnError: cancelOnError); | |
741 } | |
742 | |
743 void _close() { | |
floitsch
2013/05/29 09:39:58
call it _onCancel.
Lasse Reichstein Nielsen
2013/05/29 10:39:12
Done.
| |
744 StreamSubscription subscription = _subscription; | |
745 _subscription = null; | |
746 _controller = null; | |
747 subscription.cancel(); | |
748 } | |
749 } | |
750 | |
796 /** | 751 /** |
797 * A subscription used by [_SingleStreamMultiplexer]. | |
798 * | |
799 * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple | |
800 * listeners at a time. It is used to implement [Stream.asBroadcastStream]. | |
801 * | |
802 * It is itself listening to another stream for events, and it forwards all | |
803 * events to all of its simultanous listeners. | |
804 * | |
805 * The listeners are [_MultiplexerSubscription]s and are kept as a linked list. | |
806 */ | |
807 // TODO(lrn): Change "implements" to "with" when automatic mixin constructors | |
808 // are implemented. | |
809 class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T> | |
810 implements _MultiplexerLinkedList { | |
811 static const int _STATE_NOT_LISTENING = 0; | |
812 // Bit that alternates between event firings. If bit matches the one currently | |
813 // firing, the subscription will not be notified. | |
814 static const int _STATE_EVENT_ID_BIT = 1; | |
815 // Whether the subscription is listening at all. This should be set while | |
816 // it is part of the linked list of listeners of a multiplexer stream. | |
817 static const int _STATE_LISTENING = 2; | |
818 // State bit set while firing an event. | |
819 static const int _STATE_IS_FIRING = 4; | |
820 // Bit set if a subscription is canceled while it's firing (the | |
821 // [_STATE_IS_FIRING] bit is set). | |
822 // If the subscription is canceled while firing, it is not removed from the | |
823 // linked list immediately (to avoid breaking iteration), but is instead | |
824 // removed after it is done firing. | |
825 static const int _STATE_REMOVE_AFTER_FIRING = 8; | |
826 | |
827 // Firing state. | |
828 int _multiplexState; | |
829 | |
830 _SingleStreamMultiplexer _source; | |
831 | |
832 _MultiplexerSubscription(this._source, | |
833 void onData(T data), | |
834 void onError(Object error), | |
835 void onDone(), | |
836 bool cancelOnError, | |
837 int nextEventId) | |
838 : _multiplexState = _STATE_LISTENING | nextEventId, | |
839 super(onData, onError, onDone, cancelOnError) { | |
840 _next = _previous = this; | |
841 } | |
842 | |
843 // Mixin workaround. | |
844 _MultiplexerLinkedList _next; | |
845 _MultiplexerLinkedList _previous; | |
846 | |
847 void _unlink() { | |
848 _previous._next = _next; | |
849 _next._previous = _previous; | |
850 _next = _previous = this; | |
851 } | |
852 | |
853 void _insertBefore(_MultiplexerLinkedList newNext) { | |
854 _MultiplexerLinkedList newPrevious = newNext._previous; | |
855 newPrevious._next = this; | |
856 newNext._previous = _previous; | |
857 _previous._next = newNext; | |
858 _previous = newPrevious; | |
859 } | |
860 // End mixin. | |
861 | |
862 bool get _isListening => _multiplexState >= _STATE_LISTENING; | |
863 bool get _isFiring => _multiplexState >= _STATE_IS_FIRING; | |
864 bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING; | |
865 int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT; | |
866 | |
867 void _setRemoveAfterFiring() { | |
868 assert(_isFiring); | |
869 _multiplexState |= _STATE_REMOVE_AFTER_FIRING; | |
870 } | |
871 | |
872 void _startFiring() { | |
873 assert(!_isFiring); | |
874 _multiplexState |= _STATE_IS_FIRING; | |
875 } | |
876 | |
877 /// Marks listener as no longer firing, and toggles its event id. | |
878 void _endFiring() { | |
879 assert(_isFiring); | |
880 _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT); | |
881 } | |
882 | |
883 void _setNotListening() { | |
884 assert(_isListening); | |
885 _multiplexState = _STATE_NOT_LISTENING; | |
886 } | |
887 | |
888 void _onCancel() { | |
889 assert(_isListening); | |
890 _source._removeListener(this); | |
891 } | |
892 } | |
893 | |
894 /** | |
895 * A stream that sends events from another stream to multiple listeners. | |
896 * | |
897 * This is used to implement [Stream.asBroadcastStream]. | |
898 * | |
899 * This stream allows listening more than once. | |
900 * When the first listener is added, it starts listening on its source | |
901 * stream for events. All events from the source stream are sent to all | |
902 * active listeners. The listeners handle their own buffering. | |
903 * When the last listener cancels, the source stream subscription is also | |
904 * canceled, and no further listening is possible. | |
905 */ | |
906 // TODO(lrn): change "implements" to "with" when the VM supports it. | |
907 class _SingleStreamMultiplexer<T> extends Stream<T> | |
908 implements _MultiplexerLinkedList, | |
909 _EventDispatch<T> { | |
910 final Stream<T> _source; | |
911 StreamSubscription<T> _subscription; | |
912 // Alternates between zero and one for each event fired. | |
913 // Listeners are initialized with the next event's id, and will | |
914 // only be notified if they match the event being fired. | |
915 // That way listeners added during event firing will not receive | |
916 // the current event. | |
917 int _eventId = 0; | |
918 | |
919 bool _isFiring = false; | |
920 | |
921 // Remember events added while firing. | |
922 _StreamImplEvents _pending; | |
923 | |
924 _SingleStreamMultiplexer(this._source) { | |
925 _next = _previous = this; | |
926 } | |
927 | |
928 bool get _hasPending => _pending != null && !_pending.isEmpty; | |
929 | |
930 // Should be mixin. | |
931 _MultiplexerLinkedList _next; | |
932 _MultiplexerLinkedList _previous; | |
933 | |
934 void _unlink() { | |
935 _previous._next = _next; | |
936 _next._previous = _previous; | |
937 _next = _previous = this; | |
938 } | |
939 | |
940 void _insertBefore(_MultiplexerLinkedList newNext) { | |
941 _MultiplexerLinkedList newPrevious = newNext._previous; | |
942 newPrevious._next = this; | |
943 newNext._previous = _previous; | |
944 _previous._next = newNext; | |
945 _previous = newPrevious; | |
946 } | |
947 // End of mixin. | |
948 | |
949 StreamSubscription<T> listen(void onData(T data), | |
950 { void onError(Object error), | |
951 void onDone(), | |
952 bool cancelOnError }) { | |
953 if (onData == null) onData = _nullDataHandler; | |
954 if (onError == null) onError = _nullErrorHandler; | |
955 if (onDone == null) onDone = _nullDoneHandler; | |
956 cancelOnError = identical(true, cancelOnError); | |
957 _MultiplexerSubscription subscription = | |
958 new _MultiplexerSubscription(this, onData, onError, onDone, | |
959 cancelOnError, _eventId); | |
960 if (_subscription == null) { | |
961 _subscription = _source.listen(_add, onError: _addError, onDone: _close); | |
962 } | |
963 subscription._insertBefore(this); | |
964 return subscription; | |
965 } | |
966 | |
967 /** Called by [_MultiplexerSubscription.remove]. */ | |
968 void _removeListener(_MultiplexerSubscription listener) { | |
969 if (listener._isFiring) { | |
970 listener._setRemoveAfterFiring(); | |
971 } else { | |
972 _unlinkListener(listener); | |
973 } | |
974 } | |
975 | |
976 /** Remove a listener and close the subscription after the last one. */ | |
977 void _unlinkListener(_MultiplexerSubscription listener) { | |
978 listener._setNotListening(); | |
979 listener._unlink(); | |
980 if (identical(_next, this)) { | |
981 // Last listener removed. | |
982 _cancel(); | |
983 } | |
984 } | |
985 | |
986 void _cancel() { | |
987 StreamSubscription subscription = _subscription; | |
988 _subscription = null; | |
989 subscription.cancel(); | |
990 if (_pending != null) _pending.cancelSchedule(); | |
991 } | |
992 | |
993 void _forEachListener(void action(_MultiplexerSubscription listener)) { | |
994 int eventId = _eventId; | |
995 _eventId ^= 1; | |
996 _isFiring = true; | |
997 _MultiplexerLinkedList entry = _next; | |
998 // Call each listener in order. A listener can be removed during any | |
999 // other listener's event. During its own event it will only be marked | |
1000 // as "to be removed", and it will be handled after the event is done. | |
1001 while (!identical(entry, this)) { | |
1002 _MultiplexerSubscription listener = entry; | |
1003 if (listener._eventId == eventId) { | |
1004 listener._startFiring(); | |
1005 action(listener); | |
1006 listener._endFiring(); // Also toggles the event id. | |
1007 } | |
1008 entry = listener._next; | |
1009 if (listener._removeAfterFiring) { | |
1010 _unlinkListener(listener); | |
1011 } | |
1012 } | |
1013 _isFiring = false; | |
1014 } | |
1015 | |
1016 void _add(T data) { | |
1017 if (_isFiring || _hasPending) { | |
1018 _StreamImplEvents pending = _pending; | |
1019 if (pending == null) pending = _pending = new _StreamImplEvents(); | |
1020 pending.add(new _DelayedData(data)); | |
1021 } else { | |
1022 _sendData(data); | |
1023 } | |
1024 } | |
1025 | |
1026 void _addError(Object error) { | |
1027 if (_isFiring || _hasPending) { | |
1028 _StreamImplEvents pending = _pending; | |
1029 if (pending == null) pending = _pending = new _StreamImplEvents(); | |
1030 pending.add(new _DelayedError(error)); | |
1031 } else { | |
1032 _sendError(error); | |
1033 } | |
1034 } | |
1035 | |
1036 void _close() { | |
1037 if (_isFiring || _hasPending) { | |
1038 _StreamImplEvents pending = _pending; | |
1039 if (pending == null) pending = _pending = new _StreamImplEvents(); | |
1040 pending.add(const _DelayedDone()); | |
1041 } else { | |
1042 _sendDone(); | |
1043 } | |
1044 } | |
1045 | |
1046 void _sendData(T data) { | |
1047 _forEachListener((_MultiplexerSubscription listener) { | |
1048 listener._add(data); | |
1049 }); | |
1050 if (_hasPending) { | |
1051 _pending.schedule(this); | |
1052 } | |
1053 } | |
1054 | |
1055 void _sendError(Object error) { | |
1056 _forEachListener((_MultiplexerSubscription listener) { | |
1057 listener._addError(error); | |
1058 }); | |
1059 if (_hasPending) { | |
1060 _pending.schedule(this); | |
1061 } | |
1062 } | |
1063 | |
1064 void _sendDone() { | |
1065 _forEachListener((_MultiplexerSubscription listener) { | |
1066 listener._setRemoveAfterFiring(); | |
1067 listener._close(); | |
1068 }); | |
1069 } | |
1070 } | |
1071 | |
1072 | |
1073 /** | |
1074 * Simple implementation of [StreamIterator]. | 752 * Simple implementation of [StreamIterator]. |
1075 */ | 753 */ |
1076 class _StreamIteratorImpl<T> implements StreamIterator<T> { | 754 class _StreamIteratorImpl<T> implements StreamIterator<T> { |
1077 // Internal state of the stream iterator. | 755 // Internal state of the stream iterator. |
1078 // At any time, it is in one of these states. | 756 // At any time, it is in one of these states. |
1079 // The interpretation of the [_futureOrPrefecth] field depends on the state. | 757 // The interpretation of the [_futureOrPrefecth] field depends on the state. |
1080 // In _STATE_MOVING, the _data field holds the most recently returned | 758 // In _STATE_MOVING, the _data field holds the most recently returned |
1081 // future. | 759 // future. |
1082 // When in one of the _STATE_EXTRA_* states, the it may hold the | 760 // When in one of the _STATE_EXTRA_* states, the it may hold the |
1083 // next data/error object, and the subscription is paused. | 761 // next data/error object, and the subscription is paused. |
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1213 _FutureImpl<bool> hasNext = _futureOrPrefetch; | 891 _FutureImpl<bool> hasNext = _futureOrPrefetch; |
1214 _clear(); | 892 _clear(); |
1215 hasNext._setValue(false); | 893 hasNext._setValue(false); |
1216 return; | 894 return; |
1217 } | 895 } |
1218 _subscription.pause(); | 896 _subscription.pause(); |
1219 _futureOrPrefetch = null; | 897 _futureOrPrefetch = null; |
1220 _state = _STATE_EXTRA_DONE; | 898 _state = _STATE_EXTRA_DONE; |
1221 } | 899 } |
1222 } | 900 } |
OLD | NEW |