Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(11)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 15673006: Implement asBroadcast using a _MultiplexStreamController. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Don't keep pending events when calling onCancel. Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | tests/lib/async/stream_controller_async_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Throws the given error in the next cycle. */ 7 /** Throws the given error in the next cycle. */
8 _throwDelayed(var error, [Object stackTrace]) { 8 _throwDelayed(var error, [Object stackTrace]) {
9 // We are going to reach the top-level here, but there might be a global 9 // We are going to reach the top-level here, but there might be a global
10 // exception handler. This means that we shouldn't print the stack trace. 10 // exception handler. This means that we shouldn't print the stack trace.
(...skipping 580 matching lines...) Expand 10 before | Expand all | Expand 10 after
591 dispatch._sendDone(); 591 dispatch._sendDone();
592 } 592 }
593 593
594 _DelayedEvent get next => null; 594 _DelayedEvent get next => null;
595 595
596 void set next(_DelayedEvent _) { 596 void set next(_DelayedEvent _) {
597 throw new StateError("No events after a done."); 597 throw new StateError("No events after a done.");
598 } 598 }
599 } 599 }
600 600
601 /**
602 * Simple internal doubly-linked list implementation.
603 *
604 * In an internal linked list, the links are in the data objects themselves,
605 * instead of in a separate object. That means each element can be in at most
606 * one list at a time.
607 *
608 * All links are always members of an element cycle. At creation it's a
609 * singleton cycle.
610 */
611 abstract class _InternalLink {
612 _InternalLink _nextLink;
613 _InternalLink _previousLink;
614
615 _InternalLink() {
616 this._previousLink = this._nextLink = this;
617 }
618
619 /* Removes a link from any list it may be part of, and links it to itself. */
620 static void unlink(_InternalLink element) {
621 _InternalLink next = element._nextLink;
622 _InternalLink previous = element._previousLink;
623 next._previousLink = previous;
624 previous._nextLink = next;
625 element._nextLink = element._previousLink = element;
626 }
627
628 /** Check whether an element is unattached to other elements. */
629 static bool isUnlinked(_InternalLink element) {
630 return identical(element, element._nextLink);
631 }
632 }
633
634 /**
635 * Marker interface for "list" links.
636 *
637 * An "InternalLinkList" is an abstraction on top of a link cycle, where the
638 * "list" object itself is not considered an element (it's just a header link
639 * created to avoid edge cases).
640 * An element is considered part of a list if it is in the list's cycle.
641 * There should never be more than one "list" object in a cycle.
642 */
643 abstract class _InternalLinkList extends _InternalLink {
644 /**
645 * Adds an element to a list, just before the header link.
646 *
647 * This effectively adds it at the end of the list.
648 */
649 static void add(_InternalLinkList list, _InternalLink element) {
650 if (!_InternalLink.isUnlinked(element)) _InternalLink.unlink(element);
651 _InternalLink listEnd = list._previousLink;
652 listEnd._nextLink = element;
653 list._previousLink = element;
654 element._previousLink = listEnd;
655 element._nextLink = list;
656 }
657
658 /** Removes an element from its list. */
659 static void remove(_InternalLink element) {
660 _InternalLink.unlink(element);
661 }
662
663 /** Check whether a list contains no elements, only the header link. */
664 static bool isEmpty(_InternalLinkList list) => _InternalLink.isUnlinked(list);
665
666 /** Moves all elements from the list [other] to [list]. */
667 static void addAll(_InternalLinkList list, _InternalLinkList other) {
668 if (isEmpty(other)) return;
669 _InternalLink listLast = list._previousLink;
670 _InternalLink otherNext = other._nextLink;
671 listLast._nextLink = otherNext;
672 otherNext._previousLink = listLast;
673 _InternalLink otherLast = other._previousLink;
674 list._previousLink = otherLast;
675 otherLast._nextLink = list;
676 // Clean up [other].
677 other._nextLink = other._previousLink = other;
678 }
679 }
680
681 /** Superclass for provider of pending events. */ 601 /** Superclass for provider of pending events. */
682 abstract class _PendingEvents { 602 abstract class _PendingEvents {
683 // No async event has been scheduled. 603 // No async event has been scheduled.
684 static const int _STATE_UNSCHEDULED = 0; 604 static const int _STATE_UNSCHEDULED = 0;
685 // An async event has been scheduled to run a function. 605 // An async event has been scheduled to run a function.
686 static const int _STATE_SCHEDULED = 1; 606 static const int _STATE_SCHEDULED = 1;
687 // An async event has been scheduled, but it will do nothing when it runs. 607 // An async event has been scheduled, but it will do nothing when it runs.
688 // Async events can't be preempted. 608 // Async events can't be preempted.
689 static const int _STATE_CANCELED = 3; 609 static const int _STATE_CANCELED = 3;
690 610
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
786 706
787 void _insertBefore(_MultiplexerLinkedList newNext) { 707 void _insertBefore(_MultiplexerLinkedList newNext) {
788 _MultiplexerLinkedList newPrevious = newNext._previous; 708 _MultiplexerLinkedList newPrevious = newNext._previous;
789 newPrevious._next = this; 709 newPrevious._next = this;
790 newNext._previous = _previous; 710 newNext._previous = _previous;
791 _previous._next = newNext; 711 _previous._next = newNext;
792 _previous = newPrevious; 712 _previous = newPrevious;
793 } 713 }
794 } 714 }
795 715
716 class _AsBroadcastStream<T> extends Stream<T> {
717 final Stream<T> _source;
718 _BufferingMultiplexStreamController<T> _controller;
719 StreamSubscription<T> _subscription;
720
721 _AsBroadcastStream(this._source) {
722 _controller = new _BufferingMultiplexStreamController<T>(null, _onCancel);
723 }
724
725 bool get isBroadcast => true;
726
727 StreamSubscription<T> listen(void onData(T data),
728 { void onError(Object error),
729 void onDone(),
730 bool cancelOnError}) {
731 if (_controller == null) {
732 throw new StateError("Source stream has been closed.");
733 }
734 if (_subscription == null) {
735 _subscription = _source.listen(_controller.add,
736 onError: _controller.addError,
737 onDone: _controller.close);
738 }
739 return _controller.stream.listen(onData, onError: onError, onDone: onDone,
740 cancelOnError: cancelOnError);
741 }
742
743 void _onCancel() {
744 // Called by [_controller] when it has no subscribers left.
745 StreamSubscription subscription = _subscription;
746 _subscription = null;
747 _controller = null; // Marks the stream as no longer listenable.
748 subscription.cancel();
749 }
750 }
751
796 /** 752 /**
797 * A subscription used by [_SingleStreamMultiplexer].
798 *
799 * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple
800 * listeners at a time. It is used to implement [Stream.asBroadcastStream].
801 *
802 * It is itself listening to another stream for events, and it forwards all
803 * events to all of its simultanous listeners.
804 *
805 * The listeners are [_MultiplexerSubscription]s and are kept as a linked list.
806 */
807 // TODO(lrn): Change "implements" to "with" when automatic mixin constructors
808 // are implemented.
809 class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T>
810 implements _MultiplexerLinkedList {
811 static const int _STATE_NOT_LISTENING = 0;
812 // Bit that alternates between event firings. If bit matches the one currently
813 // firing, the subscription will not be notified.
814 static const int _STATE_EVENT_ID_BIT = 1;
815 // Whether the subscription is listening at all. This should be set while
816 // it is part of the linked list of listeners of a multiplexer stream.
817 static const int _STATE_LISTENING = 2;
818 // State bit set while firing an event.
819 static const int _STATE_IS_FIRING = 4;
820 // Bit set if a subscription is canceled while it's firing (the
821 // [_STATE_IS_FIRING] bit is set).
822 // If the subscription is canceled while firing, it is not removed from the
823 // linked list immediately (to avoid breaking iteration), but is instead
824 // removed after it is done firing.
825 static const int _STATE_REMOVE_AFTER_FIRING = 8;
826
827 // Firing state.
828 int _multiplexState;
829
830 _SingleStreamMultiplexer _source;
831
832 _MultiplexerSubscription(this._source,
833 void onData(T data),
834 void onError(Object error),
835 void onDone(),
836 bool cancelOnError,
837 int nextEventId)
838 : _multiplexState = _STATE_LISTENING | nextEventId,
839 super(onData, onError, onDone, cancelOnError) {
840 _next = _previous = this;
841 }
842
843 // Mixin workaround.
844 _MultiplexerLinkedList _next;
845 _MultiplexerLinkedList _previous;
846
847 void _unlink() {
848 _previous._next = _next;
849 _next._previous = _previous;
850 _next = _previous = this;
851 }
852
853 void _insertBefore(_MultiplexerLinkedList newNext) {
854 _MultiplexerLinkedList newPrevious = newNext._previous;
855 newPrevious._next = this;
856 newNext._previous = _previous;
857 _previous._next = newNext;
858 _previous = newPrevious;
859 }
860 // End mixin.
861
862 bool get _isListening => _multiplexState >= _STATE_LISTENING;
863 bool get _isFiring => _multiplexState >= _STATE_IS_FIRING;
864 bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING;
865 int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT;
866
867 void _setRemoveAfterFiring() {
868 assert(_isFiring);
869 _multiplexState |= _STATE_REMOVE_AFTER_FIRING;
870 }
871
872 void _startFiring() {
873 assert(!_isFiring);
874 _multiplexState |= _STATE_IS_FIRING;
875 }
876
877 /// Marks listener as no longer firing, and toggles its event id.
878 void _endFiring() {
879 assert(_isFiring);
880 _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT);
881 }
882
883 void _setNotListening() {
884 assert(_isListening);
885 _multiplexState = _STATE_NOT_LISTENING;
886 }
887
888 void _onCancel() {
889 assert(_isListening);
890 _source._removeListener(this);
891 }
892 }
893
894 /**
895 * A stream that sends events from another stream to multiple listeners.
896 *
897 * This is used to implement [Stream.asBroadcastStream].
898 *
899 * This stream allows listening more than once.
900 * When the first listener is added, it starts listening on its source
901 * stream for events. All events from the source stream are sent to all
902 * active listeners. The listeners handle their own buffering.
903 * When the last listener cancels, the source stream subscription is also
904 * canceled, and no further listening is possible.
905 */
906 // TODO(lrn): change "implements" to "with" when the VM supports it.
907 class _SingleStreamMultiplexer<T> extends Stream<T>
908 implements _MultiplexerLinkedList,
909 _EventDispatch<T> {
910 final Stream<T> _source;
911 StreamSubscription<T> _subscription;
912 // Alternates between zero and one for each event fired.
913 // Listeners are initialized with the next event's id, and will
914 // only be notified if they match the event being fired.
915 // That way listeners added during event firing will not receive
916 // the current event.
917 int _eventId = 0;
918
919 bool _isFiring = false;
920
921 // Remember events added while firing.
922 _StreamImplEvents _pending;
923
924 _SingleStreamMultiplexer(this._source) {
925 _next = _previous = this;
926 }
927
928 bool get _hasPending => _pending != null && !_pending.isEmpty;
929
930 // Should be mixin.
931 _MultiplexerLinkedList _next;
932 _MultiplexerLinkedList _previous;
933
934 void _unlink() {
935 _previous._next = _next;
936 _next._previous = _previous;
937 _next = _previous = this;
938 }
939
940 void _insertBefore(_MultiplexerLinkedList newNext) {
941 _MultiplexerLinkedList newPrevious = newNext._previous;
942 newPrevious._next = this;
943 newNext._previous = _previous;
944 _previous._next = newNext;
945 _previous = newPrevious;
946 }
947 // End of mixin.
948
949 StreamSubscription<T> listen(void onData(T data),
950 { void onError(Object error),
951 void onDone(),
952 bool cancelOnError }) {
953 if (onData == null) onData = _nullDataHandler;
954 if (onError == null) onError = _nullErrorHandler;
955 if (onDone == null) onDone = _nullDoneHandler;
956 cancelOnError = identical(true, cancelOnError);
957 _MultiplexerSubscription subscription =
958 new _MultiplexerSubscription(this, onData, onError, onDone,
959 cancelOnError, _eventId);
960 if (_subscription == null) {
961 _subscription = _source.listen(_add, onError: _addError, onDone: _close);
962 }
963 subscription._insertBefore(this);
964 return subscription;
965 }
966
967 /** Called by [_MultiplexerSubscription.remove]. */
968 void _removeListener(_MultiplexerSubscription listener) {
969 if (listener._isFiring) {
970 listener._setRemoveAfterFiring();
971 } else {
972 _unlinkListener(listener);
973 }
974 }
975
976 /** Remove a listener and close the subscription after the last one. */
977 void _unlinkListener(_MultiplexerSubscription listener) {
978 listener._setNotListening();
979 listener._unlink();
980 if (identical(_next, this)) {
981 // Last listener removed.
982 _cancel();
983 }
984 }
985
986 void _cancel() {
987 StreamSubscription subscription = _subscription;
988 _subscription = null;
989 subscription.cancel();
990 if (_pending != null) _pending.cancelSchedule();
991 }
992
993 void _forEachListener(void action(_MultiplexerSubscription listener)) {
994 int eventId = _eventId;
995 _eventId ^= 1;
996 _isFiring = true;
997 _MultiplexerLinkedList entry = _next;
998 // Call each listener in order. A listener can be removed during any
999 // other listener's event. During its own event it will only be marked
1000 // as "to be removed", and it will be handled after the event is done.
1001 while (!identical(entry, this)) {
1002 _MultiplexerSubscription listener = entry;
1003 if (listener._eventId == eventId) {
1004 listener._startFiring();
1005 action(listener);
1006 listener._endFiring(); // Also toggles the event id.
1007 }
1008 entry = listener._next;
1009 if (listener._removeAfterFiring) {
1010 _unlinkListener(listener);
1011 }
1012 }
1013 _isFiring = false;
1014 }
1015
1016 void _add(T data) {
1017 if (_isFiring || _hasPending) {
1018 _StreamImplEvents pending = _pending;
1019 if (pending == null) pending = _pending = new _StreamImplEvents();
1020 pending.add(new _DelayedData(data));
1021 } else {
1022 _sendData(data);
1023 }
1024 }
1025
1026 void _addError(Object error) {
1027 if (_isFiring || _hasPending) {
1028 _StreamImplEvents pending = _pending;
1029 if (pending == null) pending = _pending = new _StreamImplEvents();
1030 pending.add(new _DelayedError(error));
1031 } else {
1032 _sendError(error);
1033 }
1034 }
1035
1036 void _close() {
1037 if (_isFiring || _hasPending) {
1038 _StreamImplEvents pending = _pending;
1039 if (pending == null) pending = _pending = new _StreamImplEvents();
1040 pending.add(const _DelayedDone());
1041 } else {
1042 _sendDone();
1043 }
1044 }
1045
1046 void _sendData(T data) {
1047 _forEachListener((_MultiplexerSubscription listener) {
1048 listener._add(data);
1049 });
1050 if (_hasPending) {
1051 _pending.schedule(this);
1052 }
1053 }
1054
1055 void _sendError(Object error) {
1056 _forEachListener((_MultiplexerSubscription listener) {
1057 listener._addError(error);
1058 });
1059 if (_hasPending) {
1060 _pending.schedule(this);
1061 }
1062 }
1063
1064 void _sendDone() {
1065 _forEachListener((_MultiplexerSubscription listener) {
1066 listener._setRemoveAfterFiring();
1067 listener._close();
1068 });
1069 }
1070 }
1071
1072
1073 /**
1074 * Simple implementation of [StreamIterator]. 753 * Simple implementation of [StreamIterator].
1075 */ 754 */
1076 class _StreamIteratorImpl<T> implements StreamIterator<T> { 755 class _StreamIteratorImpl<T> implements StreamIterator<T> {
1077 // Internal state of the stream iterator. 756 // Internal state of the stream iterator.
1078 // At any time, it is in one of these states. 757 // At any time, it is in one of these states.
1079 // The interpretation of the [_futureOrPrefecth] field depends on the state. 758 // The interpretation of the [_futureOrPrefecth] field depends on the state.
1080 // In _STATE_MOVING, the _data field holds the most recently returned 759 // In _STATE_MOVING, the _data field holds the most recently returned
1081 // future. 760 // future.
1082 // When in one of the _STATE_EXTRA_* states, the it may hold the 761 // When in one of the _STATE_EXTRA_* states, the it may hold the
1083 // next data/error object, and the subscription is paused. 762 // next data/error object, and the subscription is paused.
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after
1213 _FutureImpl<bool> hasNext = _futureOrPrefetch; 892 _FutureImpl<bool> hasNext = _futureOrPrefetch;
1214 _clear(); 893 _clear();
1215 hasNext._setValue(false); 894 hasNext._setValue(false);
1216 return; 895 return;
1217 } 896 }
1218 _subscription.pause(); 897 _subscription.pause();
1219 _futureOrPrefetch = null; 898 _futureOrPrefetch = null;
1220 _state = _STATE_EXTRA_DONE; 899 _state = _STATE_EXTRA_DONE;
1221 } 900 }
1222 } 901 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | tests/lib/async/stream_controller_async_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698