Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(526)

Unified Diff: sdk/lib/async/stream_impl.dart

Issue 11880019: Avoid the _onSubscriptionStateChange being called twice in some cases. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Make single-stream not paused when it's closed. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | tests/lib/async/stream_from_iterable_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_impl.dart
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index fe66c2e6b1793425d735c8f37c41e0f1aa1b23bf..544737119297046686601bf9ae3d2ba13e249428 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -204,6 +204,8 @@ abstract class _StreamImpl<T> extends Stream<T> {
void _startFiring() {
assert(!_isFiring);
+ assert(_hasSubscribers);
+ assert(!_isPaused);
// This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID
// bit. All current subscribers will now have a _LISTENER_EVENT_ID
// that doesn't match _STREAM_EVENT_ID, and they will receive the
@@ -214,6 +216,8 @@ abstract class _StreamImpl<T> extends Stream<T> {
void _endFiring() {
assert(_isFiring);
_state ^= _STREAM_FIRING;
+ if (_isPaused) _onPauseStateChange();
+ if (!_hasSubscribers) _onSubscriptionStateChange();
}
/**
@@ -236,7 +240,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
if (resumeSignal != null) {
resumeSignal.whenComplete(() { this._resume(listener, true); });
}
- if (!wasPaused) {
+ if (!wasPaused && !_isFiring) {
_onPauseStateChange();
}
}
@@ -248,18 +252,24 @@ abstract class _StreamImpl<T> extends Stream<T> {
assert(_isPaused);
_decrementPauseCount(listener);
if (!_isPaused) {
- _onPauseStateChange();
+ if (!_isFiring) _onPauseStateChange();
if (_hasPendingEvent) {
// If we can fire events now, fire any pending events right away.
if (fromEvent && !_isFiring) {
_handlePendingEvents();
} else {
- _pendingEvents.schedule(this);
+ _schedulePendingEvents();
}
}
}
}
+ /** Schedule pending events to be executed. */
+ void _schedulePendingEvents() {
+ assert(_hasPendingEvent);
+ _pendingEvents.schedule(this);
+ }
+
/** Create a subscription object. Called by [subcribe]. */
_StreamSubscriptionImpl<T> _createSubscription(
void onData(T data),
@@ -383,7 +393,6 @@ abstract class _StreamImpl<T> extends Stream<T> {
}
});
assert(!_hasSubscribers);
- _onSubscriptionStateChange();
}
}
@@ -427,7 +436,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> {
bool get isSingleSubscription => true;
/** Whether one or more active subscribers have requested a pause. */
- bool get _isPaused => !_hasSubscribers || super._isPaused;
+ bool get _isPaused => (!_hasSubscribers && !_isClosed) || super._isPaused;
/** Whether there is currently a subscriber on this [Stream]. */
bool get _hasSubscribers => _subscriber != null;
@@ -455,9 +464,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> {
subscription._setSubscribed(0);
_onSubscriptionStateChange();
if (_hasPendingEvent) {
- new Timer(0, (_) {
- _handlePendingEvents();
- });
+ _schedulePendingEvents();
}
}
@@ -479,16 +486,20 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> {
return;
}
_subscriber = null;
- int timesPaused = subscriber._setUnsubscribed();
- _updatePauseCount(-timesPaused);
- if (timesPaused > 0) {
- _onPauseStateChange();
+ // Unsubscribing a paused subscription also cancels its pauses.
+ int subscriptionPauseCount = subscriber._setUnsubscribed();
+ _updatePauseCount(-subscriptionPauseCount);
+ if (!_isFiring) {
+ if (subscriptionPauseCount > 0) {
+ _onPauseStateChange();
+ }
+ _onSubscriptionStateChange();
}
- _onSubscriptionStateChange();
}
void _forEachSubscriber(
void action(_StreamListener<T> subscription)) {
+ assert(!_isPaused);
_StreamListener subscription = _subscriber;
assert(subscription != null);
_startFiring();
@@ -594,8 +605,6 @@ class _MultiStreamImpl<T> extends _StreamImpl<T>
}
}
_endFiring();
- if (_isPaused) _onPauseStateChange();
- if (!_hasSubscribers) _onSubscriptionStateChange();
}
void _addListener(_StreamListener listener) {
@@ -659,6 +668,15 @@ abstract class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> {
bool _isHandlingPendingEvents = false;
bool get _hasPendingEvent => !_isClosed;
+ void _schedulePendingEvents() {
+ if (_pendingEvents != null) {
+ _pendingEvents.schedule(this);
+ } else {
+ // In the case where there only pending events are generated ones.
+ new Timer(0, (_) { _handlePendingEvents(); });
+ }
+ }
+
/**
* Generate one (or possibly more) new events.
*
« no previous file with comments | « no previous file | tests/lib/async/stream_from_iterable_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698