Chromium Code Reviews| Index: sdk/lib/async/stream_impl.dart |
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
| index 7183dc11ff0ed7025a95765c016f05fae5bdf3d1..60b73d1b2a316a1c8cf847fdb3980dba872c11b3 100644 |
| --- a/sdk/lib/async/stream_impl.dart |
| +++ b/sdk/lib/async/stream_impl.dart |
| @@ -421,9 +421,11 @@ class _SingleStreamImpl<T> extends _StreamImpl<T> { |
| _StreamListener _subscriber = null; |
| Stream<T> asMultiSubscriberStream() { |
| - return new _ForwardingMultiStream<T, T>().._source = this; |
| + return new _SingleStreamMultiplexer<T>(this); |
| } |
| + bool get isSingleSubscription => true; |
| + |
| /** Whether one or more active subscribers have requested a pause. */ |
| bool get _isPaused => !_hasSubscribers || super._isPaused; |
| @@ -541,6 +543,8 @@ class _MultiStreamImpl<T> extends _StreamImpl<T> |
| // ------------------------------------------------------------------ |
| // Helper functions that can be overridden in subclasses. |
| + bool get isSingleSubscription => false; |
|
floitsch
2013/01/14 16:09:09
I don't think anybody wants to override this metho
Lasse Reichstein Nielsen
2013/01/15 07:19:51
Done.
|
| + |
| /** Whether there are currently any subscribers on this [Stream]. */ |
| bool get _hasSubscribers => !_InternalLinkList.isEmpty(this); |
| @@ -1093,3 +1097,40 @@ class _DoneSubscription<T> implements StreamSubscription<T> { |
| _pauseCount = 0; |
| } |
| } |
| + |
| +class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { |
| + final _SingleStreamImpl<T> _source; |
| + StreamSubscription<T> _subscription; |
| + |
| + _SingleStreamMultiplexer(this._source); |
| + |
| + void _onPauseStateChange() { |
| + if (_isPaused) { |
| + if (_subscription != null) { |
| + _subscription.pause(); |
| + } |
| + } else { |
| + if (_subscription != null) { |
| + _subscription.resume(); |
| + } |
| + } |
| + } |
| + |
| + /** |
| + * Subscribe or unsubscribe on [_source] depending on whether |
| + * [_stream] has subscribers. |
| + */ |
| + void _onSubscriptionStateChange() { |
| + if (_hasSubscribers) { |
| + assert(_subscription == null); |
| + _subscription = _source.listen(this._add, |
| + onError: this._signalError, |
| + onDone: this._close); |
| + } else { |
| + // TODO(lrn): Check why this can happen. |
| + if (_subscription == null) return; |
| + _subscription.cancel(); |
| + _subscription = null; |
| + } |
| + } |
| +} |