Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(17)

Unified Diff: sdk/lib/async/stream_impl.dart

Issue 11880019: Avoid the _onSubscriptionStateChange being called twice in some cases. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | sdk/lib/async/stream_pipe.dart » ('j') | sdk/lib/async/stream_pipe.dart » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_impl.dart
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index 7183dc11ff0ed7025a95765c016f05fae5bdf3d1..0defe9c37e84febc53b6b4952716cb177f6e5a3a 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -204,6 +204,8 @@ abstract class _StreamImpl<T> extends Stream<T> {
void _startFiring() {
assert(!_isFiring);
+ assert(_hasSubscribers);
+ assert(!_isPaused);
// This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID
// bit. All current subscribers will now have a _LISTENER_EVENT_ID
// that doesn't match _STREAM_EVENT_ID, and they will receive the
@@ -214,6 +216,8 @@ abstract class _StreamImpl<T> extends Stream<T> {
void _endFiring() {
assert(_isFiring);
_state ^= _STREAM_FIRING;
+ if (_isPaused) _onPauseStateChange();
+ if (!_hasSubscribers) _onSubscriptionStateChange();
}
/**
@@ -236,7 +240,7 @@ abstract class _StreamImpl<T> extends Stream<T> {
if (resumeSignal != null) {
resumeSignal.whenComplete(() { this._resume(listener, true); });
}
- if (!wasPaused) {
+ if (!wasPaused && !_isFiring) {
_onPauseStateChange();
}
}
@@ -248,18 +252,24 @@ abstract class _StreamImpl<T> extends Stream<T> {
assert(_isPaused);
_decrementPauseCount(listener);
if (!_isPaused) {
- _onPauseStateChange();
+ if (!_isFiring) _onPauseStateChange();
if (_hasPendingEvent) {
// If we can fire events now, fire any pending events right away.
if (fromEvent && !_isFiring) {
_handlePendingEvents();
} else {
- _pendingEvents.schedule(this);
+ _schedulePendingEvents();
}
}
}
}
+ /** Schedule pending events to be executed. */
+ void _schedulePendingEvents() {
+ assert(_hasPendingEvent);
+ _pendingEvents.schedule(this);
+ }
+
/** Create a subscription object. Called by [subcribe]. */
_StreamSubscriptionImpl<T> _createSubscription(
void onData(T data),
@@ -383,7 +393,6 @@ abstract class _StreamImpl<T> extends Stream<T> {
}
});
assert(!_hasSubscribers);
- _onSubscriptionStateChange();
}
}
@@ -453,9 +462,7 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> {
subscription._setSubscribed(0);
_onSubscriptionStateChange();
if (_hasPendingEvent) {
- new Timer(0, (_) {
- _handlePendingEvents();
- });
+ _schedulePendingEvents();
}
}
@@ -479,14 +486,17 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> {
_subscriber = null;
int timesPaused = subscriber._setUnsubscribed();
floitsch 2013/01/14 16:31:24 Add comment: Unsubscribing a paused subscriber can
Lasse Reichstein Nielsen 2013/01/15 08:52:54 Done.
_updatePauseCount(-timesPaused);
- if (timesPaused > 0) {
- _onPauseStateChange();
+ if (!_isFiring) {
+ if (timesPaused > 0) {
+ _onPauseStateChange();
+ }
+ _onSubscriptionStateChange();
}
- _onSubscriptionStateChange();
}
void _forEachSubscriber(
void action(_StreamListener<T> subscription)) {
+ assert(!_isPaused);
_StreamListener subscription = _subscriber;
assert(subscription != null);
_startFiring();
@@ -590,8 +600,6 @@ class _MultiStreamImpl<T> extends _StreamImpl<T>
}
}
_endFiring();
- if (_isPaused) _onPauseStateChange();
- if (!_hasSubscribers) _onSubscriptionStateChange();
}
void _addListener(_StreamListener listener) {
@@ -631,7 +639,7 @@ class _MultiStreamImpl<T> extends _StreamImpl<T>
} else {
bool wasPaused = _isPaused;
_removeListener(listener);
- if (wasPaused != _isPaused) _onPauseStateChange();
+ if (!identical(wasPaused, _isPaused)) _onPauseStateChange();
floitsch 2013/01/14 16:31:24 why?
Lasse Reichstein Nielsen 2013/01/15 08:52:54 Probably overoptimizing :) Reverted.
if (!_hasSubscribers) _onSubscriptionStateChange();
}
}
@@ -655,6 +663,15 @@ abstract class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> {
bool _isHandlingPendingEvents = false;
bool get _hasPendingEvent => !_isClosed;
+ void _schedulePendingEvents() {
+ if (_pendingEvents != null) {
+ _pendingEvents.schedule(this);
+ } else {
+ // In the case where there only pending events are generated ones.
floitsch 2013/01/14 16:31:24 Don't understand comment.
Lasse Reichstein Nielsen 2013/01/15 08:52:54 It's commenting that in this particular class, it'
floitsch 2013/01/15 14:40:19 Did you commit? The comment still looks the same.
+ new Timer(0, (_) { _handlePendingEvents(); });
+ }
+ }
+
/**
* Generate one (or possibly more) new events.
*
« no previous file with comments | « no previous file | sdk/lib/async/stream_pipe.dart » ('j') | sdk/lib/async/stream_pipe.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698