Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(149)

Unified Diff: sdk/lib/async/stream_impl.dart

Issue 11886013: Make Stream transformation respect the single/multi subscriber nature of the source. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Added missing isSingleSubscription impl. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/async/stream_impl.dart
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index 7183dc11ff0ed7025a95765c016f05fae5bdf3d1..60b73d1b2a316a1c8cf847fdb3980dba872c11b3 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -421,9 +421,11 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> {
_StreamListener _subscriber = null;
Stream<T> asMultiSubscriberStream() {
- return new _ForwardingMultiStream<T, T>().._source = this;
+ return new _SingleStreamMultiplexer<T>(this);
}
+ bool get isSingleSubscription => true;
+
/** Whether one or more active subscribers have requested a pause. */
bool get _isPaused => !_hasSubscribers || super._isPaused;
@@ -541,6 +543,8 @@ class _MultiStreamImpl<T> extends _StreamImpl<T>
// ------------------------------------------------------------------
// Helper functions that can be overridden in subclasses.
+ bool get isSingleSubscription => false;
floitsch 2013/01/14 16:09:09 I don't think anybody wants to override this metho
Lasse Reichstein Nielsen 2013/01/15 07:19:51 Done.
+
/** Whether there are currently any subscribers on this [Stream]. */
bool get _hasSubscribers => !_InternalLinkList.isEmpty(this);
@@ -1093,3 +1097,40 @@ class _DoneSubscription<T> implements StreamSubscription<T> {
_pauseCount = 0;
}
}
+
+class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> {
+ final _SingleStreamImpl<T> _source;
+ StreamSubscription<T> _subscription;
+
+ _SingleStreamMultiplexer(this._source);
+
+ void _onPauseStateChange() {
+ if (_isPaused) {
+ if (_subscription != null) {
+ _subscription.pause();
+ }
+ } else {
+ if (_subscription != null) {
+ _subscription.resume();
+ }
+ }
+ }
+
+ /**
+ * Subscribe or unsubscribe on [_source] depending on whether
+ * [_stream] has subscribers.
+ */
+ void _onSubscriptionStateChange() {
+ if (_hasSubscribers) {
+ assert(_subscription == null);
+ _subscription = _source.listen(this._add,
+ onError: this._signalError,
+ onDone: this._close);
+ } else {
+ // TODO(lrn): Check why this can happen.
+ if (_subscription == null) return;
+ _subscription.cancel();
+ _subscription = null;
+ }
+ }
+}

Powered by Google App Engine
This is Rietveld 408576698