| Index: sdk/lib/async/stream_impl.dart
|
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
|
| index fe66c2e6b1793425d735c8f37c41e0f1aa1b23bf..544737119297046686601bf9ae3d2ba13e249428 100644
|
| --- a/sdk/lib/async/stream_impl.dart
|
| +++ b/sdk/lib/async/stream_impl.dart
|
| @@ -204,6 +204,8 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
|
|
| void _startFiring() {
|
| assert(!_isFiring);
|
| + assert(_hasSubscribers);
|
| + assert(!_isPaused);
|
| // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID
|
| // bit. All current subscribers will now have a _LISTENER_EVENT_ID
|
| // that doesn't match _STREAM_EVENT_ID, and they will receive the
|
| @@ -214,6 +216,8 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| void _endFiring() {
|
| assert(_isFiring);
|
| _state ^= _STREAM_FIRING;
|
| + if (_isPaused) _onPauseStateChange();
|
| + if (!_hasSubscribers) _onSubscriptionStateChange();
|
| }
|
|
|
| /**
|
| @@ -236,7 +240,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| if (resumeSignal != null) {
|
| resumeSignal.whenComplete(() { this._resume(listener, true); });
|
| }
|
| - if (!wasPaused) {
|
| + if (!wasPaused && !_isFiring) {
|
| _onPauseStateChange();
|
| }
|
| }
|
| @@ -248,18 +252,24 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| assert(_isPaused);
|
| _decrementPauseCount(listener);
|
| if (!_isPaused) {
|
| - _onPauseStateChange();
|
| + if (!_isFiring) _onPauseStateChange();
|
| if (_hasPendingEvent) {
|
| // If we can fire events now, fire any pending events right away.
|
| if (fromEvent && !_isFiring) {
|
| _handlePendingEvents();
|
| } else {
|
| - _pendingEvents.schedule(this);
|
| + _schedulePendingEvents();
|
| }
|
| }
|
| }
|
| }
|
|
|
| + /** Schedule pending events to be executed. */
|
| + void _schedulePendingEvents() {
|
| + assert(_hasPendingEvent);
|
| + _pendingEvents.schedule(this);
|
| + }
|
| +
|
| /** Create a subscription object. Called by [subcribe]. */
|
| _StreamSubscriptionImpl<T> _createSubscription(
|
| void onData(T data),
|
| @@ -383,7 +393,6 @@ abstract class _StreamImpl<T> extends Stream<T> {
|
| }
|
| });
|
| assert(!_hasSubscribers);
|
| - _onSubscriptionStateChange();
|
| }
|
| }
|
|
|
| @@ -427,7 +436,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> {
|
| bool get isSingleSubscription => true;
|
|
|
| /** Whether one or more active subscribers have requested a pause. */
|
| - bool get _isPaused => !_hasSubscribers || super._isPaused;
|
| + bool get _isPaused => (!_hasSubscribers && !_isClosed) || super._isPaused;
|
|
|
| /** Whether there is currently a subscriber on this [Stream]. */
|
| bool get _hasSubscribers => _subscriber != null;
|
| @@ -455,9 +464,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> {
|
| subscription._setSubscribed(0);
|
| _onSubscriptionStateChange();
|
| if (_hasPendingEvent) {
|
| - new Timer(0, (_) {
|
| - _handlePendingEvents();
|
| - });
|
| + _schedulePendingEvents();
|
| }
|
| }
|
|
|
| @@ -479,16 +486,20 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> {
|
| return;
|
| }
|
| _subscriber = null;
|
| - int timesPaused = subscriber._setUnsubscribed();
|
| - _updatePauseCount(-timesPaused);
|
| - if (timesPaused > 0) {
|
| - _onPauseStateChange();
|
| + // Unsubscribing a paused subscription also cancels its pauses.
|
| + int subscriptionPauseCount = subscriber._setUnsubscribed();
|
| + _updatePauseCount(-subscriptionPauseCount);
|
| + if (!_isFiring) {
|
| + if (subscriptionPauseCount > 0) {
|
| + _onPauseStateChange();
|
| + }
|
| + _onSubscriptionStateChange();
|
| }
|
| - _onSubscriptionStateChange();
|
| }
|
|
|
| void _forEachSubscriber(
|
| void action(_StreamListener<T> subscription)) {
|
| + assert(!_isPaused);
|
| _StreamListener subscription = _subscriber;
|
| assert(subscription != null);
|
| _startFiring();
|
| @@ -594,8 +605,6 @@ class _MultiStreamImpl<T> extends _StreamImpl<T>
|
| }
|
| }
|
| _endFiring();
|
| - if (_isPaused) _onPauseStateChange();
|
| - if (!_hasSubscribers) _onSubscriptionStateChange();
|
| }
|
|
|
| void _addListener(_StreamListener listener) {
|
| @@ -659,6 +668,15 @@ abstract class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> {
|
| bool _isHandlingPendingEvents = false;
|
| bool get _hasPendingEvent => !_isClosed;
|
|
|
| + void _schedulePendingEvents() {
|
| + if (_pendingEvents != null) {
|
| + _pendingEvents.schedule(this);
|
| + } else {
|
| + // In the case where there only pending events are generated ones.
|
| + new Timer(0, (_) { _handlePendingEvents(); });
|
| + }
|
| + }
|
| +
|
| /**
|
| * Generate one (or possibly more) new events.
|
| *
|
|
|