Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 17490002: Make asBroadcastStream take two callbacks. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Reintroduce zone. Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/html/dart2js/html_dart2js.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error); 10 void _addError(Object error);
(...skipping 678 matching lines...) Expand 10 before | Expand all | Expand 10 after
689 689
690 void _insertBefore(_BroadcastLinkedList newNext) { 690 void _insertBefore(_BroadcastLinkedList newNext) {
691 _BroadcastLinkedList newPrevious = newNext._previous; 691 _BroadcastLinkedList newPrevious = newNext._previous;
692 newPrevious._next = this; 692 newPrevious._next = this;
693 newNext._previous = _previous; 693 newNext._previous = _previous;
694 _previous._next = newNext; 694 _previous._next = newNext;
695 _previous = newPrevious; 695 _previous = newPrevious;
696 } 696 }
697 } 697 }
698 698
699 typedef void _broadcastCallback(StreamSubscription subscription);
700
701 /**
702 * Dummy subscription that will never receive any events.
703 */
704 class _DummyStreamSubscription<T> implements StreamSubscription<T> {
705 int _pauseCounter = 0;
706
707 void onData(void handleData(T data)) {}
708 void onError(void handleError(Object data)) {}
709 void onDone(void handleDone()) {}
710
711 void pause([Future resumeSignal]) {
712 _pauseCounter++;
713 if (resumeSignal != null) resumeSignal.then((_) { resume(); });
714 }
715 void resume() {
716 if (_pauseCounter > 0) _pauseCounter--;
717 }
718 void cancel() {}
719 bool get isPaused => _pauseCounter > 0;
720
721 Future asFuture([futureValue]) => new _FutureImpl();
722 }
723
699 class _AsBroadcastStream<T> extends Stream<T> { 724 class _AsBroadcastStream<T> extends Stream<T> {
700 final Stream<T> _source; 725 final Stream<T> _source;
726 final _broadcastCallback _onListenHandler;
727 final _broadcastCallback _onCancelHandler;
728 final _Zone _zone;
729
701 _AsBroadcastStreamController<T> _controller; 730 _AsBroadcastStreamController<T> _controller;
702 StreamSubscription<T> _subscription; 731 StreamSubscription<T> _subscription;
703 732
704 _AsBroadcastStream(this._source) { 733 _AsBroadcastStream(this._source,
705 _controller = new _AsBroadcastStreamController<T>(null, _onCancel); 734 this._onListenHandler,
735 this._onCancelHandler)
736 : _zone = _Zone.current {
737 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
738 // Keep zone alive until we are done doing callbacks.
739 _zone.expectCallback();
706 } 740 }
707 741
708 bool get isBroadcast => true; 742 bool get isBroadcast => true;
709 743
710 StreamSubscription<T> listen(void onData(T data), 744 StreamSubscription<T> listen(void onData(T data),
711 { void onError(Object error), 745 { void onError(Object error),
712 void onDone(), 746 void onDone(),
713 bool cancelOnError}) { 747 bool cancelOnError}) {
714 if (_controller == null) { 748 if (_controller == null) {
715 throw new StateError("Source stream has been closed."); 749 // Return a dummy subscription backed by nothing, since
750 // it won't ever receive any events.
751 return new _DummyStreamSubscription<T>();
716 } 752 }
717 if (_subscription == null) { 753 if (_subscription == null) {
718 _subscription = _source.listen(_controller.add, 754 _subscription = _source.listen(_controller.add,
719 onError: _controller.addError, 755 onError: _controller.addError,
720 onDone: _controller.close); 756 onDone: _controller.close);
721 } 757 }
722 return _controller.stream.listen(onData, onError: onError, onDone: onDone, 758 return _controller.stream.listen(onData, onError: onError, onDone: onDone,
723 cancelOnError: cancelOnError); 759 cancelOnError: cancelOnError);
724 } 760 }
725 761
726 void _onCancel() { 762 void _onCancel() {
763 bool shutdown = (_controller == null) || _controller.isClosed;
764 if (_onCancelHandler != null) {
765 _zone.executePeriodicCallbackGuarded(
766 () => _onCancelHandler(new _BroadcastSubscriptionWrapper(this)));
767 }
768 if (shutdown) {
769 if (_subscription != null) {
770 _subscription.cancel();
771 _subscription = null;
772 }
773 _zone.cancelCallbackExpectation();
774 }
775 }
776
777 void _onListen() {
778 if (_onListenHandler != null) {
779 _zone.executePeriodicCallbackGuarded(
780 () => _onListenHandler(new _BroadcastSubscriptionWrapper(this)));
781 }
782 }
783
784 // Methods called from _BroadcastSubscriptionWrapper.
785 void _cancelSubscription() {
786 if (_subscription == null) return;
727 // Called by [_controller] when it has no subscribers left. 787 // Called by [_controller] when it has no subscribers left.
728 StreamSubscription subscription = _subscription; 788 StreamSubscription subscription = _subscription;
729 _subscription = null; 789 _subscription = null;
790 if (_controller._isEmpty) {
791 _zone.cancelCallbackExpectation();
792 }
730 _controller = null; // Marks the stream as no longer listenable. 793 _controller = null; // Marks the stream as no longer listenable.
731 subscription.cancel(); 794 subscription.cancel();
732 } 795 }
796
797 void _pauseSubscription(Future resumeSignal) {
798 if (_subscription == null) return;
799 _subscription.pause(resumeSignal);
800 }
801
802 void _resumeSubscription() {
803 if (_subscription == null) return;
804 _subscription.resume();
805 }
806
807 bool get _isSubscriptionPaused {
808 if (_subscription == null) return false;
809 return _subscription.isPaused;
810 }
733 } 811 }
734 812
735 /** 813 /**
814 * Wrapper for subscription that disallows changing handlers.
815 */
816 class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
817 final _AsBroadcastStream _stream;
818
819 _BroadcastSubscriptionWrapper(this._stream);
820
821 void onData(void handleData(T data)) {
822 throw new UnsupportedError(
823 "Cannot change handlers of asBroadcastStream source subscription.");
824 }
825
826 void onError(void handleError(Object data)) {
827 throw new UnsupportedError(
828 "Cannot change handlers of asBroadcastStream source subscription.");
829 }
830
831 void onDone(void handleDone()) {
832 throw new UnsupportedError(
833 "Cannot change handlers of asBroadcastStream source subscription.");
834 }
835
836 void pause([Future resumeSignal]) {
837 _stream._pauseSubscription(resumeSignal);
838 }
839
840 void resume() {
841 _stream._resumeSubscription();
842 }
843
844 void cancel() {
845 _stream._cancelSubscription();
846 }
847
848 bool get isPaused {
849 return _stream._isSubscriptionPaused;
850 }
851
852 Future asFuture([var futureValue]) {
853 throw new UnsupportedError(
854 "Cannot change handlers of asBroadcastStream source subscription.");
855 }
856 }
857
858
859 /**
736 * Simple implementation of [StreamIterator]. 860 * Simple implementation of [StreamIterator].
737 */ 861 */
738 class _StreamIteratorImpl<T> implements StreamIterator<T> { 862 class _StreamIteratorImpl<T> implements StreamIterator<T> {
739 // Internal state of the stream iterator. 863 // Internal state of the stream iterator.
740 // At any time, it is in one of these states. 864 // At any time, it is in one of these states.
741 // The interpretation of the [_futureOrPrefecth] field depends on the state. 865 // The interpretation of the [_futureOrPrefecth] field depends on the state.
742 // In _STATE_MOVING, the _data field holds the most recently returned 866 // In _STATE_MOVING, the _data field holds the most recently returned
743 // future. 867 // future.
744 // When in one of the _STATE_EXTRA_* states, the it may hold the 868 // When in one of the _STATE_EXTRA_* states, the it may hold the
745 // next data/error object, and the subscription is paused. 869 // next data/error object, and the subscription is paused.
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after
875 _FutureImpl<bool> hasNext = _futureOrPrefetch; 999 _FutureImpl<bool> hasNext = _futureOrPrefetch;
876 _clear(); 1000 _clear();
877 hasNext._setValue(false); 1001 hasNext._setValue(false);
878 return; 1002 return;
879 } 1003 }
880 _subscription.pause(); 1004 _subscription.pause();
881 _futureOrPrefetch = null; 1005 _futureOrPrefetch = null;
882 _state = _STATE_EXTRA_DONE; 1006 _state = _STATE_EXTRA_DONE;
883 } 1007 }
884 } 1008 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/html/dart2js/html_dart2js.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698