| Index: sdk/lib/async/stream_impl.dart
|
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
|
| index 85aa0833b6f0233cf7399ffb256e21aee5261169..40cc3da63e5ec550249be0dcce588766a272a4a9 100644
|
| --- a/sdk/lib/async/stream_impl.dart
|
| +++ b/sdk/lib/async/stream_impl.dart
|
| @@ -696,13 +696,47 @@ class _BroadcastLinkedList {
|
| }
|
| }
|
|
|
| +typedef void _broadcastCallback(StreamSubscription subscription);
|
| +
|
| +/**
|
| + * Dummy subscription that will never receive any events.
|
| + */
|
| +class _DummyStreamSubscription<T> implements StreamSubscription<T> {
|
| + int _pauseCounter = 0;
|
| +
|
| + void onData(void handleData(T data)) {}
|
| + void onError(void handleError(Object data)) {}
|
| + void onDone(void handleDone()) {}
|
| +
|
| + void pause([Future resumeSignal]) {
|
| + _pauseCounter++;
|
| + if (resumeSignal != null) resumeSignal.then((_) { resume(); });
|
| + }
|
| + void resume() {
|
| + if (_pauseCounter > 0) _pauseCounter--;
|
| + }
|
| + void cancel() {}
|
| + bool get isPaused => _pauseCounter > 0;
|
| +
|
| + Future asFuture([futureValue]) => new _FutureImpl();
|
| +}
|
| +
|
| class _AsBroadcastStream<T> extends Stream<T> {
|
| final Stream<T> _source;
|
| + final _broadcastCallback _onListenHandler;
|
| + final _broadcastCallback _onCancelHandler;
|
| + final _Zone _zone;
|
| +
|
| _AsBroadcastStreamController<T> _controller;
|
| StreamSubscription<T> _subscription;
|
|
|
| - _AsBroadcastStream(this._source) {
|
| - _controller = new _AsBroadcastStreamController<T>(null, _onCancel);
|
| + _AsBroadcastStream(this._source,
|
| + this._onListenHandler,
|
| + this._onCancelHandler)
|
| + : _zone = _Zone.current {
|
| + _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel);
|
| + // Keep zone alive until we are done doing callbacks.
|
| + _zone.expectCallback();
|
| }
|
|
|
| bool get isBroadcast => true;
|
| @@ -712,7 +746,9 @@ class _AsBroadcastStream<T> extends Stream<T> {
|
| void onDone(),
|
| bool cancelOnError}) {
|
| if (_controller == null) {
|
| - throw new StateError("Source stream has been closed.");
|
| + // Return a dummy subscription backed by nothing, since
|
| + // it won't ever receive any events.
|
| + return new _DummyStreamSubscription<T>();
|
| }
|
| if (_subscription == null) {
|
| _subscription = _source.listen(_controller.add,
|
| @@ -724,14 +760,102 @@ class _AsBroadcastStream<T> extends Stream<T> {
|
| }
|
|
|
| void _onCancel() {
|
| + bool shutdown = (_controller == null) || _controller.isClosed;
|
| + if (_onCancelHandler != null) {
|
| + _zone.executePeriodicCallbackGuarded(
|
| + () => _onCancelHandler(new _BroadcastSubscriptionWrapper(this)));
|
| + }
|
| + if (shutdown) {
|
| + if (_subscription != null) {
|
| + _subscription.cancel();
|
| + _subscription = null;
|
| + }
|
| + _zone.cancelCallbackExpectation();
|
| + }
|
| + }
|
| +
|
| + void _onListen() {
|
| + if (_onListenHandler != null) {
|
| + _zone.executePeriodicCallbackGuarded(
|
| + () => _onListenHandler(new _BroadcastSubscriptionWrapper(this)));
|
| + }
|
| + }
|
| +
|
| + // Methods called from _BroadcastSubscriptionWrapper.
|
| + void _cancelSubscription() {
|
| + if (_subscription == null) return;
|
| // Called by [_controller] when it has no subscribers left.
|
| StreamSubscription subscription = _subscription;
|
| _subscription = null;
|
| + if (_controller._isEmpty) {
|
| + _zone.cancelCallbackExpectation();
|
| + }
|
| _controller = null; // Marks the stream as no longer listenable.
|
| subscription.cancel();
|
| }
|
| +
|
| + void _pauseSubscription(Future resumeSignal) {
|
| + if (_subscription == null) return;
|
| + _subscription.pause(resumeSignal);
|
| + }
|
| +
|
| + void _resumeSubscription() {
|
| + if (_subscription == null) return;
|
| + _subscription.resume();
|
| + }
|
| +
|
| + bool get _isSubscriptionPaused {
|
| + if (_subscription == null) return false;
|
| + return _subscription.isPaused;
|
| + }
|
| +}
|
| +
|
| +/**
|
| + * Wrapper for subscription that disallows changing handlers.
|
| + */
|
| +class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
|
| + final _AsBroadcastStream _stream;
|
| +
|
| + _BroadcastSubscriptionWrapper(this._stream);
|
| +
|
| + void onData(void handleData(T data)) {
|
| + throw new UnsupportedError(
|
| + "Cannot change handlers of asBroadcastStream source subscription.");
|
| + }
|
| +
|
| + void onError(void handleError(Object data)) {
|
| + throw new UnsupportedError(
|
| + "Cannot change handlers of asBroadcastStream source subscription.");
|
| + }
|
| +
|
| + void onDone(void handleDone()) {
|
| + throw new UnsupportedError(
|
| + "Cannot change handlers of asBroadcastStream source subscription.");
|
| + }
|
| +
|
| + void pause([Future resumeSignal]) {
|
| + _stream._pauseSubscription(resumeSignal);
|
| + }
|
| +
|
| + void resume() {
|
| + _stream._resumeSubscription();
|
| + }
|
| +
|
| + void cancel() {
|
| + _stream._cancelSubscription();
|
| + }
|
| +
|
| + bool get isPaused {
|
| + return _stream._isSubscriptionPaused;
|
| + }
|
| +
|
| + Future asFuture([var futureValue]) {
|
| + throw new UnsupportedError(
|
| + "Cannot change handlers of asBroadcastStream source subscription.");
|
| + }
|
| }
|
|
|
| +
|
| /**
|
| * Simple implementation of [StreamIterator].
|
| */
|
|
|