Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index 85aa0833b6f0233cf7399ffb256e21aee5261169..40cc3da63e5ec550249be0dcce588766a272a4a9 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -696,13 +696,47 @@ class _BroadcastLinkedList { |
} |
} |
+typedef void _broadcastCallback(StreamSubscription subscription); |
+ |
+/** |
+ * Dummy subscription that will never receive any events. |
+ */ |
+class _DummyStreamSubscription<T> implements StreamSubscription<T> { |
+ int _pauseCounter = 0; |
+ |
+ void onData(void handleData(T data)) {} |
+ void onError(void handleError(Object data)) {} |
+ void onDone(void handleDone()) {} |
+ |
+ void pause([Future resumeSignal]) { |
+ _pauseCounter++; |
+ if (resumeSignal != null) resumeSignal.then((_) { resume(); }); |
+ } |
+ void resume() { |
+ if (_pauseCounter > 0) _pauseCounter--; |
+ } |
+ void cancel() {} |
+ bool get isPaused => _pauseCounter > 0; |
+ |
+ Future asFuture([futureValue]) => new _FutureImpl(); |
+} |
+ |
class _AsBroadcastStream<T> extends Stream<T> { |
final Stream<T> _source; |
+ final _broadcastCallback _onListenHandler; |
+ final _broadcastCallback _onCancelHandler; |
+ final _Zone _zone; |
+ |
_AsBroadcastStreamController<T> _controller; |
StreamSubscription<T> _subscription; |
- _AsBroadcastStream(this._source) { |
- _controller = new _AsBroadcastStreamController<T>(null, _onCancel); |
+ _AsBroadcastStream(this._source, |
+ this._onListenHandler, |
+ this._onCancelHandler) |
+ : _zone = _Zone.current { |
+ _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); |
+ // Keep zone alive until we are done doing callbacks. |
+ _zone.expectCallback(); |
} |
bool get isBroadcast => true; |
@@ -712,7 +746,9 @@ class _AsBroadcastStream<T> extends Stream<T> { |
void onDone(), |
bool cancelOnError}) { |
if (_controller == null) { |
- throw new StateError("Source stream has been closed."); |
+ // Return a dummy subscription backed by nothing, since |
+ // it won't ever receive any events. |
+ return new _DummyStreamSubscription<T>(); |
} |
if (_subscription == null) { |
_subscription = _source.listen(_controller.add, |
@@ -724,14 +760,102 @@ class _AsBroadcastStream<T> extends Stream<T> { |
} |
void _onCancel() { |
+ bool shutdown = (_controller == null) || _controller.isClosed; |
+ if (_onCancelHandler != null) { |
+ _zone.executePeriodicCallbackGuarded( |
+ () => _onCancelHandler(new _BroadcastSubscriptionWrapper(this))); |
+ } |
+ if (shutdown) { |
+ if (_subscription != null) { |
+ _subscription.cancel(); |
+ _subscription = null; |
+ } |
+ _zone.cancelCallbackExpectation(); |
+ } |
+ } |
+ |
+ void _onListen() { |
+ if (_onListenHandler != null) { |
+ _zone.executePeriodicCallbackGuarded( |
+ () => _onListenHandler(new _BroadcastSubscriptionWrapper(this))); |
+ } |
+ } |
+ |
+ // Methods called from _BroadcastSubscriptionWrapper. |
+ void _cancelSubscription() { |
+ if (_subscription == null) return; |
// Called by [_controller] when it has no subscribers left. |
StreamSubscription subscription = _subscription; |
_subscription = null; |
+ if (_controller._isEmpty) { |
+ _zone.cancelCallbackExpectation(); |
+ } |
_controller = null; // Marks the stream as no longer listenable. |
subscription.cancel(); |
} |
+ |
+ void _pauseSubscription(Future resumeSignal) { |
+ if (_subscription == null) return; |
+ _subscription.pause(resumeSignal); |
+ } |
+ |
+ void _resumeSubscription() { |
+ if (_subscription == null) return; |
+ _subscription.resume(); |
+ } |
+ |
+ bool get _isSubscriptionPaused { |
+ if (_subscription == null) return false; |
+ return _subscription.isPaused; |
+ } |
+} |
+ |
+/** |
+ * Wrapper for subscription that disallows changing handlers. |
+ */ |
+class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { |
+ final _AsBroadcastStream _stream; |
+ |
+ _BroadcastSubscriptionWrapper(this._stream); |
+ |
+ void onData(void handleData(T data)) { |
+ throw new UnsupportedError( |
+ "Cannot change handlers of asBroadcastStream source subscription."); |
+ } |
+ |
+ void onError(void handleError(Object data)) { |
+ throw new UnsupportedError( |
+ "Cannot change handlers of asBroadcastStream source subscription."); |
+ } |
+ |
+ void onDone(void handleDone()) { |
+ throw new UnsupportedError( |
+ "Cannot change handlers of asBroadcastStream source subscription."); |
+ } |
+ |
+ void pause([Future resumeSignal]) { |
+ _stream._pauseSubscription(resumeSignal); |
+ } |
+ |
+ void resume() { |
+ _stream._resumeSubscription(); |
+ } |
+ |
+ void cancel() { |
+ _stream._cancelSubscription(); |
+ } |
+ |
+ bool get isPaused { |
+ return _stream._isSubscriptionPaused; |
+ } |
+ |
+ Future asFuture([var futureValue]) { |
+ throw new UnsupportedError( |
+ "Cannot change handlers of asBroadcastStream source subscription."); |
+ } |
} |
+ |
/** |
* Simple implementation of [StreamIterator]. |
*/ |