Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Throws the given error in the next cycle. */ | 7 /** Throws the given error in the next cycle. */ |
| 8 _throwDelayed(var error, [Object stackTrace]) { | 8 _throwDelayed(var error, [Object stackTrace]) { |
| 9 // We are going to reach the top-level here, but there might be a global | 9 // We are going to reach the top-level here, but there might be a global |
| 10 // exception handler. This means that we shouldn't print the stack trace. | 10 // exception handler. This means that we shouldn't print the stack trace. |
| (...skipping 580 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 591 dispatch._sendDone(); | 591 dispatch._sendDone(); |
| 592 } | 592 } |
| 593 | 593 |
| 594 _DelayedEvent get next => null; | 594 _DelayedEvent get next => null; |
| 595 | 595 |
| 596 void set next(_DelayedEvent _) { | 596 void set next(_DelayedEvent _) { |
| 597 throw new StateError("No events after a done."); | 597 throw new StateError("No events after a done."); |
| 598 } | 598 } |
| 599 } | 599 } |
| 600 | 600 |
| 601 /** | |
| 602 * Simple internal doubly-linked list implementation. | |
| 603 * | |
| 604 * In an internal linked list, the links are in the data objects themselves, | |
| 605 * instead of in a separate object. That means each element can be in at most | |
| 606 * one list at a time. | |
| 607 * | |
| 608 * All links are always members of an element cycle. At creation it's a | |
| 609 * singleton cycle. | |
| 610 */ | |
| 611 abstract class _InternalLink { | |
| 612 _InternalLink _nextLink; | |
| 613 _InternalLink _previousLink; | |
| 614 | |
| 615 _InternalLink() { | |
| 616 this._previousLink = this._nextLink = this; | |
| 617 } | |
| 618 | |
| 619 /* Removes a link from any list it may be part of, and links it to itself. */ | |
| 620 static void unlink(_InternalLink element) { | |
| 621 _InternalLink next = element._nextLink; | |
| 622 _InternalLink previous = element._previousLink; | |
| 623 next._previousLink = previous; | |
| 624 previous._nextLink = next; | |
| 625 element._nextLink = element._previousLink = element; | |
| 626 } | |
| 627 | |
| 628 /** Check whether an element is unattached to other elements. */ | |
| 629 static bool isUnlinked(_InternalLink element) { | |
| 630 return identical(element, element._nextLink); | |
| 631 } | |
| 632 } | |
| 633 | |
| 634 /** | |
| 635 * Marker interface for "list" links. | |
| 636 * | |
| 637 * An "InternalLinkList" is an abstraction on top of a link cycle, where the | |
| 638 * "list" object itself is not considered an element (it's just a header link | |
| 639 * created to avoid edge cases). | |
| 640 * An element is considered part of a list if it is in the list's cycle. | |
| 641 * There should never be more than one "list" object in a cycle. | |
| 642 */ | |
| 643 abstract class _InternalLinkList extends _InternalLink { | |
| 644 /** | |
| 645 * Adds an element to a list, just before the header link. | |
| 646 * | |
| 647 * This effectively adds it at the end of the list. | |
| 648 */ | |
| 649 static void add(_InternalLinkList list, _InternalLink element) { | |
| 650 if (!_InternalLink.isUnlinked(element)) _InternalLink.unlink(element); | |
| 651 _InternalLink listEnd = list._previousLink; | |
| 652 listEnd._nextLink = element; | |
| 653 list._previousLink = element; | |
| 654 element._previousLink = listEnd; | |
| 655 element._nextLink = list; | |
| 656 } | |
| 657 | |
| 658 /** Removes an element from its list. */ | |
| 659 static void remove(_InternalLink element) { | |
| 660 _InternalLink.unlink(element); | |
| 661 } | |
| 662 | |
| 663 /** Check whether a list contains no elements, only the header link. */ | |
| 664 static bool isEmpty(_InternalLinkList list) => _InternalLink.isUnlinked(list); | |
| 665 | |
| 666 /** Moves all elements from the list [other] to [list]. */ | |
| 667 static void addAll(_InternalLinkList list, _InternalLinkList other) { | |
| 668 if (isEmpty(other)) return; | |
| 669 _InternalLink listLast = list._previousLink; | |
| 670 _InternalLink otherNext = other._nextLink; | |
| 671 listLast._nextLink = otherNext; | |
| 672 otherNext._previousLink = listLast; | |
| 673 _InternalLink otherLast = other._previousLink; | |
| 674 list._previousLink = otherLast; | |
| 675 otherLast._nextLink = list; | |
| 676 // Clean up [other]. | |
| 677 other._nextLink = other._previousLink = other; | |
| 678 } | |
| 679 } | |
| 680 | |
| 681 /** Superclass for provider of pending events. */ | 601 /** Superclass for provider of pending events. */ |
| 682 abstract class _PendingEvents { | 602 abstract class _PendingEvents { |
| 683 // No async event has been scheduled. | 603 // No async event has been scheduled. |
| 684 static const int _STATE_UNSCHEDULED = 0; | 604 static const int _STATE_UNSCHEDULED = 0; |
| 685 // An async event has been scheduled to run a function. | 605 // An async event has been scheduled to run a function. |
| 686 static const int _STATE_SCHEDULED = 1; | 606 static const int _STATE_SCHEDULED = 1; |
| 687 // An async event has been scheduled, but it will do nothing when it runs. | 607 // An async event has been scheduled, but it will do nothing when it runs. |
| 688 // Async events can't be preempted. | 608 // Async events can't be preempted. |
| 689 static const int _STATE_CANCELED = 3; | 609 static const int _STATE_CANCELED = 3; |
| 690 | 610 |
| (...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 786 | 706 |
| 787 void _insertBefore(_MultiplexerLinkedList newNext) { | 707 void _insertBefore(_MultiplexerLinkedList newNext) { |
| 788 _MultiplexerLinkedList newPrevious = newNext._previous; | 708 _MultiplexerLinkedList newPrevious = newNext._previous; |
| 789 newPrevious._next = this; | 709 newPrevious._next = this; |
| 790 newNext._previous = _previous; | 710 newNext._previous = _previous; |
| 791 _previous._next = newNext; | 711 _previous._next = newNext; |
| 792 _previous = newPrevious; | 712 _previous = newPrevious; |
| 793 } | 713 } |
| 794 } | 714 } |
| 795 | 715 |
| 716 class _AsBroadcastStream<T> extends Stream<T> { | |
| 717 final Stream<T> _source; | |
| 718 _BufferingMultiplexStreamController<T> _controller; | |
| 719 StreamSubscription<T> _subscription; | |
| 720 | |
| 721 _AsBroadcastStream(this._source) { | |
| 722 _controller = new _BufferingMultiplexStreamController<T>(null, _close); | |
| 723 } | |
| 724 | |
| 725 bool get isBroadcast => true; | |
| 726 | |
| 727 StreamSubscription<T> listen(void onData(T data), | |
| 728 { void onError(Object error), | |
| 729 void onDone(), | |
| 730 bool cancelOnError}) { | |
| 731 if (_controller == null) { | |
| 732 throw new StateError("Source stream has been closed."); | |
| 733 } | |
| 734 if (_subscription == null) { | |
| 735 _subscription = _source.listen(_controller.add, | |
| 736 onError: _controller.addError, | |
| 737 onDone: _controller.close); | |
| 738 } | |
| 739 return _controller.stream.listen(onData, onError: onError, onDone: onDone, | |
| 740 cancelOnError: cancelOnError); | |
| 741 } | |
| 742 | |
| 743 void _close() { | |
|
floitsch
2013/05/29 09:39:58
call it _onCancel.
Lasse Reichstein Nielsen
2013/05/29 10:39:12
Done.
| |
| 744 StreamSubscription subscription = _subscription; | |
| 745 _subscription = null; | |
| 746 _controller = null; | |
| 747 subscription.cancel(); | |
| 748 } | |
| 749 } | |
| 750 | |
| 796 /** | 751 /** |
| 797 * A subscription used by [_SingleStreamMultiplexer]. | |
| 798 * | |
| 799 * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple | |
| 800 * listeners at a time. It is used to implement [Stream.asBroadcastStream]. | |
| 801 * | |
| 802 * It is itself listening to another stream for events, and it forwards all | |
| 803 * events to all of its simultanous listeners. | |
| 804 * | |
| 805 * The listeners are [_MultiplexerSubscription]s and are kept as a linked list. | |
| 806 */ | |
| 807 // TODO(lrn): Change "implements" to "with" when automatic mixin constructors | |
| 808 // are implemented. | |
| 809 class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T> | |
| 810 implements _MultiplexerLinkedList { | |
| 811 static const int _STATE_NOT_LISTENING = 0; | |
| 812 // Bit that alternates between event firings. If bit matches the one currently | |
| 813 // firing, the subscription will not be notified. | |
| 814 static const int _STATE_EVENT_ID_BIT = 1; | |
| 815 // Whether the subscription is listening at all. This should be set while | |
| 816 // it is part of the linked list of listeners of a multiplexer stream. | |
| 817 static const int _STATE_LISTENING = 2; | |
| 818 // State bit set while firing an event. | |
| 819 static const int _STATE_IS_FIRING = 4; | |
| 820 // Bit set if a subscription is canceled while it's firing (the | |
| 821 // [_STATE_IS_FIRING] bit is set). | |
| 822 // If the subscription is canceled while firing, it is not removed from the | |
| 823 // linked list immediately (to avoid breaking iteration), but is instead | |
| 824 // removed after it is done firing. | |
| 825 static const int _STATE_REMOVE_AFTER_FIRING = 8; | |
| 826 | |
| 827 // Firing state. | |
| 828 int _multiplexState; | |
| 829 | |
| 830 _SingleStreamMultiplexer _source; | |
| 831 | |
| 832 _MultiplexerSubscription(this._source, | |
| 833 void onData(T data), | |
| 834 void onError(Object error), | |
| 835 void onDone(), | |
| 836 bool cancelOnError, | |
| 837 int nextEventId) | |
| 838 : _multiplexState = _STATE_LISTENING | nextEventId, | |
| 839 super(onData, onError, onDone, cancelOnError) { | |
| 840 _next = _previous = this; | |
| 841 } | |
| 842 | |
| 843 // Mixin workaround. | |
| 844 _MultiplexerLinkedList _next; | |
| 845 _MultiplexerLinkedList _previous; | |
| 846 | |
| 847 void _unlink() { | |
| 848 _previous._next = _next; | |
| 849 _next._previous = _previous; | |
| 850 _next = _previous = this; | |
| 851 } | |
| 852 | |
| 853 void _insertBefore(_MultiplexerLinkedList newNext) { | |
| 854 _MultiplexerLinkedList newPrevious = newNext._previous; | |
| 855 newPrevious._next = this; | |
| 856 newNext._previous = _previous; | |
| 857 _previous._next = newNext; | |
| 858 _previous = newPrevious; | |
| 859 } | |
| 860 // End mixin. | |
| 861 | |
| 862 bool get _isListening => _multiplexState >= _STATE_LISTENING; | |
| 863 bool get _isFiring => _multiplexState >= _STATE_IS_FIRING; | |
| 864 bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING; | |
| 865 int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT; | |
| 866 | |
| 867 void _setRemoveAfterFiring() { | |
| 868 assert(_isFiring); | |
| 869 _multiplexState |= _STATE_REMOVE_AFTER_FIRING; | |
| 870 } | |
| 871 | |
| 872 void _startFiring() { | |
| 873 assert(!_isFiring); | |
| 874 _multiplexState |= _STATE_IS_FIRING; | |
| 875 } | |
| 876 | |
| 877 /// Marks listener as no longer firing, and toggles its event id. | |
| 878 void _endFiring() { | |
| 879 assert(_isFiring); | |
| 880 _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT); | |
| 881 } | |
| 882 | |
| 883 void _setNotListening() { | |
| 884 assert(_isListening); | |
| 885 _multiplexState = _STATE_NOT_LISTENING; | |
| 886 } | |
| 887 | |
| 888 void _onCancel() { | |
| 889 assert(_isListening); | |
| 890 _source._removeListener(this); | |
| 891 } | |
| 892 } | |
| 893 | |
| 894 /** | |
| 895 * A stream that sends events from another stream to multiple listeners. | |
| 896 * | |
| 897 * This is used to implement [Stream.asBroadcastStream]. | |
| 898 * | |
| 899 * This stream allows listening more than once. | |
| 900 * When the first listener is added, it starts listening on its source | |
| 901 * stream for events. All events from the source stream are sent to all | |
| 902 * active listeners. The listeners handle their own buffering. | |
| 903 * When the last listener cancels, the source stream subscription is also | |
| 904 * canceled, and no further listening is possible. | |
| 905 */ | |
| 906 // TODO(lrn): change "implements" to "with" when the VM supports it. | |
| 907 class _SingleStreamMultiplexer<T> extends Stream<T> | |
| 908 implements _MultiplexerLinkedList, | |
| 909 _EventDispatch<T> { | |
| 910 final Stream<T> _source; | |
| 911 StreamSubscription<T> _subscription; | |
| 912 // Alternates between zero and one for each event fired. | |
| 913 // Listeners are initialized with the next event's id, and will | |
| 914 // only be notified if they match the event being fired. | |
| 915 // That way listeners added during event firing will not receive | |
| 916 // the current event. | |
| 917 int _eventId = 0; | |
| 918 | |
| 919 bool _isFiring = false; | |
| 920 | |
| 921 // Remember events added while firing. | |
| 922 _StreamImplEvents _pending; | |
| 923 | |
| 924 _SingleStreamMultiplexer(this._source) { | |
| 925 _next = _previous = this; | |
| 926 } | |
| 927 | |
| 928 bool get _hasPending => _pending != null && !_pending.isEmpty; | |
| 929 | |
| 930 // Should be mixin. | |
| 931 _MultiplexerLinkedList _next; | |
| 932 _MultiplexerLinkedList _previous; | |
| 933 | |
| 934 void _unlink() { | |
| 935 _previous._next = _next; | |
| 936 _next._previous = _previous; | |
| 937 _next = _previous = this; | |
| 938 } | |
| 939 | |
| 940 void _insertBefore(_MultiplexerLinkedList newNext) { | |
| 941 _MultiplexerLinkedList newPrevious = newNext._previous; | |
| 942 newPrevious._next = this; | |
| 943 newNext._previous = _previous; | |
| 944 _previous._next = newNext; | |
| 945 _previous = newPrevious; | |
| 946 } | |
| 947 // End of mixin. | |
| 948 | |
| 949 StreamSubscription<T> listen(void onData(T data), | |
| 950 { void onError(Object error), | |
| 951 void onDone(), | |
| 952 bool cancelOnError }) { | |
| 953 if (onData == null) onData = _nullDataHandler; | |
| 954 if (onError == null) onError = _nullErrorHandler; | |
| 955 if (onDone == null) onDone = _nullDoneHandler; | |
| 956 cancelOnError = identical(true, cancelOnError); | |
| 957 _MultiplexerSubscription subscription = | |
| 958 new _MultiplexerSubscription(this, onData, onError, onDone, | |
| 959 cancelOnError, _eventId); | |
| 960 if (_subscription == null) { | |
| 961 _subscription = _source.listen(_add, onError: _addError, onDone: _close); | |
| 962 } | |
| 963 subscription._insertBefore(this); | |
| 964 return subscription; | |
| 965 } | |
| 966 | |
| 967 /** Called by [_MultiplexerSubscription.remove]. */ | |
| 968 void _removeListener(_MultiplexerSubscription listener) { | |
| 969 if (listener._isFiring) { | |
| 970 listener._setRemoveAfterFiring(); | |
| 971 } else { | |
| 972 _unlinkListener(listener); | |
| 973 } | |
| 974 } | |
| 975 | |
| 976 /** Remove a listener and close the subscription after the last one. */ | |
| 977 void _unlinkListener(_MultiplexerSubscription listener) { | |
| 978 listener._setNotListening(); | |
| 979 listener._unlink(); | |
| 980 if (identical(_next, this)) { | |
| 981 // Last listener removed. | |
| 982 _cancel(); | |
| 983 } | |
| 984 } | |
| 985 | |
| 986 void _cancel() { | |
| 987 StreamSubscription subscription = _subscription; | |
| 988 _subscription = null; | |
| 989 subscription.cancel(); | |
| 990 if (_pending != null) _pending.cancelSchedule(); | |
| 991 } | |
| 992 | |
| 993 void _forEachListener(void action(_MultiplexerSubscription listener)) { | |
| 994 int eventId = _eventId; | |
| 995 _eventId ^= 1; | |
| 996 _isFiring = true; | |
| 997 _MultiplexerLinkedList entry = _next; | |
| 998 // Call each listener in order. A listener can be removed during any | |
| 999 // other listener's event. During its own event it will only be marked | |
| 1000 // as "to be removed", and it will be handled after the event is done. | |
| 1001 while (!identical(entry, this)) { | |
| 1002 _MultiplexerSubscription listener = entry; | |
| 1003 if (listener._eventId == eventId) { | |
| 1004 listener._startFiring(); | |
| 1005 action(listener); | |
| 1006 listener._endFiring(); // Also toggles the event id. | |
| 1007 } | |
| 1008 entry = listener._next; | |
| 1009 if (listener._removeAfterFiring) { | |
| 1010 _unlinkListener(listener); | |
| 1011 } | |
| 1012 } | |
| 1013 _isFiring = false; | |
| 1014 } | |
| 1015 | |
| 1016 void _add(T data) { | |
| 1017 if (_isFiring || _hasPending) { | |
| 1018 _StreamImplEvents pending = _pending; | |
| 1019 if (pending == null) pending = _pending = new _StreamImplEvents(); | |
| 1020 pending.add(new _DelayedData(data)); | |
| 1021 } else { | |
| 1022 _sendData(data); | |
| 1023 } | |
| 1024 } | |
| 1025 | |
| 1026 void _addError(Object error) { | |
| 1027 if (_isFiring || _hasPending) { | |
| 1028 _StreamImplEvents pending = _pending; | |
| 1029 if (pending == null) pending = _pending = new _StreamImplEvents(); | |
| 1030 pending.add(new _DelayedError(error)); | |
| 1031 } else { | |
| 1032 _sendError(error); | |
| 1033 } | |
| 1034 } | |
| 1035 | |
| 1036 void _close() { | |
| 1037 if (_isFiring || _hasPending) { | |
| 1038 _StreamImplEvents pending = _pending; | |
| 1039 if (pending == null) pending = _pending = new _StreamImplEvents(); | |
| 1040 pending.add(const _DelayedDone()); | |
| 1041 } else { | |
| 1042 _sendDone(); | |
| 1043 } | |
| 1044 } | |
| 1045 | |
| 1046 void _sendData(T data) { | |
| 1047 _forEachListener((_MultiplexerSubscription listener) { | |
| 1048 listener._add(data); | |
| 1049 }); | |
| 1050 if (_hasPending) { | |
| 1051 _pending.schedule(this); | |
| 1052 } | |
| 1053 } | |
| 1054 | |
| 1055 void _sendError(Object error) { | |
| 1056 _forEachListener((_MultiplexerSubscription listener) { | |
| 1057 listener._addError(error); | |
| 1058 }); | |
| 1059 if (_hasPending) { | |
| 1060 _pending.schedule(this); | |
| 1061 } | |
| 1062 } | |
| 1063 | |
| 1064 void _sendDone() { | |
| 1065 _forEachListener((_MultiplexerSubscription listener) { | |
| 1066 listener._setRemoveAfterFiring(); | |
| 1067 listener._close(); | |
| 1068 }); | |
| 1069 } | |
| 1070 } | |
| 1071 | |
| 1072 | |
| 1073 /** | |
| 1074 * Simple implementation of [StreamIterator]. | 752 * Simple implementation of [StreamIterator]. |
| 1075 */ | 753 */ |
| 1076 class _StreamIteratorImpl<T> implements StreamIterator<T> { | 754 class _StreamIteratorImpl<T> implements StreamIterator<T> { |
| 1077 // Internal state of the stream iterator. | 755 // Internal state of the stream iterator. |
| 1078 // At any time, it is in one of these states. | 756 // At any time, it is in one of these states. |
| 1079 // The interpretation of the [_futureOrPrefecth] field depends on the state. | 757 // The interpretation of the [_futureOrPrefecth] field depends on the state. |
| 1080 // In _STATE_MOVING, the _data field holds the most recently returned | 758 // In _STATE_MOVING, the _data field holds the most recently returned |
| 1081 // future. | 759 // future. |
| 1082 // When in one of the _STATE_EXTRA_* states, the it may hold the | 760 // When in one of the _STATE_EXTRA_* states, the it may hold the |
| 1083 // next data/error object, and the subscription is paused. | 761 // next data/error object, and the subscription is paused. |
| (...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1213 _FutureImpl<bool> hasNext = _futureOrPrefetch; | 891 _FutureImpl<bool> hasNext = _futureOrPrefetch; |
| 1214 _clear(); | 892 _clear(); |
| 1215 hasNext._setValue(false); | 893 hasNext._setValue(false); |
| 1216 return; | 894 return; |
| 1217 } | 895 } |
| 1218 _subscription.pause(); | 896 _subscription.pause(); |
| 1219 _futureOrPrefetch = null; | 897 _futureOrPrefetch = null; |
| 1220 _state = _STATE_EXTRA_DONE; | 898 _state = _STATE_EXTRA_DONE; |
| 1221 } | 899 } |
| 1222 } | 900 } |
| OLD | NEW |