Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index 7183dc11ff0ed7025a95765c016f05fae5bdf3d1..60b73d1b2a316a1c8cf847fdb3980dba872c11b3 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -421,9 +421,11 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> { |
_StreamListener _subscriber = null; |
Stream<T> asMultiSubscriberStream() { |
- return new _ForwardingMultiStream<T, T>().._source = this; |
+ return new _SingleStreamMultiplexer<T>(this); |
} |
+ bool get isSingleSubscription => true; |
+ |
/** Whether one or more active subscribers have requested a pause. */ |
bool get _isPaused => !_hasSubscribers || super._isPaused; |
@@ -541,6 +543,8 @@ class _MultiStreamImpl<T> extends _StreamImpl<T> |
// ------------------------------------------------------------------ |
// Helper functions that can be overridden in subclasses. |
+ bool get isSingleSubscription => false; |
floitsch
2013/01/14 16:09:09
I don't think anybody wants to override this metho
Lasse Reichstein Nielsen
2013/01/15 07:19:51
Done.
|
+ |
/** Whether there are currently any subscribers on this [Stream]. */ |
bool get _hasSubscribers => !_InternalLinkList.isEmpty(this); |
@@ -1093,3 +1097,40 @@ class _DoneSubscription<T> implements StreamSubscription<T> { |
_pauseCount = 0; |
} |
} |
+ |
+class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { |
+ final _SingleStreamImpl<T> _source; |
+ StreamSubscription<T> _subscription; |
+ |
+ _SingleStreamMultiplexer(this._source); |
+ |
+ void _onPauseStateChange() { |
+ if (_isPaused) { |
+ if (_subscription != null) { |
+ _subscription.pause(); |
+ } |
+ } else { |
+ if (_subscription != null) { |
+ _subscription.resume(); |
+ } |
+ } |
+ } |
+ |
+ /** |
+ * Subscribe or unsubscribe on [_source] depending on whether |
+ * [_stream] has subscribers. |
+ */ |
+ void _onSubscriptionStateChange() { |
+ if (_hasSubscribers) { |
+ assert(_subscription == null); |
+ _subscription = _source.listen(this._add, |
+ onError: this._signalError, |
+ onDone: this._close); |
+ } else { |
+ // TODO(lrn): Check why this can happen. |
+ if (_subscription == null) return; |
+ _subscription.cancel(); |
+ _subscription = null; |
+ } |
+ } |
+} |