Chromium Code Reviews| Index: sdk/lib/async/stream_impl.dart |
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
| index 7183dc11ff0ed7025a95765c016f05fae5bdf3d1..0defe9c37e84febc53b6b4952716cb177f6e5a3a 100644 |
| --- a/sdk/lib/async/stream_impl.dart |
| +++ b/sdk/lib/async/stream_impl.dart |
| @@ -204,6 +204,8 @@ abstract class _StreamImpl<T> extends Stream<T> { |
| void _startFiring() { |
| assert(!_isFiring); |
| + assert(_hasSubscribers); |
| + assert(!_isPaused); |
| // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID |
| // bit. All current subscribers will now have a _LISTENER_EVENT_ID |
| // that doesn't match _STREAM_EVENT_ID, and they will receive the |
| @@ -214,6 +216,8 @@ abstract class _StreamImpl<T> extends Stream<T> { |
| void _endFiring() { |
| assert(_isFiring); |
| _state ^= _STREAM_FIRING; |
| + if (_isPaused) _onPauseStateChange(); |
| + if (!_hasSubscribers) _onSubscriptionStateChange(); |
| } |
| /** |
| @@ -236,7 +240,7 @@ abstract class _StreamImpl<T> extends Stream<T> { |
| if (resumeSignal != null) { |
| resumeSignal.whenComplete(() { this._resume(listener, true); }); |
| } |
| - if (!wasPaused) { |
| + if (!wasPaused && !_isFiring) { |
| _onPauseStateChange(); |
| } |
| } |
| @@ -248,18 +252,24 @@ abstract class _StreamImpl<T> extends Stream<T> { |
| assert(_isPaused); |
| _decrementPauseCount(listener); |
| if (!_isPaused) { |
| - _onPauseStateChange(); |
| + if (!_isFiring) _onPauseStateChange(); |
| if (_hasPendingEvent) { |
| // If we can fire events now, fire any pending events right away. |
| if (fromEvent && !_isFiring) { |
| _handlePendingEvents(); |
| } else { |
| - _pendingEvents.schedule(this); |
| + _schedulePendingEvents(); |
| } |
| } |
| } |
| } |
| + /** Schedule pending events to be executed. */ |
| + void _schedulePendingEvents() { |
| + assert(_hasPendingEvent); |
| + _pendingEvents.schedule(this); |
| + } |
| + |
| /** Create a subscription object. Called by [subcribe]. */ |
| _StreamSubscriptionImpl<T> _createSubscription( |
| void onData(T data), |
| @@ -383,7 +393,6 @@ abstract class _StreamImpl<T> extends Stream<T> { |
| } |
| }); |
| assert(!_hasSubscribers); |
| - _onSubscriptionStateChange(); |
| } |
| } |
| @@ -453,9 +462,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> { |
| subscription._setSubscribed(0); |
| _onSubscriptionStateChange(); |
| if (_hasPendingEvent) { |
| - new Timer(0, (_) { |
| - _handlePendingEvents(); |
| - }); |
| + _schedulePendingEvents(); |
| } |
| } |
| @@ -479,14 +486,17 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> { |
| _subscriber = null; |
| int timesPaused = subscriber._setUnsubscribed(); |
|
floitsch
2013/01/14 16:31:24
Add comment: Unsubscribing a paused subscriber can
Lasse Reichstein Nielsen
2013/01/15 08:52:54
Done.
|
| _updatePauseCount(-timesPaused); |
| - if (timesPaused > 0) { |
| - _onPauseStateChange(); |
| + if (!_isFiring) { |
| + if (timesPaused > 0) { |
| + _onPauseStateChange(); |
| + } |
| + _onSubscriptionStateChange(); |
| } |
| - _onSubscriptionStateChange(); |
| } |
| void _forEachSubscriber( |
| void action(_StreamListener<T> subscription)) { |
| + assert(!_isPaused); |
| _StreamListener subscription = _subscriber; |
| assert(subscription != null); |
| _startFiring(); |
| @@ -590,8 +600,6 @@ class _MultiStreamImpl<T> extends _StreamImpl<T> |
| } |
| } |
| _endFiring(); |
| - if (_isPaused) _onPauseStateChange(); |
| - if (!_hasSubscribers) _onSubscriptionStateChange(); |
| } |
| void _addListener(_StreamListener listener) { |
| @@ -631,7 +639,7 @@ class _MultiStreamImpl<T> extends _StreamImpl<T> |
| } else { |
| bool wasPaused = _isPaused; |
| _removeListener(listener); |
| - if (wasPaused != _isPaused) _onPauseStateChange(); |
| + if (!identical(wasPaused, _isPaused)) _onPauseStateChange(); |
|
floitsch
2013/01/14 16:31:24
why?
Lasse Reichstein Nielsen
2013/01/15 08:52:54
Probably overoptimizing :)
Reverted.
|
| if (!_hasSubscribers) _onSubscriptionStateChange(); |
| } |
| } |
| @@ -655,6 +663,15 @@ abstract class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { |
| bool _isHandlingPendingEvents = false; |
| bool get _hasPendingEvent => !_isClosed; |
| + void _schedulePendingEvents() { |
| + if (_pendingEvents != null) { |
| + _pendingEvents.schedule(this); |
| + } else { |
| + // In the case where there only pending events are generated ones. |
|
floitsch
2013/01/14 16:31:24
Don't understand comment.
Lasse Reichstein Nielsen
2013/01/15 08:52:54
It's commenting that in this particular class, it'
floitsch
2013/01/15 14:40:19
Did you commit? The comment still looks the same.
|
| + new Timer(0, (_) { _handlePendingEvents(); }); |
| + } |
| + } |
| + |
| /** |
| * Generate one (or possibly more) new events. |
| * |