Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index fe66c2e6b1793425d735c8f37c41e0f1aa1b23bf..544737119297046686601bf9ae3d2ba13e249428 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -204,6 +204,8 @@ abstract class _StreamImpl<T> extends Stream<T> { |
void _startFiring() { |
assert(!_isFiring); |
+ assert(_hasSubscribers); |
+ assert(!_isPaused); |
// This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID |
// bit. All current subscribers will now have a _LISTENER_EVENT_ID |
// that doesn't match _STREAM_EVENT_ID, and they will receive the |
@@ -214,6 +216,8 @@ abstract class _StreamImpl<T> extends Stream<T> { |
void _endFiring() { |
assert(_isFiring); |
_state ^= _STREAM_FIRING; |
+ if (_isPaused) _onPauseStateChange(); |
+ if (!_hasSubscribers) _onSubscriptionStateChange(); |
} |
/** |
@@ -236,7 +240,7 @@ abstract class _StreamImpl<T> extends Stream<T> { |
if (resumeSignal != null) { |
resumeSignal.whenComplete(() { this._resume(listener, true); }); |
} |
- if (!wasPaused) { |
+ if (!wasPaused && !_isFiring) { |
_onPauseStateChange(); |
} |
} |
@@ -248,18 +252,24 @@ abstract class _StreamImpl<T> extends Stream<T> { |
assert(_isPaused); |
_decrementPauseCount(listener); |
if (!_isPaused) { |
- _onPauseStateChange(); |
+ if (!_isFiring) _onPauseStateChange(); |
if (_hasPendingEvent) { |
// If we can fire events now, fire any pending events right away. |
if (fromEvent && !_isFiring) { |
_handlePendingEvents(); |
} else { |
- _pendingEvents.schedule(this); |
+ _schedulePendingEvents(); |
} |
} |
} |
} |
+ /** Schedule pending events to be executed. */ |
+ void _schedulePendingEvents() { |
+ assert(_hasPendingEvent); |
+ _pendingEvents.schedule(this); |
+ } |
+ |
/** Create a subscription object. Called by [subcribe]. */ |
_StreamSubscriptionImpl<T> _createSubscription( |
void onData(T data), |
@@ -383,7 +393,6 @@ abstract class _StreamImpl<T> extends Stream<T> { |
} |
}); |
assert(!_hasSubscribers); |
- _onSubscriptionStateChange(); |
} |
} |
@@ -427,7 +436,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> { |
bool get isSingleSubscription => true; |
/** Whether one or more active subscribers have requested a pause. */ |
- bool get _isPaused => !_hasSubscribers || super._isPaused; |
+ bool get _isPaused => (!_hasSubscribers && !_isClosed) || super._isPaused; |
/** Whether there is currently a subscriber on this [Stream]. */ |
bool get _hasSubscribers => _subscriber != null; |
@@ -455,9 +464,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> { |
subscription._setSubscribed(0); |
_onSubscriptionStateChange(); |
if (_hasPendingEvent) { |
- new Timer(0, (_) { |
- _handlePendingEvents(); |
- }); |
+ _schedulePendingEvents(); |
} |
} |
@@ -479,16 +486,20 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> { |
return; |
} |
_subscriber = null; |
- int timesPaused = subscriber._setUnsubscribed(); |
- _updatePauseCount(-timesPaused); |
- if (timesPaused > 0) { |
- _onPauseStateChange(); |
+ // Unsubscribing a paused subscription also cancels its pauses. |
+ int subscriptionPauseCount = subscriber._setUnsubscribed(); |
+ _updatePauseCount(-subscriptionPauseCount); |
+ if (!_isFiring) { |
+ if (subscriptionPauseCount > 0) { |
+ _onPauseStateChange(); |
+ } |
+ _onSubscriptionStateChange(); |
} |
- _onSubscriptionStateChange(); |
} |
void _forEachSubscriber( |
void action(_StreamListener<T> subscription)) { |
+ assert(!_isPaused); |
_StreamListener subscription = _subscriber; |
assert(subscription != null); |
_startFiring(); |
@@ -594,8 +605,6 @@ class _MultiStreamImpl<T> extends _StreamImpl<T> |
} |
} |
_endFiring(); |
- if (_isPaused) _onPauseStateChange(); |
- if (!_hasSubscribers) _onSubscriptionStateChange(); |
} |
void _addListener(_StreamListener listener) { |
@@ -659,6 +668,15 @@ abstract class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { |
bool _isHandlingPendingEvents = false; |
bool get _hasPendingEvent => !_isClosed; |
+ void _schedulePendingEvents() { |
+ if (_pendingEvents != null) { |
+ _pendingEvents.schedule(this); |
+ } else { |
+ // In the case where there only pending events are generated ones. |
+ new Timer(0, (_) { _handlePendingEvents(); }); |
+ } |
+ } |
+ |
/** |
* Generate one (or possibly more) new events. |
* |