Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6)

Side by Side Diff: sdk/lib/async/stream_impl.dart

Issue 17490002: Make asBroadcastStream take two callbacks. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/html/dart2js/html_dart2js.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Abstract and private interface for a place to put events. */ 7 /** Abstract and private interface for a place to put events. */
8 abstract class _EventSink<T> { 8 abstract class _EventSink<T> {
9 void _add(T data); 9 void _add(T data);
10 void _addError(Object error); 10 void _addError(Object error);
(...skipping 678 matching lines...) Expand 10 before | Expand all | Expand 10 after
689 689
690 void _insertBefore(_BroadcastLinkedList newNext) { 690 void _insertBefore(_BroadcastLinkedList newNext) {
691 _BroadcastLinkedList newPrevious = newNext._previous; 691 _BroadcastLinkedList newPrevious = newNext._previous;
692 newPrevious._next = this; 692 newPrevious._next = this;
693 newNext._previous = _previous; 693 newNext._previous = _previous;
694 _previous._next = newNext; 694 _previous._next = newNext;
695 _previous = newPrevious; 695 _previous = newPrevious;
696 } 696 }
697 } 697 }
698 698
699 typedef void _broadcastCallback(StreamSubscription subscription);
700
699 class _AsBroadcastStream<T> extends Stream<T> { 701 class _AsBroadcastStream<T> extends Stream<T> {
700 final Stream<T> _source; 702 final Stream<T> _source;
703 final _broadcastCallback _onListenHandler;
704 final _broadcastCallback _onCancelHandler;
705 final _Zone _zone;
706
701 _AsBroadcastStreamController<T> _controller; 707 _AsBroadcastStreamController<T> _controller;
702 StreamSubscription<T> _subscription; 708 StreamSubscription<T> _subscription;
703 709
704 _AsBroadcastStream(this._source) { 710 _AsBroadcastStream(this._source,
705 _controller = new _AsBroadcastStreamController<T>(null, _onCancel); 711 this._onListenHandler,
712 this._onCancelHandler)
713 : _zone = _Zone.current {
714 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
706 } 715 }
707 716
708 bool get isBroadcast => true; 717 bool get isBroadcast => true;
709 718
710 StreamSubscription<T> listen(void onData(T data), 719 StreamSubscription<T> listen(void onData(T data),
711 { void onError(Object error), 720 { void onError(Object error),
712 void onDone(), 721 void onDone(),
713 bool cancelOnError}) { 722 bool cancelOnError}) {
714 if (_controller == null) { 723 if (_controller == null) {
715 throw new StateError("Source stream has been closed."); 724 throw new StateError("Source stream has been closed.");
716 } 725 }
717 if (_subscription == null) { 726 if (_subscription == null) {
718 _subscription = _source.listen(_controller.add, 727 _subscription = _source.listen(_controller.add,
719 onError: _controller.addError, 728 onError: _controller.addError,
720 onDone: _controller.close); 729 onDone: _controller.close);
721 } 730 }
722 return _controller.stream.listen(onData, onError: onError, onDone: onDone, 731 return _controller.stream.listen(onData, onError: onError, onDone: onDone,
723 cancelOnError: cancelOnError); 732 cancelOnError: cancelOnError);
724 } 733 }
725 734
726 void _onCancel() { 735 void _onCancel() {
736 if (_onCancelHandler != null) {
737 _zone.executeCallback(() {
floitsch 2013/06/20 14:28:30 I'm not sure if these should be run in the zone: t
Lasse Reichstein Nielsen 2013/06/21 09:49:11 Zone has been removed from the callback. Uses curr
738 _onCancelHandler(new _BroadcastSubscriptionWrapper(this));
739 });
740 }
741 }
742
743 void _onListen() {
744 if (_onListenHandler != null) {
745 _zone.executeCallback(() {
floitsch 2013/06/20 14:28:30 ditto.
Lasse Reichstein Nielsen 2013/06/21 09:49:11 Done.
746 _onListenHandler(new _BroadcastSubscriptionWrapper(this));
747 });
748 }
749 }
750
751 // Methods called from _BroadcastSubscriptionWrapper.
752 void _cancelSubscription() {
753 if (_subscription == null) return;
727 // Called by [_controller] when it has no subscribers left. 754 // Called by [_controller] when it has no subscribers left.
728 StreamSubscription subscription = _subscription; 755 StreamSubscription subscription = _subscription;
729 _subscription = null; 756 _subscription = null;
730 _controller = null; // Marks the stream as no longer listenable. 757 _controller = null; // Marks the stream as no longer listenable.
731 subscription.cancel(); 758 subscription.cancel();
732 } 759 }
760
761 void _pauseSubscription(Future resumeSignal) {
762 if (_subscription == null) return;
763 _subscription.pause(resumeSignal);
764 }
765
766 void _resumeSubscription() {
767 if (_subscription == null) return;
768 _subscription.resume();
769 }
770
771 bool get _isSubscriptionPaused {
772 if (_subscription == null) return false;
773 return _subscription.isPaused;
774 }
733 } 775 }
734 776
735 /** 777 /**
778 * Wrapper for subscription that disallows changing handlers.
779 *
780 * Then wrapper is only valid while a callback using it is running. After
781 * that it is invalidated, so a user can't hand on to it and change the
floitsch 2013/06/20 14:28:30 Description seems to be wrong. Pause and resume ke
Lasse Reichstein Nielsen 2013/06/21 09:49:11 ACK, changed that but forgot the doc. It stays val
782 * underlying subscription at inopportune moments.
783 */
784 class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
785 _AsBroadcastStream _stream;
786
787 _BroadcastSubscriptionWrapper(this._stream);
788
789 void onData(void handleData(T data)) {
790 throw new UnsupportedError(
791 "Cannot change handlers of asBroadcastStream source subscription.");
792 }
793
794 void onError(void handleError(Object data)) {
795 throw new UnsupportedError(
796 "Cannot change handlers of asBroadcastStream source subscription.");
797 }
798
799 void onDone(void handleDone()) {
800 throw new UnsupportedError(
801 "Cannot change handlers of asBroadcastStream source subscription.");
802 }
803
804 void pause([Future resumeSignal]) {
805 if (_stream == null) return;
floitsch 2013/06/20 14:28:30 throw.
Lasse Reichstein Nielsen 2013/06/21 09:49:11 actually, I removed the line, and just call throug
806 _stream._pauseSubscription(resumeSignal);
floitsch 2013/06/20 14:28:30 This is a little bit strange: we allow a resume-si
Lasse Reichstein Nielsen 2013/06/21 09:49:11 We do allow calling resume.
807 }
808
809 void resume() {
810 if (_stream == null) return;
floitsch 2013/06/20 14:28:30 ditto (throw).
Lasse Reichstein Nielsen 2013/06/21 09:49:11 Ditto, removed.
811 _stream._resumeSubscription();
812 }
813
814 void cancel() {
815 if (_stream == null) return;
floitsch 2013/06/20 14:28:30 ditto.
Lasse Reichstein Nielsen 2013/06/21 09:49:11 removed.
816 _AsBroadcastStream stream = _stream;
817 _stream = null;
818 stream._cancelSubscription();
819 }
820
821 bool get isPaused {
822 if (_stream == null) return false;
floitsch 2013/06/20 14:28:30 ditto.
Lasse Reichstein Nielsen 2013/06/21 09:49:11 Removed.
823 return _stream._isSubscriptionPaused;
824 }
825
826 Future asFuture([var futureValue]) {
827 throw new UnsupportedError(
828 "Cannot change handlers of asBroadcastStream source subscription.");
829 }
830 }
831
832
833 /**
736 * Simple implementation of [StreamIterator]. 834 * Simple implementation of [StreamIterator].
737 */ 835 */
738 class _StreamIteratorImpl<T> implements StreamIterator<T> { 836 class _StreamIteratorImpl<T> implements StreamIterator<T> {
739 // Internal state of the stream iterator. 837 // Internal state of the stream iterator.
740 // At any time, it is in one of these states. 838 // At any time, it is in one of these states.
741 // The interpretation of the [_futureOrPrefecth] field depends on the state. 839 // The interpretation of the [_futureOrPrefecth] field depends on the state.
742 // In _STATE_MOVING, the _data field holds the most recently returned 840 // In _STATE_MOVING, the _data field holds the most recently returned
743 // future. 841 // future.
744 // When in one of the _STATE_EXTRA_* states, the it may hold the 842 // When in one of the _STATE_EXTRA_* states, the it may hold the
745 // next data/error object, and the subscription is paused. 843 // next data/error object, and the subscription is paused.
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after
875 _FutureImpl<bool> hasNext = _futureOrPrefetch; 973 _FutureImpl<bool> hasNext = _futureOrPrefetch;
876 _clear(); 974 _clear();
877 hasNext._setValue(false); 975 hasNext._setValue(false);
878 return; 976 return;
879 } 977 }
880 _subscription.pause(); 978 _subscription.pause();
881 _futureOrPrefetch = null; 979 _futureOrPrefetch = null;
882 _state = _STATE_EXTRA_DONE; 980 _state = _STATE_EXTRA_DONE;
883 } 981 }
884 } 982 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/html/dart2js/html_dart2js.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698