| Index: sdk/lib/async/stream_controller.dart
|
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
|
| index 2cdbeaf240c3553337eae7f2a74a83f3f27b1f66..ea3e81cf846047b954f55b10349ed7b92de8f18a 100644
|
| --- a/sdk/lib/async/stream_controller.dart
|
| +++ b/sdk/lib/async/stream_controller.dart
|
| @@ -46,7 +46,7 @@ part of dart.async;
|
| * the stream at all, and won't trigger callbacks. From the controller's point
|
| * of view, the stream is completely inert when has completed.
|
| */
|
| -abstract class StreamController<T> implements EventSink<T> {
|
| +abstract class StreamController<T> implements StreamSink<T> {
|
| /** The stream that this controller is controlling. */
|
| Stream<T> get stream;
|
|
|
| @@ -75,10 +75,17 @@ abstract class StreamController<T> implements EventSink<T> {
|
| void onPause(),
|
| void onResume(),
|
| void onCancel(),
|
| - bool sync: false})
|
| - => sync
|
| + bool sync: false}) {
|
| + if (onListen == null && onPause == null &&
|
| + onResume == null && onCancel == null) {
|
| + return sync
|
| + ? new _NoCallbackSyncStreamController<T>()
|
| + : new _NoCallbackAsyncStreamController<T>();
|
| + }
|
| + return sync
|
| ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
|
| : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
|
| + }
|
|
|
| /**
|
| * A controller where [stream] can be listened to more than once.
|
| @@ -123,9 +130,9 @@ abstract class StreamController<T> implements EventSink<T> {
|
| }
|
|
|
| /**
|
| - * Returns a view of this object that only exposes the [EventSink] interface.
|
| + * Returns a view of this object that only exposes the [StreamSink] interface.
|
| */
|
| - EventSink<T> get sink;
|
| + StreamSink<T> get sink;
|
|
|
| /**
|
| * Whether the stream is closed for adding more events.
|
| @@ -162,7 +169,10 @@ abstract class StreamController<T> implements EventSink<T> {
|
|
|
|
|
| abstract class _StreamControllerLifecycle<T> {
|
| - void _recordListen(StreamSubscription<T> subscription) {}
|
| + StreamSubscription<T> _subscribe(void onData(T data),
|
| + void onError(Object error),
|
| + void onDone(),
|
| + bool cancelOnError);
|
| void _recordPause(StreamSubscription<T> subscription) {}
|
| void _recordResume(StreamSubscription<T> subscription) {}
|
| void _recordCancel(StreamSubscription<T> subscription) {}
|
| @@ -175,81 +185,205 @@ abstract class _StreamControllerLifecycle<T> {
|
| */
|
| abstract class _StreamController<T> implements StreamController<T>,
|
| _StreamControllerLifecycle<T>,
|
| + _EventSink<T>,
|
| _EventDispatch<T> {
|
| - static const int _STATE_OPEN = 0;
|
| - static const int _STATE_CANCELLED = 1;
|
| - static const int _STATE_CLOSED = 2;
|
| + // The states are bit-flags. More than one can be set at a time.
|
| + //
|
| + // The "subscription state" goes through the states:
|
| + // initial -> subscribed -> canceled.
|
| + // These are mutually exclusive.
|
| + // The "closed" state records whether the [close] method has been called
|
| + // on the controller. This can be done at any time. If done before
|
| + // subscription, the done event is queued. If done after cancel, the done
|
| + // event is ignored (just as any other event after a cancel).
|
| +
|
| + /** The controller is in its initial state with no subscription. */
|
| + static const int _STATE_INITIAL = 0;
|
| + /** The controller has a subscription, but hasn't been closed or canceled. */
|
| + static const int _STATE_SUBSCRIBED = 1;
|
| + /** The subscription is canceled. */
|
| + static const int _STATE_CANCELED = 2;
|
| + /** Mask for the subscription state. */
|
| + static const int _STATE_SUBSCRIPTION_MASK = 3;
|
| +
|
| + // The following state relate to the controller, not the subscription.
|
| + // If closed, adding more events is not allowed.
|
| + // If executing an [addStream], new events are not allowed either, but will
|
| + // be added by the stream.
|
|
|
| - final _NotificationHandler _onListen;
|
| - final _NotificationHandler _onPause;
|
| - final _NotificationHandler _onResume;
|
| - final _NotificationHandler _onCancel;
|
| - _StreamImpl<T> _stream;
|
| + /**
|
| + * The controller is closed due to calling [close].
|
| + *
|
| + * When the stream is closed, you can neither add new events nor add new
|
| + * listeners.
|
| + */
|
| + static const int _STATE_CLOSED = 4;
|
| + /**
|
| + * The controller is in the middle of an [addStream] operation.
|
| + *
|
| + * While adding events from a stream, no new events can be added directly
|
| + * on the controller.
|
| + */
|
| + static const int _STATE_ADDSTREAM = 8;
|
| +
|
| + /**
|
| + * Field containing different data depending on the current subscription
|
| + * state.
|
| + *
|
| + * If [_state] is [_STATE_INITIAL], the field may contain a [_PendingEvents]
|
| + * for events added to the controller before a subscription.
|
| + *
|
| + * While [_state] is [_STATE_SUBSCRIBED], the field contains the subscription.
|
| + *
|
| + * When [_state] is [_STATE_CANCELED] the field is currently not used.
|
| + */
|
| + var _varData;
|
|
|
| - // An active subscription on the stream, or null if no subscripton is active.
|
| - _ControllerSubscription<T> _subscription;
|
| + /** Current state of the controller. */
|
| + int _state = _STATE_INITIAL;
|
|
|
| - // Whether we have sent a "done" event.
|
| - int _state = _STATE_OPEN;
|
| + /**
|
| + * Future completed when the stream sends its last event.
|
| + *
|
| + * This is also the future returned by [close].
|
| + */
|
| + // TODO(lrn): Could this be stored in the varData field too, if it's not
|
| + // accessed until the call to "close"? Then we need to special case if it's
|
| + // accessed earlier, or if close is called before subscribing.
|
| + _FutureImpl _doneFuture;
|
|
|
| - // Events added to the stream before it has an active subscription.
|
| - _PendingEvents _pendingEvents = null;
|
| + _StreamController();
|
|
|
| - _StreamController(this._onListen,
|
| - this._onPause,
|
| - this._onResume,
|
| - this._onCancel) {
|
| - _stream = new _ControllerStream<T>(this);
|
| - }
|
| + _NotificationHandler get _onListen;
|
| + _NotificationHandler get _onPause;
|
| + _NotificationHandler get _onResume;
|
| + _NotificationHandler get _onCancel;
|
|
|
| - Stream<T> get stream => _stream;
|
| + // Return a new stream every time. The streams are equal, but not identical.
|
| + Stream<T> get stream => new _ControllerStream(this);
|
|
|
| /**
|
| - * Returns a view of this object that only exposes the [EventSink] interface.
|
| + * Returns a view of this object that only exposes the [StreamSink] interface.
|
| */
|
| - EventSink<T> get sink => new _EventSinkView<T>(this);
|
| + StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
|
|
|
| /**
|
| - * Whether a listener has existed and been cancelled.
|
| + * Whether a listener has existed and been canceled.
|
| *
|
| * After this, adding more events will be ignored.
|
| */
|
| - bool get _isCancelled => (_state & _STATE_CANCELLED) != 0;
|
| + bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
|
| +
|
| + /** Whether there is an active listener. */
|
| + bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;
|
| +
|
| + /** Whether there has not been a listener yet. */
|
| + bool get _isInitialState =>
|
| + (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL;
|
|
|
| bool get isClosed => (_state & _STATE_CLOSED) != 0;
|
|
|
| bool get isPaused => hasListener ? _subscription._isInputPaused
|
| - : !_isCancelled;
|
| + : !_isCanceled;
|
| +
|
| + bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
|
| +
|
| + /** New events may not be added after close, or during addStream. */
|
| + bool get _mayAddEvent => (_state < _STATE_CLOSED);
|
| +
|
| + // Returns the pending events.
|
| + // Pending events are events added before a subscription exists.
|
| + // They are added to the subscription when it is created.
|
| + // Pending events, if any, are kept in the _varData field until the
|
| + // stream is listened to.
|
| + // While adding a stream, pending events are moved into the
|
| + // state object to allow the state object to use the _varData field.
|
| + _PendingEvents get _pendingEvents {
|
| + assert(_isInitialState);
|
| + if (!_isAddingStream) {
|
| + return _varData;
|
| + }
|
| + _StreamControllerAddStreamState state = _varData;
|
| + return state.varData;
|
| + }
|
|
|
| - bool get hasListener => _subscription != null;
|
| + // Returns the pending events, and creates the object if necessary.
|
| + _StreamImplEvents _ensurePendingEvents() {
|
| + assert(_isInitialState);
|
| + if (!_isAddingStream) {
|
| + if (_varData == null) _varData = new _StreamImplEvents();
|
| + return _varData;
|
| + }
|
| + _StreamControllerAddStreamState state = _varData;
|
| + if (state.varData == null) state.varData = new _StreamImplEvents();
|
| + return state.varData;
|
| + }
|
| +
|
| + // Get the current subscription.
|
| + // If we are adding a stream, the subscription is moved into the state
|
| + // object to allow the state object to use the _varData field.
|
| + _ControllerSubscription get _subscription {
|
| + assert(hasListener);
|
| + if (_isAddingStream) {
|
| + _StreamControllerAddStreamState addState = _varData;
|
| + return addState.varData;
|
| + }
|
| + return _varData;
|
| + }
|
|
|
| /**
|
| - * Send or queue a data event.
|
| + * Creates an error describing why an event cannot be added.
|
| + *
|
| + * The reason, and therefore the error message, depends on the current state.
|
| */
|
| - void add(T value) {
|
| - if (isClosed) throw new StateError("Adding event after close");
|
| - if (_subscription != null) {
|
| - _sendData(value);
|
| - } else if (!_isCancelled) {
|
| - _addPendingEvent(new _DelayedData<T>(value));
|
| + Error _badEventState() {
|
| + if (isClosed) {
|
| + return new StateError("Cannot add event after closing");
|
| + }
|
| + assert(_isAddingStream);
|
| + return new StateError("Cannot add event while adding a stream");
|
| + }
|
| +
|
| + // StreamSink interface.
|
| + Future addStream(Stream<T> source) {
|
| + if (!_mayAddEvent) throw _badEventState();
|
| + if (_isCanceled) return new _FutureImpl.immediate(null);
|
| + _StreamControllerAddStreamState addState =
|
| + new _StreamControllerAddStreamState(this, _varData, source);
|
| + _varData = addState;
|
| + _state |= _STATE_ADDSTREAM;
|
| + return addState.addStreamFuture;
|
| + }
|
| +
|
| + Future get done => _ensureDoneFuture();
|
| +
|
| + Future _ensureDoneFuture() {
|
| + if (_doneFuture == null) {
|
| + _doneFuture = new _FutureImpl();
|
| + if (_isCanceled) _doneFuture._setValue(null);
|
| }
|
| + return _doneFuture;
|
| + }
|
| +
|
| + /**
|
| + * Send or enqueue a data event.
|
| + */
|
| + void add(T value) {
|
| + if (!_mayAddEvent) throw _badEventState();
|
| + _add(value);
|
| }
|
|
|
| /**
|
| * Send or enqueue an error event.
|
| */
|
| void addError(Object error, [Object stackTrace]) {
|
| - if (isClosed) throw new StateError("Adding event after close");
|
| + if (!_mayAddEvent) throw _badEventState();
|
| if (stackTrace != null) {
|
| // Force stack trace overwrite. Even if the error already contained
|
| // a stack trace.
|
| _attachStackTrace(error, stackTrace);
|
| }
|
| - if (_subscription != null) {
|
| - _sendError(error);
|
| - } else if (!_isCancelled) {
|
| - _addPendingEvent(new _DelayedError(error));
|
| - }
|
| + _addError(error);
|
| }
|
|
|
| /**
|
| @@ -263,60 +397,111 @@ abstract class _StreamController<T> implements StreamController<T>,
|
| * The first time a controller is closed, a "done" event is sent to its
|
| * stream.
|
| */
|
| - void close() {
|
| - if (isClosed) return;
|
| + Future close() {
|
| + if (isClosed) {
|
| + assert(_doneFuture != null); // Was set when close was first called.
|
| + return _doneFuture;
|
| + }
|
| + if (!_mayAddEvent) throw _badEventState();
|
| _state |= _STATE_CLOSED;
|
| - if (_subscription != null) {
|
| + _ensureDoneFuture();
|
| + if (hasListener) {
|
| _sendDone();
|
| - } else if (!_isCancelled) {
|
| - _addPendingEvent(const _DelayedDone());
|
| + } else if (_isInitialState) {
|
| + _ensurePendingEvents().add(const _DelayedDone());
|
| }
|
| + return _doneFuture;
|
| }
|
|
|
| - // EventDispatch interface
|
| + // EventSink interface. Used by the [addStream] events.
|
|
|
| - void _addPendingEvent(_DelayedEvent event) {
|
| - if (_isCancelled) return;
|
| - _StreamImplEvents events = _pendingEvents;
|
| - if (events == null) {
|
| - events = _pendingEvents = new _StreamImplEvents();
|
| + // Add data event, used both by the [addStream] events and by [add].
|
| + void _add(T value) {
|
| + if (hasListener) {
|
| + _sendData(value);
|
| + } else if (_isInitialState) {
|
| + _ensurePendingEvents().add(new _DelayedData<T>(value));
|
| }
|
| - events.add(event);
|
| }
|
|
|
| - void _recordListen(_BufferingStreamSubscription<T> subscription) {
|
| - assert(_subscription == null);
|
| - _subscription = subscription;
|
| - subscription._setPendingEvents(_pendingEvents);
|
| - _pendingEvents = null;
|
| + void _addError(Object error) {
|
| + if (hasListener) {
|
| + _sendError(error);
|
| + } else if (_isInitialState) {
|
| + _ensurePendingEvents().add(new _DelayedError(error));
|
| + }
|
| + }
|
| +
|
| + void _close() {
|
| + // End of addStream stream.
|
| + assert(_isAddingStream);
|
| + _StreamControllerAddStreamState addState = _varData;
|
| + _varData = addState.varData;
|
| + _state &= ~_STATE_ADDSTREAM;
|
| + addState.complete();
|
| + }
|
| +
|
| + // _StreamControllerLifeCycle interface
|
| +
|
| + StreamSubscription<T> _subscribe(void onData(T data),
|
| + void onError(Object error),
|
| + void onDone(),
|
| + bool cancelOnError) {
|
| + if (!_isInitialState) {
|
| + throw new StateError("Stream has already been listened to.");
|
| + }
|
| + _ControllerSubscription subscription = new _ControllerSubscription(
|
| + this, onData, onError, onDone, cancelOnError);
|
| +
|
| + _PendingEvents pendingEvents = _pendingEvents;
|
| + _state |= _STATE_SUBSCRIBED;
|
| + if (_isAddingStream) {
|
| + _StreamControllerAddStreamState addState = _varData;
|
| + addState.varData = subscription;
|
| + } else {
|
| + _varData = subscription;
|
| + }
|
| + subscription._setPendingEvents(pendingEvents);
|
| subscription._guardCallback(() {
|
| _runGuarded(_onListen);
|
| });
|
| +
|
| + return subscription;
|
| }
|
|
|
| void _recordCancel(StreamSubscription<T> subscription) {
|
| - assert(identical(_subscription, subscription));
|
| - _subscription = null;
|
| - _state |= _STATE_CANCELLED;
|
| + if (_isAddingStream) {
|
| + _StreamControllerAddStreamState addState = _varData;
|
| + addState.cancel();
|
| + }
|
| + _varData = null;
|
| + _state =
|
| + (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
|
| _runGuarded(_onCancel);
|
| + if (_doneFuture != null && _doneFuture._mayComplete) {
|
| + _doneFuture._asyncSetValue(null);
|
| + }
|
| }
|
|
|
| void _recordPause(StreamSubscription<T> subscription) {
|
| + if (_isAddingStream) {
|
| + _StreamControllerAddStreamState addState = _varData;
|
| + addState.pause();
|
| + }
|
| _runGuarded(_onPause);
|
| }
|
|
|
| void _recordResume(StreamSubscription<T> subscription) {
|
| + if (_isAddingStream) {
|
| + _StreamControllerAddStreamState addState = _varData;
|
| + addState.resume();
|
| + }
|
| _runGuarded(_onResume);
|
| }
|
| }
|
|
|
| -class _SyncStreamController<T> extends _StreamController<T> {
|
| - _SyncStreamController(void onListen(),
|
| - void onPause(),
|
| - void onResume(),
|
| - void onCancel())
|
| - : super(onListen, onPause, onResume, onCancel);
|
| -
|
| +abstract class _SyncStreamControllerDispatch<T>
|
| + implements _StreamController<T> {
|
| void _sendData(T data) {
|
| _subscription._add(data);
|
| }
|
| @@ -330,13 +515,8 @@ class _SyncStreamController<T> extends _StreamController<T> {
|
| }
|
| }
|
|
|
| -class _AsyncStreamController<T> extends _StreamController<T> {
|
| - _AsyncStreamController(void onListen(),
|
| - void onPause(),
|
| - void onResume(),
|
| - void onCancel())
|
| - : super(onListen, onPause, onResume, onCancel);
|
| -
|
| +abstract class _AsyncStreamControllerDispatch<T>
|
| + implements _StreamController<T> {
|
| void _sendData(T data) {
|
| _subscription._addPending(new _DelayedData(data));
|
| }
|
| @@ -350,6 +530,48 @@ class _AsyncStreamController<T> extends _StreamController<T> {
|
| }
|
| }
|
|
|
| +// TODO(lrn): Use common superclass for callback-controllers when VM supports
|
| +// constructors in mixin superclasses.
|
| +
|
| +class _AsyncStreamController<T> extends _StreamController<T>
|
| + with _AsyncStreamControllerDispatch<T> {
|
| + final _NotificationHandler _onListen;
|
| + final _NotificationHandler _onPause;
|
| + final _NotificationHandler _onResume;
|
| + final _NotificationHandler _onCancel;
|
| +
|
| + _AsyncStreamController(void this._onListen(),
|
| + void this._onPause(),
|
| + void this._onResume(),
|
| + void this._onCancel());
|
| +}
|
| +
|
| +class _SyncStreamController<T> extends _StreamController<T>
|
| + with _SyncStreamControllerDispatch<T> {
|
| + final _NotificationHandler _onListen;
|
| + final _NotificationHandler _onPause;
|
| + final _NotificationHandler _onResume;
|
| + final _NotificationHandler _onCancel;
|
| +
|
| + _SyncStreamController(void this._onListen(),
|
| + void this._onPause(),
|
| + void this._onResume(),
|
| + void this._onCancel());
|
| +}
|
| +
|
| +abstract class _NoCallbacks {
|
| + _NotificationHandler get _onListen => null;
|
| + _NotificationHandler get _onPause => null;
|
| + _NotificationHandler get _onResume => null;
|
| + _NotificationHandler get _onCancel => null;
|
| +}
|
| +
|
| +typedef _NoCallbackAsyncStreamController<T> = _StreamController<T>
|
| + with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
|
| +
|
| +typedef _NoCallbackSyncStreamController<T> = _StreamController<T>
|
| + with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
|
| +
|
| typedef void _NotificationHandler();
|
|
|
| void _runGuarded(_NotificationHandler notificationHandler) {
|
| @@ -363,7 +585,6 @@ void _runGuarded(_NotificationHandler notificationHandler) {
|
|
|
| class _ControllerStream<T> extends _StreamImpl<T> {
|
| _StreamControllerLifecycle<T> _controller;
|
| - bool _hasListener = false;
|
|
|
| _ControllerStream(this._controller);
|
|
|
| @@ -371,17 +592,19 @@ class _ControllerStream<T> extends _StreamImpl<T> {
|
| void onData(T data),
|
| void onError(Object error),
|
| void onDone(),
|
| - bool cancelOnError) {
|
| - if (_hasListener) {
|
| - throw new StateError("The stream has already been listened to.");
|
| - }
|
| - _hasListener = true;
|
| - return new _ControllerSubscription<T>(
|
| - _controller, onData, onError, onDone, cancelOnError);
|
| - }
|
| + bool cancelOnError) =>
|
| + _controller._subscribe(onData, onError, onDone, cancelOnError);
|
|
|
| - void _onListen(_BufferingStreamSubscription subscription) {
|
| - _controller._recordListen(subscription);
|
| + // Override == and hashCode so that new streams returned by the same
|
| + // controller are considered equal. The controller returns a new stream
|
| + // each time it's queried, but doesn't have to cache the result.
|
| +
|
| + int get hashCode => _controller.hashCode ^ 0x35323532;
|
| +
|
| + bool operator==(Object other) {
|
| + if (other is! _ControllerStream) return false;
|
| + _ControllerStream otherStream = other;
|
| + return identical(otherStream._controller, this);
|
| }
|
| }
|
|
|
| @@ -408,367 +631,64 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
|
| }
|
| }
|
|
|
| -class _BroadcastStream<T> extends _StreamImpl<T> {
|
| - _BroadcastStreamController _controller;
|
| -
|
| - _BroadcastStream(this._controller);
|
| -
|
| - bool get isBroadcast => true;
|
| -
|
| - StreamSubscription<T> _createSubscription(
|
| - void onData(T data),
|
| - void onError(Object error),
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| - return new _BroadcastSubscription<T>(
|
| - _controller, onData, onError, onDone, cancelOnError);
|
| - }
|
| -
|
| - void _onListen(_BufferingStreamSubscription subscription) {
|
| - _controller._recordListen(subscription);
|
| - }
|
| -}
|
| -
|
| -abstract class _BroadcastSubscriptionLink {
|
| - _BroadcastSubscriptionLink _next;
|
| - _BroadcastSubscriptionLink _previous;
|
| -}
|
| -
|
| -class _BroadcastSubscription<T> extends _ControllerSubscription<T>
|
| - implements _BroadcastSubscriptionLink {
|
| - static const int _STATE_EVENT_ID = 1;
|
| - static const int _STATE_FIRING = 2;
|
| - static const int _STATE_REMOVE_AFTER_FIRING = 4;
|
| - int _eventState;
|
| -
|
| - _BroadcastSubscriptionLink _next;
|
| - _BroadcastSubscriptionLink _previous;
|
| -
|
| - _BroadcastSubscription(_StreamControllerLifecycle controller,
|
| - void onData(T data),
|
| - void onError(Object error),
|
| - void onDone(),
|
| - bool cancelOnError)
|
| - : super(controller, onData, onError, onDone, cancelOnError) {
|
| - _next = _previous = this;
|
| - }
|
| -
|
| - _BroadcastStreamController get _controller => super._controller;
|
| -
|
| - bool _expectsEvent(int eventId) {
|
| - return (_eventState & _STATE_EVENT_ID) == eventId;
|
| - }
|
| -
|
| - void _toggleEventId() {
|
| - _eventState ^= _STATE_EVENT_ID;
|
| - }
|
| -
|
| - bool get _isFiring => (_eventState & _STATE_FIRING) != 0;
|
| -
|
| - bool _setRemoveAfterFiring() {
|
| - assert(_isFiring);
|
| - _eventState |= _STATE_REMOVE_AFTER_FIRING;
|
| - }
|
| -
|
| - bool get _removeAfterFiring =>
|
| - (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0;
|
| -}
|
| -
|
| -
|
| -abstract class _BroadcastStreamController<T>
|
| - implements StreamController<T>,
|
| - _StreamControllerLifecycle<T>,
|
| - _BroadcastSubscriptionLink,
|
| - _EventDispatch<T> {
|
| - static const int _STATE_INITIAL = 0;
|
| - static const int _STATE_EVENT_ID = 1;
|
| - static const int _STATE_FIRING = 2;
|
| - static const int _STATE_CLOSED = 4;
|
| -
|
| - final _NotificationHandler _onListen;
|
| - final _NotificationHandler _onCancel;
|
| -
|
| - // State of the controller.
|
| - int _state;
|
| -
|
| - // Double-linked list of active listeners.
|
| - _BroadcastSubscriptionLink _next;
|
| - _BroadcastSubscriptionLink _previous;
|
| -
|
| - _BroadcastStreamController(this._onListen, this._onCancel)
|
| - : _state = _STATE_INITIAL {
|
| - _next = _previous = this;
|
| - }
|
| -
|
| - // StreamController interface.
|
| -
|
| - Stream<T> get stream => new _BroadcastStream<T>(this);
|
| -
|
| - EventSink<T> get sink => new _EventSinkView<T>(this);
|
| -
|
| - bool get isClosed => (_state & _STATE_CLOSED) != 0;
|
| -
|
| - /**
|
| - * A broadcast controller is never paused.
|
| - *
|
| - * Each receiving stream may be paused individually, and they handle their
|
| - * own buffering.
|
| - */
|
| - bool get isPaused => false;
|
| -
|
| - /** Whether there are currently a subscriber on the [Stream]. */
|
| - bool get hasListener => !_isEmpty;
|
| -
|
| - /** Whether an event is being fired (sent to some, but not all, listeners). */
|
| - bool get _isFiring => (_state & _STATE_FIRING) != 0;
|
| -
|
| - // Linked list helpers
|
| -
|
| - bool get _isEmpty => identical(_next, this);
|
| -
|
| - /** Adds subscription to linked list of active listeners. */
|
| - void _addListener(_BroadcastSubscription<T> subscription) {
|
| - _BroadcastSubscriptionLink previous = _previous;
|
| - previous._next = subscription;
|
| - _previous = subscription._previous;
|
| - subscription._previous._next = this;
|
| - subscription._previous = previous;
|
| - subscription._eventState = (_state & _STATE_EVENT_ID);
|
| - }
|
| -
|
| - void _removeListener(_BroadcastSubscription<T> subscription) {
|
| - assert(identical(subscription._controller, this));
|
| - assert(!identical(subscription._next, subscription));
|
| - subscription._previous._next = subscription._next;
|
| - subscription._next._previous = subscription._previous;
|
| - subscription._next = subscription._previous = subscription;
|
| - }
|
| -
|
| - // _StreamControllerLifecycle interface.
|
| -
|
| - void _recordListen(_BroadcastSubscription<T> subscription) {
|
| - _addListener(subscription);
|
| - if (identical(_next, _previous)) {
|
| - // Only one listener, so it must be the first listener.
|
| - _runGuarded(_onListen);
|
| - }
|
| - }
|
| -
|
| - void _recordCancel(_BroadcastSubscription<T> subscription) {
|
| - if (subscription._isFiring) {
|
| - subscription._setRemoveAfterFiring();
|
| - } else {
|
| - _removeListener(subscription);
|
| - // If we are currently firing an event, the empty-check is performed at
|
| - // the end of the listener loop instead of here.
|
| - if ((_state & _STATE_FIRING) == 0 && _isEmpty) {
|
| - _callOnCancel();
|
| - }
|
| - }
|
| - }
|
| -
|
| - void _recordPause(StreamSubscription<T> subscription) {}
|
| - void _recordResume(StreamSubscription<T> subscription) {}
|
| -
|
| - // EventSink interface.
|
|
|
| - void add(T data) {
|
| - if (isClosed) {
|
| - throw new StateError("Cannot add new events after calling close()");
|
| - }
|
| - _sendData(data);
|
| - }
|
| -
|
| - void addError(Object error, [Object stackTrace]) {
|
| - if (isClosed) {
|
| - throw new StateError("Cannot add new events after calling close()");
|
| - }
|
| - if (stackTrace != null) _attachStackTrace(error, stackTrace);
|
| - _sendError(error);
|
| - }
|
| -
|
| - void close() {
|
| - if (isClosed) {
|
| - throw new StateError("Cannot add new events after calling close()");
|
| - }
|
| - _state |= _STATE_CLOSED;
|
| - _sendDone();
|
| - }
|
| -
|
| - void _forEachListener(
|
| - void action(_BufferingStreamSubscription<T> subscription)) {
|
| - if (_isFiring) {
|
| - throw new StateError(
|
| - "Cannot fire new event. Controller is already firing an event");
|
| - }
|
| - if (_isEmpty) return;
|
| -
|
| - // Get event id of this event.
|
| - int id = (_state & _STATE_EVENT_ID);
|
| - // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel]
|
| - // callbacks while firing, and we prevent reentrancy of this function.
|
| - //
|
| - // Set [_state]'s event id to the next event's id.
|
| - // Any listeners added while firing this event will expect the next event,
|
| - // not this one, and won't get notified.
|
| - _state ^= _STATE_EVENT_ID | _STATE_FIRING;
|
| - _BroadcastSubscriptionLink link = _next;
|
| - while (!identical(link, this)) {
|
| - _BroadcastSubscription<T> subscription = link;
|
| - if (subscription._expectsEvent(id)) {
|
| - subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
|
| - action(subscription);
|
| - subscription._toggleEventId();
|
| - link = subscription._next;
|
| - if (subscription._removeAfterFiring) {
|
| - _removeListener(subscription);
|
| - }
|
| - subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
|
| - } else {
|
| - link = subscription._next;
|
| - }
|
| - }
|
| - _state &= ~_STATE_FIRING;
|
| -
|
| - if (_isEmpty) {
|
| - _callOnCancel();
|
| - }
|
| - }
|
| -
|
| - void _callOnCancel() {
|
| - _runGuarded(_onCancel);
|
| - }
|
| -}
|
| -
|
| -class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
|
| - _SyncBroadcastStreamController(void onListen(), void onCancel())
|
| - : super(onListen, onCancel);
|
| -
|
| - // EventDispatch interface.
|
| -
|
| - void _sendData(T data) {
|
| - if (_isEmpty) return;
|
| - _forEachListener((_BufferingStreamSubscription<T> subscription) {
|
| - subscription._add(data);
|
| - });
|
| - }
|
| -
|
| - void _sendError(Object error) {
|
| - if (_isEmpty) return;
|
| - _forEachListener((_BufferingStreamSubscription<T> subscription) {
|
| - subscription._addError(error);
|
| - });
|
| - }
|
| -
|
| - void _sendDone() {
|
| - if (_isEmpty) return;
|
| - _forEachListener((_BroadcastSubscription<T> subscription) {
|
| - subscription._close();
|
| - subscription._eventState |=
|
| - _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING;
|
| - });
|
| - }
|
| -}
|
| -
|
| -class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
|
| - _AsyncBroadcastStreamController(void onListen(), void onCancel())
|
| - : super(onListen, onCancel);
|
| -
|
| - // EventDispatch interface.
|
| -
|
| - void _sendData(T data) {
|
| - for (_BroadcastSubscriptionLink link = _next;
|
| - !identical(link, this);
|
| - link = link._next) {
|
| - _BroadcastSubscription<T> subscription = link;
|
| - subscription._addPending(new _DelayedData(data));
|
| - }
|
| - }
|
| -
|
| - void _sendError(Object error) {
|
| - for (_BroadcastSubscriptionLink link = _next;
|
| - !identical(link, this);
|
| - link = link._next) {
|
| - _BroadcastSubscription<T> subscription = link;
|
| - subscription._addPending(new _DelayedError(error));
|
| - }
|
| - }
|
| -
|
| - void _sendDone() {
|
| - for (_BroadcastSubscriptionLink link = _next;
|
| - !identical(link, this);
|
| - link = link._next) {
|
| - _BroadcastSubscription<T> subscription = link;
|
| - subscription._addPending(const _DelayedDone());
|
| - }
|
| - }
|
| +/** A class that exposes only the [StreamSink] interface of an object. */
|
| +class _StreamSinkWrapper<T> implements StreamSink<T> {
|
| + final StreamSink _target;
|
| + _StreamSinkWrapper(this._target);
|
| + void add(T data) { _target.add(data); }
|
| + void addError(Object error) { _target.addError(error); }
|
| + Future close() => _target.close();
|
| + Future addStream(Stream<T> source) => _target.addStream(source);
|
| + Future get done => _target.done;
|
| }
|
|
|
| /**
|
| - * Stream controller that is used by [Stream.asBroadcastStream].
|
| - *
|
| - * This stream controller allows incoming events while it is firing
|
| - * other events. This is handled by delaying the events until the
|
| - * current event is done firing, and then fire the pending events.
|
| - *
|
| - * This class extends [_SyncBroadcastStreamController]. Events of
|
| - * an "asBroadcastStream" stream are always initiated by events
|
| - * on another stream, and it is fine to forward them synchronously.
|
| + * Object containing the state used to handle [StreamController.addStream].
|
| */
|
| -class _AsBroadcastStreamController<T>
|
| - extends _SyncBroadcastStreamController<T>
|
| - implements _EventDispatch<T> {
|
| - _StreamImplEvents _pending;
|
| +class _AddStreamState<T> {
|
| + // [_FutureImpl] returned by call to addStream.
|
| + _FutureImpl addStreamFuture;
|
|
|
| - _AsBroadcastStreamController(void onListen(), void onCancel())
|
| - : super(onListen, onCancel);
|
| + // Subscription on stream argument to addStream.
|
| + StreamSubscription addSubscription;
|
|
|
| - bool get _hasPending => _pending != null && ! _pending.isEmpty;
|
| + _AddStreamState(StreamSink controller, Stream source)
|
| + : addStreamFuture = new _FutureImpl(),
|
| + addSubscription = source.listen(controller._add,
|
| + onError: controller._addError,
|
| + onDone: controller._close,
|
| + cancelOnError: true);
|
|
|
| - void _addPendingEvent(_DelayedEvent event) {
|
| - if (_pending == null) {
|
| - _pending = new _StreamImplEvents();
|
| - }
|
| - _pending.add(event);
|
| + void pause() {
|
| + addSubscription.pause();
|
| }
|
|
|
| - void add(T data) {
|
| - if (_isFiring) {
|
| - _addPendingEvent(new _DelayedData<T>(data));
|
| - return;
|
| - }
|
| - super.add(data);
|
| - while (_hasPending) {
|
| - _pending.handleNext(this);
|
| - }
|
| + void resume() {
|
| + addSubscription.resume();
|
| }
|
|
|
| - void addError(Object error, [StackTrace stackTrace]) {
|
| - if (_isFiring) {
|
| - _addPendingEvent(new _DelayedError(error));
|
| - return;
|
| - }
|
| - super.addError(error, stackTrace);
|
| - while (_hasPending) {
|
| - _pending.handleNext(this);
|
| - }
|
| + void cancel() {
|
| + addSubscription.cancel();
|
| + complete();
|
| }
|
|
|
| - void close() {
|
| - if (_isFiring) {
|
| - _addPendingEvent(const _DelayedDone());
|
| - _state |= _STATE_CLOSED;
|
| - return;
|
| - }
|
| - super.close();
|
| - assert(!_hasPending);
|
| + void complete() {
|
| + addStreamFuture._asyncSetValue(null);
|
| }
|
| +}
|
|
|
| - void _callOnCancel() {
|
| - if (_hasPending) {
|
| - _pending.clear();
|
| - _pending = null;
|
| +class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {
|
| + // The subscription or pending data of a _StreamController.
|
| + // Stored here because we reuse the `_varData` field in the _StreamController
|
| + // to store this state object.
|
| + var varData;
|
| +
|
| + _StreamControllerAddStreamState(_StreamController controller,
|
| + this.varData,
|
| + Stream source) : super(controller, source) {
|
| + if (controller.isPaused) {
|
| + addSubscription.pause();
|
| }
|
| - super._callOnCancel();
|
| }
|
| }
|
|
|