| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // States shared by single/multi stream implementations. | 7 // States shared by single/multi stream implementations. |
| 8 | 8 |
| 9 // Completion state of the stream. | 9 // Completion state of the stream. |
| 10 /// Initial and default state where the stream can receive and send events. | 10 /// Initial and default state where the stream can receive and send events. |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 59 // ------------------------------------------------------------------- | 59 // ------------------------------------------------------------------- |
| 60 // Common base class for single and multi-subscription streams. | 60 // Common base class for single and multi-subscription streams. |
| 61 // ------------------------------------------------------------------- | 61 // ------------------------------------------------------------------- |
| 62 abstract class _StreamImpl<T> extends Stream<T> { | 62 abstract class _StreamImpl<T> extends Stream<T> { |
| 63 /** Current state of the stream. */ | 63 /** Current state of the stream. */ |
| 64 int _state = _STREAM_OPEN; | 64 int _state = _STREAM_OPEN; |
| 65 | 65 |
| 66 /** | 66 /** |
| 67 * List of pending events. | 67 * List of pending events. |
| 68 * | 68 * |
| 69 * If events are added to the stream (using [_add], [_signalError] or [_done]) | 69 * If events are added to the stream (using [_add], [_addError] or [_done]) |
| 70 * while the stream is paused, or while another event is firing, events will | 70 * while the stream is paused, or while another event is firing, events will |
| 71 * stored here. | 71 * stored here. |
| 72 * Also supports scheduling the events for later execution. | 72 * Also supports scheduling the events for later execution. |
| 73 */ | 73 */ |
| 74 _PendingEvents _pendingEvents; | 74 _PendingEvents _pendingEvents; |
| 75 | 75 |
| 76 // ------------------------------------------------------------------ | 76 // ------------------------------------------------------------------ |
| 77 // Stream interface. | 77 // Stream interface. |
| 78 | 78 |
| 79 StreamSubscription<T> listen(void onData(T data), | 79 StreamSubscription<T> listen(void onData(T data), |
| 80 { void onError(AsyncError error), | 80 { void onError(AsyncError error), |
| 81 void onDone(), | 81 void onDone(), |
| 82 bool unsubscribeOnError }) { | 82 bool unsubscribeOnError }) { |
| 83 if (_isComplete) { | 83 if (_isComplete) { |
| 84 return new _DoneSubscription(onDone); | 84 return new _DoneSubscription(onDone); |
| 85 } | 85 } |
| 86 if (onData == null) onData = _nullDataHandler; | 86 if (onData == null) onData = _nullDataHandler; |
| 87 if (onError == null) onError = _nullErrorHandler; | 87 if (onError == null) onError = _nullErrorHandler; |
| 88 if (onDone == null) onDone = _nullDoneHandler; | 88 if (onDone == null) onDone = _nullDoneHandler; |
| 89 unsubscribeOnError = identical(true, unsubscribeOnError); | 89 unsubscribeOnError = identical(true, unsubscribeOnError); |
| 90 _StreamSubscriptionImpl subscription = | 90 _StreamSubscriptionImpl subscription = |
| 91 _createSubscription(onData, onError, onDone, unsubscribeOnError); | 91 _createSubscription(onData, onError, onDone, unsubscribeOnError); |
| 92 _addListener(subscription); | 92 _addListener(subscription); |
| 93 return subscription; | 93 return subscription; |
| 94 } | 94 } |
| 95 | 95 |
| 96 // ------------------------------------------------------------------ | 96 // ------------------------------------------------------------------ |
| 97 // StreamSink interface-like methods for sending events into the stream. | 97 // EventSink interface-like methods for sending events into the stream. |
| 98 // It's the responsibility of the caller to ensure that the stream is not | 98 // It's the responsibility of the caller to ensure that the stream is not |
| 99 // paused when adding events. If the stream is paused, the events will be | 99 // paused when adding events. If the stream is paused, the events will be |
| 100 // queued, but it's better to not send events at all. | 100 // queued, but it's better to not send events at all. |
| 101 | 101 |
| 102 /** | 102 /** |
| 103 * Send or queue a data event. | 103 * Send or queue a data event. |
| 104 */ | 104 */ |
| 105 void _add(T value) { | 105 void _add(T value) { |
| 106 if (_isClosed) throw new StateError("Sending on closed stream"); | 106 if (_isClosed) throw new StateError("Sending on closed stream"); |
| 107 if (!_mayFireState) { | 107 if (!_mayFireState) { |
| 108 // Not the time to send events. | 108 // Not the time to send events. |
| 109 _addPendingEvent(new _DelayedData<T>(value)); | 109 _addPendingEvent(new _DelayedData<T>(value)); |
| 110 return; | 110 return; |
| 111 } | 111 } |
| 112 if (_hasPendingEvent) { | 112 if (_hasPendingEvent) { |
| 113 _addPendingEvent(new _DelayedData<T>(value)); | 113 _addPendingEvent(new _DelayedData<T>(value)); |
| 114 } else { | 114 } else { |
| 115 _sendData(value); | 115 _sendData(value); |
| 116 } | 116 } |
| 117 _handlePendingEvents(); | 117 _handlePendingEvents(); |
| 118 } | 118 } |
| 119 | 119 |
| 120 /** | 120 /** |
| 121 * Send or enqueue an error event. | 121 * Send or enqueue an error event. |
| 122 * | 122 * |
| 123 * If a subscription has requested to be unsubscribed on errors, | 123 * If a subscription has requested to be unsubscribed on errors, |
| 124 * it will be unsubscribed after receiving this event. | 124 * it will be unsubscribed after receiving this event. |
| 125 */ | 125 */ |
| 126 void _signalError(AsyncError error) { | 126 void _addError(AsyncError error) { |
| 127 if (_isClosed) throw new StateError("Sending on closed stream"); | 127 if (_isClosed) throw new StateError("Sending on closed stream"); |
| 128 if (!_mayFireState) { | 128 if (!_mayFireState) { |
| 129 // Not the time to send events. | 129 // Not the time to send events. |
| 130 _addPendingEvent(new _DelayedError(error)); | 130 _addPendingEvent(new _DelayedError(error)); |
| 131 return; | 131 return; |
| 132 } | 132 } |
| 133 if (_hasPendingEvent) { | 133 if (_hasPendingEvent) { |
| 134 _addPendingEvent(new _DelayedError(error)); | 134 _addPendingEvent(new _DelayedError(error)); |
| 135 } else { | 135 } else { |
| 136 _sendError(error); | 136 _sendError(error); |
| (...skipping 418 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 555 * | 555 * |
| 556 * The only public methods are those of [Stream], so instances of | 556 * The only public methods are those of [Stream], so instances of |
| 557 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing | 557 * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing |
| 558 * internal functionality. | 558 * internal functionality. |
| 559 * | 559 * |
| 560 * The [StreamController] is a public facing version of this class, with | 560 * The [StreamController] is a public facing version of this class, with |
| 561 * some methods made public. | 561 * some methods made public. |
| 562 * | 562 * |
| 563 * The user interface of [_SingleStreamImpl] are the following methods: | 563 * The user interface of [_SingleStreamImpl] are the following methods: |
| 564 * * [_add]: Add a data event to the stream. | 564 * * [_add]: Add a data event to the stream. |
| 565 * * [_signalError]: Add an error event to the stream. | 565 * * [_addError]: Add an error event to the stream. |
| 566 * * [_close]: Request to close the stream. | 566 * * [_close]: Request to close the stream. |
| 567 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or | 567 * * [_onSubscriberStateChange]: Called when receiving the first subscriber or |
| 568 * when losing the last subscriber. | 568 * when losing the last subscriber. |
| 569 * * [_onPauseStateChange]: Called when entering or leaving paused mode. | 569 * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
| 570 * * [_hasSubscribers]: Test whether there are currently any subscribers. | 570 * * [_hasSubscribers]: Test whether there are currently any subscribers. |
| 571 * * [_isInputPaused]: Test whether the stream is currently paused. | 571 * * [_isInputPaused]: Test whether the stream is currently paused. |
| 572 * The user should not add new events while the stream is paused, but if it | 572 * The user should not add new events while the stream is paused, but if it |
| 573 * happens anyway, the stream will enqueue the events just as when new events | 573 * happens anyway, the stream will enqueue the events just as when new events |
| 574 * arrive while still firing an old event. | 574 * arrive while still firing an old event. |
| 575 */ | 575 */ |
| (...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 669 * | 669 * |
| 670 * The only public methods are those of [Stream], so instances of | 670 * The only public methods are those of [Stream], so instances of |
| 671 * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing | 671 * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing |
| 672 * internal functionality. | 672 * internal functionality. |
| 673 * | 673 * |
| 674 * The [StreamController] is a public facing version of this class, with | 674 * The [StreamController] is a public facing version of this class, with |
| 675 * some methods made public. | 675 * some methods made public. |
| 676 * | 676 * |
| 677 * The user interface of [_MultiStreamImpl] are the following methods: | 677 * The user interface of [_MultiStreamImpl] are the following methods: |
| 678 * * [_add]: Add a data event to the stream. | 678 * * [_add]: Add a data event to the stream. |
| 679 * * [_signalError]: Add an error event to the stream. | 679 * * [_addError]: Add an error event to the stream. |
| 680 * * [_close]: Request to close the stream. | 680 * * [_close]: Request to close the stream. |
| 681 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or | 681 * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or |
| 682 * when losing the last subscriber. | 682 * when losing the last subscriber. |
| 683 * * [_onPauseStateChange]: Called when entering or leaving paused mode. | 683 * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
| 684 * * [_hasSubscribers]: Test whether there are currently any subscribers. | 684 * * [_hasSubscribers]: Test whether there are currently any subscribers. |
| 685 * * [_isPaused]: Test whether the stream is currently paused. | 685 * * [_isPaused]: Test whether the stream is currently paused. |
| 686 * The user should not add new events while the stream is paused, but if it | 686 * The user should not add new events while the stream is paused, but if it |
| 687 * happens anyway, the stream will enqueue the events just as when new events | 687 * happens anyway, the stream will enqueue the events just as when new events |
| 688 * arrive while still firing an old event. | 688 * arrive while still firing an old event. |
| 689 */ | 689 */ |
| (...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 834 */ | 834 */ |
| 835 _GeneratedSingleStreamImpl(_PendingEvents events) { | 835 _GeneratedSingleStreamImpl(_PendingEvents events) { |
| 836 _pendingEvents = events; | 836 _pendingEvents = events; |
| 837 _setClosed(); // Closed for input since all events are already pending. | 837 _setClosed(); // Closed for input since all events are already pending. |
| 838 } | 838 } |
| 839 | 839 |
| 840 void _add(T value) { | 840 void _add(T value) { |
| 841 throw new UnsupportedError("Cannot inject events into generated stream"); | 841 throw new UnsupportedError("Cannot inject events into generated stream"); |
| 842 } | 842 } |
| 843 | 843 |
| 844 void _signalError(AsyncError value) { | 844 void _addError(AsyncError value) { |
| 845 throw new UnsupportedError("Cannot inject events into generated stream"); | 845 throw new UnsupportedError("Cannot inject events into generated stream"); |
| 846 } | 846 } |
| 847 | 847 |
| 848 void _close() { | 848 void _close() { |
| 849 throw new UnsupportedError("Cannot inject events into generated stream"); | 849 throw new UnsupportedError("Cannot inject events into generated stream"); |
| 850 } | 850 } |
| 851 } | 851 } |
| 852 | 852 |
| 853 | 853 |
| 854 /** Pending events object that gets its events from an [Iterable]. */ | 854 /** Pending events object that gets its events from an [Iterable]. */ |
| (...skipping 236 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1091 otherNext._previousLink = listLast; | 1091 otherNext._previousLink = listLast; |
| 1092 _InternalLink otherLast = other._previousLink; | 1092 _InternalLink otherLast = other._previousLink; |
| 1093 list._previousLink = otherLast; | 1093 list._previousLink = otherLast; |
| 1094 otherLast._nextLink = list; | 1094 otherLast._nextLink = list; |
| 1095 // Clean up [other]. | 1095 // Clean up [other]. |
| 1096 other._nextLink = other._previousLink = other; | 1096 other._nextLink = other._previousLink = other; |
| 1097 } | 1097 } |
| 1098 } | 1098 } |
| 1099 | 1099 |
| 1100 /** Abstract type for an internal interface for sending events. */ | 1100 /** Abstract type for an internal interface for sending events. */ |
| 1101 abstract class _StreamOutputSink<T> { | 1101 abstract class _EventOutputSink<T> { |
| 1102 _sendData(T data); | 1102 _sendData(T data); |
| 1103 _sendError(AsyncError error); | 1103 _sendError(AsyncError error); |
| 1104 _sendDone(); | 1104 _sendDone(); |
| 1105 } | 1105 } |
| 1106 | 1106 |
| 1107 abstract class _StreamListener<T> extends _InternalLink | 1107 abstract class _StreamListener<T> extends _InternalLink |
| 1108 implements _StreamOutputSink<T> { | 1108 implements _EventOutputSink<T> { |
| 1109 final _StreamImpl _source; | 1109 final _StreamImpl _source; |
| 1110 int _state = _LISTENER_UNSUBSCRIBED; | 1110 int _state = _LISTENER_UNSUBSCRIBED; |
| 1111 | 1111 |
| 1112 _StreamListener(this._source); | 1112 _StreamListener(this._source); |
| 1113 | 1113 |
| 1114 bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT); | 1114 bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT); |
| 1115 | 1115 |
| 1116 bool get _isPendingUnsubscribe => | 1116 bool get _isPendingUnsubscribe => |
| 1117 (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0; | 1117 (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0; |
| 1118 | 1118 |
| (...skipping 206 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1325 } | 1325 } |
| 1326 | 1326 |
| 1327 /** | 1327 /** |
| 1328 * Subscribe or unsubscribe on [_source] depending on whether | 1328 * Subscribe or unsubscribe on [_source] depending on whether |
| 1329 * [_stream] has subscribers. | 1329 * [_stream] has subscribers. |
| 1330 */ | 1330 */ |
| 1331 void _onSubscriptionStateChange() { | 1331 void _onSubscriptionStateChange() { |
| 1332 if (_hasSubscribers) { | 1332 if (_hasSubscribers) { |
| 1333 assert(_subscription == null); | 1333 assert(_subscription == null); |
| 1334 _subscription = _source.listen(this._add, | 1334 _subscription = _source.listen(this._add, |
| 1335 onError: this._signalError, | 1335 onError: this._addError, |
| 1336 onDone: this._close); | 1336 onDone: this._close); |
| 1337 } else { | 1337 } else { |
| 1338 // TODO(lrn): Check why this can happen. | 1338 // TODO(lrn): Check why this can happen. |
| 1339 if (_subscription == null) return; | 1339 if (_subscription == null) return; |
| 1340 _subscription.cancel(); | 1340 _subscription.cancel(); |
| 1341 _subscription = null; | 1341 _subscription = null; |
| 1342 } | 1342 } |
| 1343 } | 1343 } |
| 1344 } | 1344 } |
| OLD | NEW |