OLD | NEW |
---|---|
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error); | 10 void _addError(Object error); |
(...skipping 678 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
689 | 689 |
690 void _insertBefore(_BroadcastLinkedList newNext) { | 690 void _insertBefore(_BroadcastLinkedList newNext) { |
691 _BroadcastLinkedList newPrevious = newNext._previous; | 691 _BroadcastLinkedList newPrevious = newNext._previous; |
692 newPrevious._next = this; | 692 newPrevious._next = this; |
693 newNext._previous = _previous; | 693 newNext._previous = _previous; |
694 _previous._next = newNext; | 694 _previous._next = newNext; |
695 _previous = newPrevious; | 695 _previous = newPrevious; |
696 } | 696 } |
697 } | 697 } |
698 | 698 |
699 typedef void _broadcastCallback(StreamSubscription subscription); | |
700 | |
699 class _AsBroadcastStream<T> extends Stream<T> { | 701 class _AsBroadcastStream<T> extends Stream<T> { |
700 final Stream<T> _source; | 702 final Stream<T> _source; |
703 final _broadcastCallback _onListenHandler; | |
704 final _broadcastCallback _onCancelHandler; | |
705 final _Zone _zone; | |
706 | |
701 _AsBroadcastStreamController<T> _controller; | 707 _AsBroadcastStreamController<T> _controller; |
702 StreamSubscription<T> _subscription; | 708 StreamSubscription<T> _subscription; |
703 | 709 |
704 _AsBroadcastStream(this._source) { | 710 _AsBroadcastStream(this._source, |
705 _controller = new _AsBroadcastStreamController<T>(null, _onCancel); | 711 this._onListenHandler, |
712 this._onCancelHandler) | |
713 : _zone = _Zone.current { | |
714 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | |
706 } | 715 } |
707 | 716 |
708 bool get isBroadcast => true; | 717 bool get isBroadcast => true; |
709 | 718 |
710 StreamSubscription<T> listen(void onData(T data), | 719 StreamSubscription<T> listen(void onData(T data), |
711 { void onError(Object error), | 720 { void onError(Object error), |
712 void onDone(), | 721 void onDone(), |
713 bool cancelOnError}) { | 722 bool cancelOnError}) { |
714 if (_controller == null) { | 723 if (_controller == null) { |
715 throw new StateError("Source stream has been closed."); | 724 throw new StateError("Source stream has been closed."); |
716 } | 725 } |
717 if (_subscription == null) { | 726 if (_subscription == null) { |
718 _subscription = _source.listen(_controller.add, | 727 _subscription = _source.listen(_controller.add, |
719 onError: _controller.addError, | 728 onError: _controller.addError, |
720 onDone: _controller.close); | 729 onDone: _controller.close); |
721 } | 730 } |
722 return _controller.stream.listen(onData, onError: onError, onDone: onDone, | 731 return _controller.stream.listen(onData, onError: onError, onDone: onDone, |
723 cancelOnError: cancelOnError); | 732 cancelOnError: cancelOnError); |
724 } | 733 } |
725 | 734 |
726 void _onCancel() { | 735 void _onCancel() { |
736 if (_onCancelHandler != null) { | |
737 _zone.executeCallback(() { | |
floitsch
2013/06/20 14:28:30
I'm not sure if these should be run in the zone: t
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Zone has been removed from the callback.
Uses curr
| |
738 _onCancelHandler(new _BroadcastSubscriptionWrapper(this)); | |
739 }); | |
740 } | |
741 } | |
742 | |
743 void _onListen() { | |
744 if (_onListenHandler != null) { | |
745 _zone.executeCallback(() { | |
floitsch
2013/06/20 14:28:30
ditto.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Done.
| |
746 _onListenHandler(new _BroadcastSubscriptionWrapper(this)); | |
747 }); | |
748 } | |
749 } | |
750 | |
751 // Methods called from _BroadcastSubscriptionWrapper. | |
752 void _cancelSubscription() { | |
753 if (_subscription == null) return; | |
727 // Called by [_controller] when it has no subscribers left. | 754 // Called by [_controller] when it has no subscribers left. |
728 StreamSubscription subscription = _subscription; | 755 StreamSubscription subscription = _subscription; |
729 _subscription = null; | 756 _subscription = null; |
730 _controller = null; // Marks the stream as no longer listenable. | 757 _controller = null; // Marks the stream as no longer listenable. |
731 subscription.cancel(); | 758 subscription.cancel(); |
732 } | 759 } |
760 | |
761 void _pauseSubscription(Future resumeSignal) { | |
762 if (_subscription == null) return; | |
763 _subscription.pause(resumeSignal); | |
764 } | |
765 | |
766 void _resumeSubscription() { | |
767 if (_subscription == null) return; | |
768 _subscription.resume(); | |
769 } | |
770 | |
771 bool get _isSubscriptionPaused { | |
772 if (_subscription == null) return false; | |
773 return _subscription.isPaused; | |
774 } | |
733 } | 775 } |
734 | 776 |
735 /** | 777 /** |
778 * Wrapper for subscription that disallows changing handlers. | |
779 * | |
780 * Then wrapper is only valid while a callback using it is running. After | |
781 * that it is invalidated, so a user can't hand on to it and change the | |
floitsch
2013/06/20 14:28:30
Description seems to be wrong. Pause and resume ke
Lasse Reichstein Nielsen
2013/06/21 09:49:11
ACK, changed that but forgot the doc.
It stays val
| |
782 * underlying subscription at inopportune moments. | |
783 */ | |
784 class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { | |
785 _AsBroadcastStream _stream; | |
786 | |
787 _BroadcastSubscriptionWrapper(this._stream); | |
788 | |
789 void onData(void handleData(T data)) { | |
790 throw new UnsupportedError( | |
791 "Cannot change handlers of asBroadcastStream source subscription."); | |
792 } | |
793 | |
794 void onError(void handleError(Object data)) { | |
795 throw new UnsupportedError( | |
796 "Cannot change handlers of asBroadcastStream source subscription."); | |
797 } | |
798 | |
799 void onDone(void handleDone()) { | |
800 throw new UnsupportedError( | |
801 "Cannot change handlers of asBroadcastStream source subscription."); | |
802 } | |
803 | |
804 void pause([Future resumeSignal]) { | |
805 if (_stream == null) return; | |
floitsch
2013/06/20 14:28:30
throw.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
actually, I removed the line, and just call throug
| |
806 _stream._pauseSubscription(resumeSignal); | |
floitsch
2013/06/20 14:28:30
This is a little bit strange: we allow a resume-si
Lasse Reichstein Nielsen
2013/06/21 09:49:11
We do allow calling resume.
| |
807 } | |
808 | |
809 void resume() { | |
810 if (_stream == null) return; | |
floitsch
2013/06/20 14:28:30
ditto (throw).
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Ditto, removed.
| |
811 _stream._resumeSubscription(); | |
812 } | |
813 | |
814 void cancel() { | |
815 if (_stream == null) return; | |
floitsch
2013/06/20 14:28:30
ditto.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
removed.
| |
816 _AsBroadcastStream stream = _stream; | |
817 _stream = null; | |
818 stream._cancelSubscription(); | |
819 } | |
820 | |
821 bool get isPaused { | |
822 if (_stream == null) return false; | |
floitsch
2013/06/20 14:28:30
ditto.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Removed.
| |
823 return _stream._isSubscriptionPaused; | |
824 } | |
825 | |
826 Future asFuture([var futureValue]) { | |
827 throw new UnsupportedError( | |
828 "Cannot change handlers of asBroadcastStream source subscription."); | |
829 } | |
830 } | |
831 | |
832 | |
833 /** | |
736 * Simple implementation of [StreamIterator]. | 834 * Simple implementation of [StreamIterator]. |
737 */ | 835 */ |
738 class _StreamIteratorImpl<T> implements StreamIterator<T> { | 836 class _StreamIteratorImpl<T> implements StreamIterator<T> { |
739 // Internal state of the stream iterator. | 837 // Internal state of the stream iterator. |
740 // At any time, it is in one of these states. | 838 // At any time, it is in one of these states. |
741 // The interpretation of the [_futureOrPrefecth] field depends on the state. | 839 // The interpretation of the [_futureOrPrefecth] field depends on the state. |
742 // In _STATE_MOVING, the _data field holds the most recently returned | 840 // In _STATE_MOVING, the _data field holds the most recently returned |
743 // future. | 841 // future. |
744 // When in one of the _STATE_EXTRA_* states, the it may hold the | 842 // When in one of the _STATE_EXTRA_* states, the it may hold the |
745 // next data/error object, and the subscription is paused. | 843 // next data/error object, and the subscription is paused. |
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
875 _FutureImpl<bool> hasNext = _futureOrPrefetch; | 973 _FutureImpl<bool> hasNext = _futureOrPrefetch; |
876 _clear(); | 974 _clear(); |
877 hasNext._setValue(false); | 975 hasNext._setValue(false); |
878 return; | 976 return; |
879 } | 977 } |
880 _subscription.pause(); | 978 _subscription.pause(); |
881 _futureOrPrefetch = null; | 979 _futureOrPrefetch = null; |
882 _state = _STATE_EXTRA_DONE; | 980 _state = _STATE_EXTRA_DONE; |
883 } | 981 } |
884 } | 982 } |
OLD | NEW |