OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
9 void _add(T data); | 9 void _add(T data); |
10 void _addError(Object error); | 10 void _addError(Object error); |
(...skipping 678 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
689 | 689 |
690 void _insertBefore(_BroadcastLinkedList newNext) { | 690 void _insertBefore(_BroadcastLinkedList newNext) { |
691 _BroadcastLinkedList newPrevious = newNext._previous; | 691 _BroadcastLinkedList newPrevious = newNext._previous; |
692 newPrevious._next = this; | 692 newPrevious._next = this; |
693 newNext._previous = _previous; | 693 newNext._previous = _previous; |
694 _previous._next = newNext; | 694 _previous._next = newNext; |
695 _previous = newPrevious; | 695 _previous = newPrevious; |
696 } | 696 } |
697 } | 697 } |
698 | 698 |
| 699 typedef void _broadcastCallback(StreamSubscription subscription); |
| 700 |
| 701 /** |
| 702 * Dummy subscription that will never receive any events. |
| 703 */ |
| 704 class _DummyStreamSubscription<T> implements StreamSubscription<T> { |
| 705 int _pauseCounter = 0; |
| 706 |
| 707 void onData(void handleData(T data)) {} |
| 708 void onError(void handleError(Object data)) {} |
| 709 void onDone(void handleDone()) {} |
| 710 |
| 711 void pause([Future resumeSignal]) { |
| 712 _pauseCounter++; |
| 713 if (resumeSignal != null) resumeSignal.then((_) { resume(); }); |
| 714 } |
| 715 void resume() { |
| 716 if (_pauseCounter > 0) _pauseCounter--; |
| 717 } |
| 718 void cancel() {} |
| 719 bool get isPaused => _pauseCounter > 0; |
| 720 |
| 721 Future asFuture([futureValue]) => new _FutureImpl(); |
| 722 } |
| 723 |
699 class _AsBroadcastStream<T> extends Stream<T> { | 724 class _AsBroadcastStream<T> extends Stream<T> { |
700 final Stream<T> _source; | 725 final Stream<T> _source; |
| 726 final _broadcastCallback _onListenHandler; |
| 727 final _broadcastCallback _onCancelHandler; |
| 728 final _Zone _zone; |
| 729 |
701 _AsBroadcastStreamController<T> _controller; | 730 _AsBroadcastStreamController<T> _controller; |
702 StreamSubscription<T> _subscription; | 731 StreamSubscription<T> _subscription; |
703 | 732 |
704 _AsBroadcastStream(this._source) { | 733 _AsBroadcastStream(this._source, |
705 _controller = new _AsBroadcastStreamController<T>(null, _onCancel); | 734 this._onListenHandler, |
| 735 this._onCancelHandler) |
| 736 : _zone = _Zone.current { |
| 737 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
| 738 // Keep zone alive until we are done doing callbacks. |
| 739 _zone.expectCallback(); |
706 } | 740 } |
707 | 741 |
708 bool get isBroadcast => true; | 742 bool get isBroadcast => true; |
709 | 743 |
710 StreamSubscription<T> listen(void onData(T data), | 744 StreamSubscription<T> listen(void onData(T data), |
711 { void onError(Object error), | 745 { void onError(Object error), |
712 void onDone(), | 746 void onDone(), |
713 bool cancelOnError}) { | 747 bool cancelOnError}) { |
714 if (_controller == null) { | 748 if (_controller == null) { |
715 throw new StateError("Source stream has been closed."); | 749 // Return a dummy subscription backed by nothing, since |
| 750 // it won't ever receive any events. |
| 751 return new _DummyStreamSubscription<T>(); |
716 } | 752 } |
717 if (_subscription == null) { | 753 if (_subscription == null) { |
718 _subscription = _source.listen(_controller.add, | 754 _subscription = _source.listen(_controller.add, |
719 onError: _controller.addError, | 755 onError: _controller.addError, |
720 onDone: _controller.close); | 756 onDone: _controller.close); |
721 } | 757 } |
722 return _controller.stream.listen(onData, onError: onError, onDone: onDone, | 758 return _controller.stream.listen(onData, onError: onError, onDone: onDone, |
723 cancelOnError: cancelOnError); | 759 cancelOnError: cancelOnError); |
724 } | 760 } |
725 | 761 |
726 void _onCancel() { | 762 void _onCancel() { |
| 763 bool shutdown = (_controller == null) || _controller.isClosed; |
| 764 if (_onCancelHandler != null) { |
| 765 _zone.executePeriodicCallbackGuarded( |
| 766 () => _onCancelHandler(new _BroadcastSubscriptionWrapper(this))); |
| 767 } |
| 768 if (shutdown) { |
| 769 if (_subscription != null) { |
| 770 _subscription.cancel(); |
| 771 _subscription = null; |
| 772 } |
| 773 _zone.cancelCallbackExpectation(); |
| 774 } |
| 775 } |
| 776 |
| 777 void _onListen() { |
| 778 if (_onListenHandler != null) { |
| 779 _zone.executePeriodicCallbackGuarded( |
| 780 () => _onListenHandler(new _BroadcastSubscriptionWrapper(this))); |
| 781 } |
| 782 } |
| 783 |
| 784 // Methods called from _BroadcastSubscriptionWrapper. |
| 785 void _cancelSubscription() { |
| 786 if (_subscription == null) return; |
727 // Called by [_controller] when it has no subscribers left. | 787 // Called by [_controller] when it has no subscribers left. |
728 StreamSubscription subscription = _subscription; | 788 StreamSubscription subscription = _subscription; |
729 _subscription = null; | 789 _subscription = null; |
| 790 if (_controller._isEmpty) { |
| 791 _zone.cancelCallbackExpectation(); |
| 792 } |
730 _controller = null; // Marks the stream as no longer listenable. | 793 _controller = null; // Marks the stream as no longer listenable. |
731 subscription.cancel(); | 794 subscription.cancel(); |
732 } | 795 } |
| 796 |
| 797 void _pauseSubscription(Future resumeSignal) { |
| 798 if (_subscription == null) return; |
| 799 _subscription.pause(resumeSignal); |
| 800 } |
| 801 |
| 802 void _resumeSubscription() { |
| 803 if (_subscription == null) return; |
| 804 _subscription.resume(); |
| 805 } |
| 806 |
| 807 bool get _isSubscriptionPaused { |
| 808 if (_subscription == null) return false; |
| 809 return _subscription.isPaused; |
| 810 } |
733 } | 811 } |
734 | 812 |
735 /** | 813 /** |
| 814 * Wrapper for subscription that disallows changing handlers. |
| 815 */ |
| 816 class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { |
| 817 final _AsBroadcastStream _stream; |
| 818 |
| 819 _BroadcastSubscriptionWrapper(this._stream); |
| 820 |
| 821 void onData(void handleData(T data)) { |
| 822 throw new UnsupportedError( |
| 823 "Cannot change handlers of asBroadcastStream source subscription."); |
| 824 } |
| 825 |
| 826 void onError(void handleError(Object data)) { |
| 827 throw new UnsupportedError( |
| 828 "Cannot change handlers of asBroadcastStream source subscription."); |
| 829 } |
| 830 |
| 831 void onDone(void handleDone()) { |
| 832 throw new UnsupportedError( |
| 833 "Cannot change handlers of asBroadcastStream source subscription."); |
| 834 } |
| 835 |
| 836 void pause([Future resumeSignal]) { |
| 837 _stream._pauseSubscription(resumeSignal); |
| 838 } |
| 839 |
| 840 void resume() { |
| 841 _stream._resumeSubscription(); |
| 842 } |
| 843 |
| 844 void cancel() { |
| 845 _stream._cancelSubscription(); |
| 846 } |
| 847 |
| 848 bool get isPaused { |
| 849 return _stream._isSubscriptionPaused; |
| 850 } |
| 851 |
| 852 Future asFuture([var futureValue]) { |
| 853 throw new UnsupportedError( |
| 854 "Cannot change handlers of asBroadcastStream source subscription."); |
| 855 } |
| 856 } |
| 857 |
| 858 |
| 859 /** |
736 * Simple implementation of [StreamIterator]. | 860 * Simple implementation of [StreamIterator]. |
737 */ | 861 */ |
738 class _StreamIteratorImpl<T> implements StreamIterator<T> { | 862 class _StreamIteratorImpl<T> implements StreamIterator<T> { |
739 // Internal state of the stream iterator. | 863 // Internal state of the stream iterator. |
740 // At any time, it is in one of these states. | 864 // At any time, it is in one of these states. |
741 // The interpretation of the [_futureOrPrefecth] field depends on the state. | 865 // The interpretation of the [_futureOrPrefecth] field depends on the state. |
742 // In _STATE_MOVING, the _data field holds the most recently returned | 866 // In _STATE_MOVING, the _data field holds the most recently returned |
743 // future. | 867 // future. |
744 // When in one of the _STATE_EXTRA_* states, the it may hold the | 868 // When in one of the _STATE_EXTRA_* states, the it may hold the |
745 // next data/error object, and the subscription is paused. | 869 // next data/error object, and the subscription is paused. |
(...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
875 _FutureImpl<bool> hasNext = _futureOrPrefetch; | 999 _FutureImpl<bool> hasNext = _futureOrPrefetch; |
876 _clear(); | 1000 _clear(); |
877 hasNext._setValue(false); | 1001 hasNext._setValue(false); |
878 return; | 1002 return; |
879 } | 1003 } |
880 _subscription.pause(); | 1004 _subscription.pause(); |
881 _futureOrPrefetch = null; | 1005 _futureOrPrefetch = null; |
882 _state = _STATE_EXTRA_DONE; | 1006 _state = _STATE_EXTRA_DONE; |
883 } | 1007 } |
884 } | 1008 } |
OLD | NEW |