| Index: sdk/lib/async/stream_impl.dart
|
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
|
| index e3ebd367c89591497d6919a6df067b94281b8ffc..e6de3d7dc38c073531119041c2bbee913ef0cfc0 100644
|
| --- a/sdk/lib/async/stream_impl.dart
|
| +++ b/sdk/lib/async/stream_impl.dart
|
| @@ -226,7 +226,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT;
|
|
|
| /** Whether there is currently a subscriber on this [Stream]. */
|
| - bool get _hasSubscribers;
|
| + bool get _hasListener;
|
|
|
|
|
| /** Whether the state bits allow firing. */
|
| @@ -286,7 +286,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| void _startFiring() {
|
| assert(!_isFiring);
|
| assert(!_isInCallback);
|
| - assert(_hasSubscribers);
|
| + assert(_hasListener);
|
| assert(!_isPaused);
|
| // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID
|
| // bit. All current subscribers will now have a _LISTENER_EVENT_ID
|
| @@ -417,7 +417,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| * provided one. Repeats calling callbacks as long as the call changes
|
| * the state.
|
| */
|
| - void _checkCallbacks(bool hadSubscribers, bool wasPaused) {
|
| + void _checkCallbacks(bool hadListener, bool wasPaused) {
|
| assert(!_isFiring);
|
| // Will be handled after the current callback.
|
| if (_isInCallback) return;
|
| @@ -426,9 +426,9 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| }
|
| _state |= _STREAM_CALLBACK;
|
| while (true) {
|
| - bool hasSubscribers = _hasSubscribers;
|
| + bool hasListener = _hasListener;
|
| bool isPaused = _isInputPaused;
|
| - if (hadSubscribers != hasSubscribers) {
|
| + if (hadListener != hasListener) {
|
| _onSubscriptionStateChange();
|
| } else if (isPaused != wasPaused) {
|
| _onPauseStateChange();
|
| @@ -437,7 +437,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| return;
|
| }
|
| wasPaused = isPaused;
|
| - hadSubscribers = hasSubscribers;
|
| + hadListener = hasListener;
|
| }
|
| }
|
|
|
| @@ -451,7 +451,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| /**
|
| * Called when the first listener subscribes or the last unsubscribes.
|
| *
|
| - * Read [hasSubscribers] to see what the new state is.
|
| + * Read [hasListener] to see what the new state is.
|
| */
|
| void _onSubscriptionStateChange() {}
|
|
|
| @@ -489,7 +489,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| _sendData(T value) {
|
| assert(!_isPaused);
|
| assert(!_isComplete);
|
| - if (!_hasSubscribers) return;
|
| + if (!_hasListener) return;
|
| _forEachSubscriber((subscriber) {
|
| try {
|
| subscriber._sendData(value);
|
| @@ -507,7 +507,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| void _sendError(AsyncError error) {
|
| assert(!_isPaused);
|
| assert(!_isComplete);
|
| - if (!_hasSubscribers) return;
|
| + if (!_hasListener) return;
|
| _forEachSubscriber((subscriber) {
|
| try {
|
| subscriber._sendError(error);
|
| @@ -528,7 +528,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| assert(!_isPaused);
|
| assert(_isClosed);
|
| _setComplete();
|
| - if (!_hasSubscribers) return;
|
| + if (!_hasListener) return;
|
| _forEachSubscriber((subscriber) {
|
| _cancel(subscriber);
|
| try {
|
| @@ -539,7 +539,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| new AsyncError(e, s).throwDelayed();
|
| }
|
| });
|
| - assert(!_hasSubscribers);
|
| + assert(!_hasListener);
|
| }
|
| }
|
|
|
| @@ -567,7 +567,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| * * [_onSubscriberStateChange]: Called when receiving the first subscriber or
|
| * when losing the last subscriber.
|
| * * [_onPauseStateChange]: Called when entering or leaving paused mode.
|
| - * * [_hasSubscribers]: Test whether there are currently any subscribers.
|
| + * * [_hasListener]: Test whether there are currently any subscribers.
|
| * * [_isInputPaused]: Test whether the stream is currently paused.
|
| * The user should not add new events while the stream is paused, but if it
|
| * happens anyway, the stream will enqueue the events just as when new events
|
| @@ -577,7 +577,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> {
|
| _StreamListener _subscriber = null;
|
|
|
| /** Whether there is currently a subscriber on this [Stream]. */
|
| - bool get _hasSubscribers => _subscriber != null;
|
| + bool get _hasListener => _subscriber != null;
|
|
|
| // -------------------------------------------------------------------
|
| // Internal implementation.
|
| @@ -601,7 +601,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> {
|
|
|
| void _addListener(_StreamListener subscription) {
|
| assert(!_isComplete);
|
| - if (_hasSubscribers) {
|
| + if (_hasListener) {
|
| throw new StateError("Stream already has subscriber.");
|
| }
|
| assert(_pauseCount == 1);
|
| @@ -681,7 +681,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> {
|
| * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or
|
| * when losing the last subscriber.
|
| * * [_onPauseStateChange]: Called when entering or leaving paused mode.
|
| - * * [_hasSubscribers]: Test whether there are currently any subscribers.
|
| + * * [_hasListener]: Test whether there are currently any subscribers.
|
| * * [_isPaused]: Test whether the stream is currently paused.
|
| * The user should not add new events while the stream is paused, but if it
|
| * happens anyway, the stream will enqueue the events just as when new events
|
| @@ -705,7 +705,7 @@ class _MultiStreamImpl<T> extends _StreamImpl<T>
|
| // Helper functions that can be overridden in subclasses.
|
|
|
| /** Whether there are currently any subscribers on this [Stream]. */
|
| - bool get _hasSubscribers => !_InternalLinkList.isEmpty(this);
|
| + bool get _hasListener => !_InternalLinkList.isEmpty(this);
|
|
|
| /**
|
| * Create the new subscription object.
|
| @@ -737,7 +737,7 @@ class _MultiStreamImpl<T> extends _StreamImpl<T>
|
| void _forEachSubscriber(
|
| void action(_StreamListener<T> subscription)) {
|
| assert(!_isFiring);
|
| - if (!_hasSubscribers) return;
|
| + if (!_hasListener) return;
|
| bool wasInputPaused = _isInputPaused;
|
| _startFiring();
|
| _InternalLink cursor = this._nextLink;
|
| @@ -758,9 +758,9 @@ class _MultiStreamImpl<T> extends _StreamImpl<T>
|
|
|
| void _addListener(_StreamListener listener) {
|
| listener._setSubscribed(_currentEventIdBit);
|
| - bool hadSubscribers = _hasSubscribers;
|
| + bool hadListener = _hasListener;
|
| _InternalLinkList.add(this, listener);
|
| - if (!hadSubscribers && _isInactive) {
|
| + if (!hadListener && _isInactive) {
|
| _checkCallbacks(false, false);
|
| if (!_isPaused && _hasPendingEvent) {
|
| _schedulePendingEvents();
|
| @@ -1323,7 +1323,7 @@ class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> {
|
| * [_stream] has subscribers.
|
| */
|
| void _onSubscriptionStateChange() {
|
| - if (_hasSubscribers) {
|
| + if (_hasListener) {
|
| assert(_subscription == null);
|
| _subscription = _source.listen(this._add,
|
| onError: this._addError,
|
|
|