Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index 7183dc11ff0ed7025a95765c016f05fae5bdf3d1..0defe9c37e84febc53b6b4952716cb177f6e5a3a 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -204,6 +204,8 @@ abstract class _StreamImpl<T> extends Stream<T> { |
void _startFiring() { |
assert(!_isFiring); |
+ assert(_hasSubscribers); |
+ assert(!_isPaused); |
// This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID |
// bit. All current subscribers will now have a _LISTENER_EVENT_ID |
// that doesn't match _STREAM_EVENT_ID, and they will receive the |
@@ -214,6 +216,8 @@ abstract class _StreamImpl<T> extends Stream<T> { |
void _endFiring() { |
assert(_isFiring); |
_state ^= _STREAM_FIRING; |
+ if (_isPaused) _onPauseStateChange(); |
+ if (!_hasSubscribers) _onSubscriptionStateChange(); |
} |
/** |
@@ -236,7 +240,7 @@ abstract class _StreamImpl<T> extends Stream<T> { |
if (resumeSignal != null) { |
resumeSignal.whenComplete(() { this._resume(listener, true); }); |
} |
- if (!wasPaused) { |
+ if (!wasPaused && !_isFiring) { |
_onPauseStateChange(); |
} |
} |
@@ -248,18 +252,24 @@ abstract class _StreamImpl<T> extends Stream<T> { |
assert(_isPaused); |
_decrementPauseCount(listener); |
if (!_isPaused) { |
- _onPauseStateChange(); |
+ if (!_isFiring) _onPauseStateChange(); |
if (_hasPendingEvent) { |
// If we can fire events now, fire any pending events right away. |
if (fromEvent && !_isFiring) { |
_handlePendingEvents(); |
} else { |
- _pendingEvents.schedule(this); |
+ _schedulePendingEvents(); |
} |
} |
} |
} |
+ /** Schedule pending events to be executed. */ |
+ void _schedulePendingEvents() { |
+ assert(_hasPendingEvent); |
+ _pendingEvents.schedule(this); |
+ } |
+ |
/** Create a subscription object. Called by [subcribe]. */ |
_StreamSubscriptionImpl<T> _createSubscription( |
void onData(T data), |
@@ -383,7 +393,6 @@ abstract class _StreamImpl<T> extends Stream<T> { |
} |
}); |
assert(!_hasSubscribers); |
- _onSubscriptionStateChange(); |
} |
} |
@@ -453,9 +462,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> { |
subscription._setSubscribed(0); |
_onSubscriptionStateChange(); |
if (_hasPendingEvent) { |
- new Timer(0, (_) { |
- _handlePendingEvents(); |
- }); |
+ _schedulePendingEvents(); |
} |
} |
@@ -479,14 +486,17 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> { |
_subscriber = null; |
int timesPaused = subscriber._setUnsubscribed(); |
floitsch
2013/01/14 16:31:24
Add comment: Unsubscribing a paused subscriber can
Lasse Reichstein Nielsen
2013/01/15 08:52:54
Done.
|
_updatePauseCount(-timesPaused); |
- if (timesPaused > 0) { |
- _onPauseStateChange(); |
+ if (!_isFiring) { |
+ if (timesPaused > 0) { |
+ _onPauseStateChange(); |
+ } |
+ _onSubscriptionStateChange(); |
} |
- _onSubscriptionStateChange(); |
} |
void _forEachSubscriber( |
void action(_StreamListener<T> subscription)) { |
+ assert(!_isPaused); |
_StreamListener subscription = _subscriber; |
assert(subscription != null); |
_startFiring(); |
@@ -590,8 +600,6 @@ class _MultiStreamImpl<T> extends _StreamImpl<T> |
} |
} |
_endFiring(); |
- if (_isPaused) _onPauseStateChange(); |
- if (!_hasSubscribers) _onSubscriptionStateChange(); |
} |
void _addListener(_StreamListener listener) { |
@@ -631,7 +639,7 @@ class _MultiStreamImpl<T> extends _StreamImpl<T> |
} else { |
bool wasPaused = _isPaused; |
_removeListener(listener); |
- if (wasPaused != _isPaused) _onPauseStateChange(); |
+ if (!identical(wasPaused, _isPaused)) _onPauseStateChange(); |
floitsch
2013/01/14 16:31:24
why?
Lasse Reichstein Nielsen
2013/01/15 08:52:54
Probably overoptimizing :)
Reverted.
|
if (!_hasSubscribers) _onSubscriptionStateChange(); |
} |
} |
@@ -655,6 +663,15 @@ abstract class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { |
bool _isHandlingPendingEvents = false; |
bool get _hasPendingEvent => !_isClosed; |
+ void _schedulePendingEvents() { |
+ if (_pendingEvents != null) { |
+ _pendingEvents.schedule(this); |
+ } else { |
+ // In the case where there only pending events are generated ones. |
floitsch
2013/01/14 16:31:24
Don't understand comment.
Lasse Reichstein Nielsen
2013/01/15 08:52:54
It's commenting that in this particular class, it'
floitsch
2013/01/15 14:40:19
Did you commit? The comment still looks the same.
|
+ new Timer(0, (_) { _handlePendingEvents(); }); |
+ } |
+ } |
+ |
/** |
* Generate one (or possibly more) new events. |
* |