Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 /** Abstract and private interface for a place to put events. */ | 7 /** Abstract and private interface for a place to put events. */ |
| 8 abstract class _EventSink<T> { | 8 abstract class _EventSink<T> { |
| 9 void _add(T data); | 9 void _add(T data); |
| 10 void _addError(Object error); | 10 void _addError(Object error); |
| (...skipping 678 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 689 | 689 |
| 690 void _insertBefore(_BroadcastLinkedList newNext) { | 690 void _insertBefore(_BroadcastLinkedList newNext) { |
| 691 _BroadcastLinkedList newPrevious = newNext._previous; | 691 _BroadcastLinkedList newPrevious = newNext._previous; |
| 692 newPrevious._next = this; | 692 newPrevious._next = this; |
| 693 newNext._previous = _previous; | 693 newNext._previous = _previous; |
| 694 _previous._next = newNext; | 694 _previous._next = newNext; |
| 695 _previous = newPrevious; | 695 _previous = newPrevious; |
| 696 } | 696 } |
| 697 } | 697 } |
| 698 | 698 |
| 699 typedef void _broadcastCallback(StreamSubscription subscription); | |
| 700 | |
| 699 class _AsBroadcastStream<T> extends Stream<T> { | 701 class _AsBroadcastStream<T> extends Stream<T> { |
| 700 final Stream<T> _source; | 702 final Stream<T> _source; |
| 703 final _broadcastCallback _onListenHandler; | |
| 704 final _broadcastCallback _onCancelHandler; | |
| 705 final _Zone _zone; | |
| 706 | |
| 701 _AsBroadcastStreamController<T> _controller; | 707 _AsBroadcastStreamController<T> _controller; |
| 702 StreamSubscription<T> _subscription; | 708 StreamSubscription<T> _subscription; |
| 703 | 709 |
| 704 _AsBroadcastStream(this._source) { | 710 _AsBroadcastStream(this._source, |
| 705 _controller = new _AsBroadcastStreamController<T>(null, _onCancel); | 711 this._onListenHandler, |
| 712 this._onCancelHandler) | |
| 713 : _zone = _Zone.current { | |
| 714 _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | |
| 706 } | 715 } |
| 707 | 716 |
| 708 bool get isBroadcast => true; | 717 bool get isBroadcast => true; |
| 709 | 718 |
| 710 StreamSubscription<T> listen(void onData(T data), | 719 StreamSubscription<T> listen(void onData(T data), |
| 711 { void onError(Object error), | 720 { void onError(Object error), |
| 712 void onDone(), | 721 void onDone(), |
| 713 bool cancelOnError}) { | 722 bool cancelOnError}) { |
| 714 if (_controller == null) { | 723 if (_controller == null) { |
| 715 throw new StateError("Source stream has been closed."); | 724 throw new StateError("Source stream has been closed."); |
| 716 } | 725 } |
| 717 if (_subscription == null) { | 726 if (_subscription == null) { |
| 718 _subscription = _source.listen(_controller.add, | 727 _subscription = _source.listen(_controller.add, |
| 719 onError: _controller.addError, | 728 onError: _controller.addError, |
| 720 onDone: _controller.close); | 729 onDone: _controller.close); |
| 721 } | 730 } |
| 722 return _controller.stream.listen(onData, onError: onError, onDone: onDone, | 731 return _controller.stream.listen(onData, onError: onError, onDone: onDone, |
| 723 cancelOnError: cancelOnError); | 732 cancelOnError: cancelOnError); |
| 724 } | 733 } |
| 725 | 734 |
| 726 void _onCancel() { | 735 void _onCancel() { |
| 736 if (_onCancelHandler != null) { | |
| 737 _zone.executeCallback(() { | |
|
floitsch
2013/06/20 14:28:30
I'm not sure if these should be run in the zone: t
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Zone has been removed from the callback.
Uses curr
| |
| 738 _onCancelHandler(new _BroadcastSubscriptionWrapper(this)); | |
| 739 }); | |
| 740 } | |
| 741 } | |
| 742 | |
| 743 void _onListen() { | |
| 744 if (_onListenHandler != null) { | |
| 745 _zone.executeCallback(() { | |
|
floitsch
2013/06/20 14:28:30
ditto.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Done.
| |
| 746 _onListenHandler(new _BroadcastSubscriptionWrapper(this)); | |
| 747 }); | |
| 748 } | |
| 749 } | |
| 750 | |
| 751 // Methods called from _BroadcastSubscriptionWrapper. | |
| 752 void _cancelSubscription() { | |
| 753 if (_subscription == null) return; | |
| 727 // Called by [_controller] when it has no subscribers left. | 754 // Called by [_controller] when it has no subscribers left. |
| 728 StreamSubscription subscription = _subscription; | 755 StreamSubscription subscription = _subscription; |
| 729 _subscription = null; | 756 _subscription = null; |
| 730 _controller = null; // Marks the stream as no longer listenable. | 757 _controller = null; // Marks the stream as no longer listenable. |
| 731 subscription.cancel(); | 758 subscription.cancel(); |
| 732 } | 759 } |
| 760 | |
| 761 void _pauseSubscription(Future resumeSignal) { | |
| 762 if (_subscription == null) return; | |
| 763 _subscription.pause(resumeSignal); | |
| 764 } | |
| 765 | |
| 766 void _resumeSubscription() { | |
| 767 if (_subscription == null) return; | |
| 768 _subscription.resume(); | |
| 769 } | |
| 770 | |
| 771 bool get _isSubscriptionPaused { | |
| 772 if (_subscription == null) return false; | |
| 773 return _subscription.isPaused; | |
| 774 } | |
| 733 } | 775 } |
| 734 | 776 |
| 735 /** | 777 /** |
| 778 * Wrapper for subscription that disallows changing handlers. | |
| 779 * | |
| 780 * Then wrapper is only valid while a callback using it is running. After | |
| 781 * that it is invalidated, so a user can't hand on to it and change the | |
|
floitsch
2013/06/20 14:28:30
Description seems to be wrong. Pause and resume ke
Lasse Reichstein Nielsen
2013/06/21 09:49:11
ACK, changed that but forgot the doc.
It stays val
| |
| 782 * underlying subscription at inopportune moments. | |
| 783 */ | |
| 784 class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { | |
| 785 _AsBroadcastStream _stream; | |
| 786 | |
| 787 _BroadcastSubscriptionWrapper(this._stream); | |
| 788 | |
| 789 void onData(void handleData(T data)) { | |
| 790 throw new UnsupportedError( | |
| 791 "Cannot change handlers of asBroadcastStream source subscription."); | |
| 792 } | |
| 793 | |
| 794 void onError(void handleError(Object data)) { | |
| 795 throw new UnsupportedError( | |
| 796 "Cannot change handlers of asBroadcastStream source subscription."); | |
| 797 } | |
| 798 | |
| 799 void onDone(void handleDone()) { | |
| 800 throw new UnsupportedError( | |
| 801 "Cannot change handlers of asBroadcastStream source subscription."); | |
| 802 } | |
| 803 | |
| 804 void pause([Future resumeSignal]) { | |
| 805 if (_stream == null) return; | |
|
floitsch
2013/06/20 14:28:30
throw.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
actually, I removed the line, and just call throug
| |
| 806 _stream._pauseSubscription(resumeSignal); | |
|
floitsch
2013/06/20 14:28:30
This is a little bit strange: we allow a resume-si
Lasse Reichstein Nielsen
2013/06/21 09:49:11
We do allow calling resume.
| |
| 807 } | |
| 808 | |
| 809 void resume() { | |
| 810 if (_stream == null) return; | |
|
floitsch
2013/06/20 14:28:30
ditto (throw).
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Ditto, removed.
| |
| 811 _stream._resumeSubscription(); | |
| 812 } | |
| 813 | |
| 814 void cancel() { | |
| 815 if (_stream == null) return; | |
|
floitsch
2013/06/20 14:28:30
ditto.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
removed.
| |
| 816 _AsBroadcastStream stream = _stream; | |
| 817 _stream = null; | |
| 818 stream._cancelSubscription(); | |
| 819 } | |
| 820 | |
| 821 bool get isPaused { | |
| 822 if (_stream == null) return false; | |
|
floitsch
2013/06/20 14:28:30
ditto.
Lasse Reichstein Nielsen
2013/06/21 09:49:11
Removed.
| |
| 823 return _stream._isSubscriptionPaused; | |
| 824 } | |
| 825 | |
| 826 Future asFuture([var futureValue]) { | |
| 827 throw new UnsupportedError( | |
| 828 "Cannot change handlers of asBroadcastStream source subscription."); | |
| 829 } | |
| 830 } | |
| 831 | |
| 832 | |
| 833 /** | |
| 736 * Simple implementation of [StreamIterator]. | 834 * Simple implementation of [StreamIterator]. |
| 737 */ | 835 */ |
| 738 class _StreamIteratorImpl<T> implements StreamIterator<T> { | 836 class _StreamIteratorImpl<T> implements StreamIterator<T> { |
| 739 // Internal state of the stream iterator. | 837 // Internal state of the stream iterator. |
| 740 // At any time, it is in one of these states. | 838 // At any time, it is in one of these states. |
| 741 // The interpretation of the [_futureOrPrefecth] field depends on the state. | 839 // The interpretation of the [_futureOrPrefecth] field depends on the state. |
| 742 // In _STATE_MOVING, the _data field holds the most recently returned | 840 // In _STATE_MOVING, the _data field holds the most recently returned |
| 743 // future. | 841 // future. |
| 744 // When in one of the _STATE_EXTRA_* states, the it may hold the | 842 // When in one of the _STATE_EXTRA_* states, the it may hold the |
| 745 // next data/error object, and the subscription is paused. | 843 // next data/error object, and the subscription is paused. |
| (...skipping 129 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 875 _FutureImpl<bool> hasNext = _futureOrPrefetch; | 973 _FutureImpl<bool> hasNext = _futureOrPrefetch; |
| 876 _clear(); | 974 _clear(); |
| 877 hasNext._setValue(false); | 975 hasNext._setValue(false); |
| 878 return; | 976 return; |
| 879 } | 977 } |
| 880 _subscription.pause(); | 978 _subscription.pause(); |
| 881 _futureOrPrefetch = null; | 979 _futureOrPrefetch = null; |
| 882 _state = _STATE_EXTRA_DONE; | 980 _state = _STATE_EXTRA_DONE; |
| 883 } | 981 } |
| 884 } | 982 } |
| OLD | NEW |