Chromium Code Reviews| Index: sdk/lib/async/stream_controller.dart |
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
| index 2cdbeaf240c3553337eae7f2a74a83f3f27b1f66..7a2c75bb966273ff9271c252ead1b78aa2eb6e42 100644 |
| --- a/sdk/lib/async/stream_controller.dart |
| +++ b/sdk/lib/async/stream_controller.dart |
| @@ -46,7 +46,7 @@ part of dart.async; |
| * the stream at all, and won't trigger callbacks. From the controller's point |
| * of view, the stream is completely inert when has completed. |
| */ |
| -abstract class StreamController<T> implements EventSink<T> { |
| +abstract class StreamController<T> implements StreamSink<T> { |
| /** The stream that this controller is controlling. */ |
| Stream<T> get stream; |
| @@ -75,10 +75,17 @@ abstract class StreamController<T> implements EventSink<T> { |
| void onPause(), |
| void onResume(), |
| void onCancel(), |
| - bool sync: false}) |
| - => sync |
| + bool sync: false}) { |
| + if (onListen == null && onPause == null && |
| + onResume == null && onCancel == null) { |
| + return sync |
| + ? new _NoCallbackSyncStreamController<T>() |
| + : new _NoCallbackAsyncStreamController<T>(); |
| + } |
| + return sync |
| ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
| : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
| + } |
| /** |
| * A controller where [stream] can be listened to more than once. |
| @@ -123,14 +130,14 @@ abstract class StreamController<T> implements EventSink<T> { |
| } |
| /** |
| - * Returns a view of this object that only exposes the [EventSink] interface. |
| + * Returns a view of this object that only exposes the [StreamSink] interface. |
| */ |
| - EventSink<T> get sink; |
| + StreamSink<T> get sink; |
| /** |
| * Whether the stream is closed for adding more events. |
| * |
| - * If true, the "done" event might not have fired yet, but it has been |
| + * If true, the "done" event might not have _ yet, but it has been |
|
floitsch
2013/06/27 15:15:19
undo change?
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
|
| * scheduled, and it is too late to add more events. |
| */ |
| bool get isClosed; |
| @@ -162,7 +169,10 @@ abstract class StreamController<T> implements EventSink<T> { |
| abstract class _StreamControllerLifecycle<T> { |
| - void _recordListen(StreamSubscription<T> subscription) {} |
| + StreamSubscription<T> _subscribe(void onData(T data), |
| + void onError(Object error), |
| + void onDone(), |
| + bool cancelOnError); |
| void _recordPause(StreamSubscription<T> subscription) {} |
| void _recordResume(StreamSubscription<T> subscription) {} |
| void _recordCancel(StreamSubscription<T> subscription) {} |
| @@ -175,80 +185,197 @@ abstract class _StreamControllerLifecycle<T> { |
| */ |
| abstract class _StreamController<T> implements StreamController<T>, |
| _StreamControllerLifecycle<T>, |
| + _EventSink<T>, |
| _EventDispatch<T> { |
| - static const int _STATE_OPEN = 0; |
| - static const int _STATE_CANCELLED = 1; |
| - static const int _STATE_CLOSED = 2; |
| + // The states are bit-flags. More than one can be set at a time. |
| + // |
| + // The "subscription state" goes through the states: |
| + // initial -> subscribed -> canceled. |
| + // These are mutually exclusive. |
| + // The "closed" state records whether the [close] method has been called |
| + // on the controller. This can be done at any time. If done before |
| + // subscription, the done event is queued. If done after cancel, the done |
| + // event is ignored (just as any other event after a cancel). |
| + |
| + /** The controller is in its initial state with no subscription. */ |
| + static const int _STATE_INITIAL = 0; |
| + /** The controller has a subscription, but hasn't been closed or canceled. */ |
| + static const int _STATE_SUBSCRIBED = 1; |
| + /** The subscription is canceled. */ |
| + static const int _STATE_CANCELED = 2; |
| + /** Mask for the subscription state. */ |
| + static const int _STATE_SUBSCRIPTION_MASK = 3; |
| + |
| + // The following state relate to the controller, not the subscription. |
| + // If closed, adding more events is not allowed. |
| + // If executing an [addStream], now events are not allowed either, but will |
|
floitsch
2013/06/27 15:15:19
-now-
Move description of _STATE_ADDSTREAM below.
Lasse Reichstein Nielsen
2013/06/28 12:57:38
now -> new.
Reworded and reordered.
|
| + // be added by the stream. |
| + /** The controller is closed due to calling [close]. */ |
| + static const int _STATE_CLOSED = 4; |
| + /** The controller is in the middle of an [addStream] call. */ |
| + static const int _STATE_ADDSTREAM = 8; |
| - final _NotificationHandler _onListen; |
| - final _NotificationHandler _onPause; |
| - final _NotificationHandler _onResume; |
| - final _NotificationHandler _onCancel; |
| - _StreamImpl<T> _stream; |
| + /** |
| + * Field containing different data depending on the current subscription |
| + * state. |
| + * |
| + * If [_state] is [_STATE_INITIAL], the field may contain a [_PendingEvents] |
| + * for events added to the controller before a subscription. |
| + * |
| + * While [_state] is [_STATE_SUBSCRIBED], the field contains the subscription. |
| + * |
| + * When [_state] is [_STATE_CANCELED] the field is currently not used. |
| + */ |
| + var _varData; |
| - // An active subscription on the stream, or null if no subscripton is active. |
| - _ControllerSubscription<T> _subscription; |
| + /** Current state of the controller. */ |
| + int _state = _STATE_INITIAL; |
| - // Whether we have sent a "done" event. |
| - int _state = _STATE_OPEN; |
| + /** |
| + * Future completed when the stream sends its last event. |
| + * |
| + * This is also the future returned by [close]. |
| + */ |
| + // TODO(lrn): Could this be stored in the varData field too, if it's not |
| + // accessed until the call to "close"? Then we need to special case if it's |
| + // accessed earlier, or if close is called before subscribing. |
| + _FutureImpl _doneFuture; |
| - // Events added to the stream before it has an active subscription. |
| - _PendingEvents _pendingEvents = null; |
| + _StreamController(); |
| - _StreamController(this._onListen, |
| - this._onPause, |
| - this._onResume, |
| - this._onCancel) { |
| - _stream = new _ControllerStream<T>(this); |
| - } |
| + _NotificationHandler get _onListen; |
| + _NotificationHandler get _onPause; |
| + _NotificationHandler get _onResume; |
| + _NotificationHandler get _onCancel; |
| - Stream<T> get stream => _stream; |
| + // Return a new stream every time. The streams are equal, but not identical. |
| + Stream<T> get stream => new _ControllerStream(this); |
| /** |
| - * Returns a view of this object that only exposes the [EventSink] interface. |
| + * Returns a view of this object that only exposes the [StreamSink] interface. |
| */ |
| - EventSink<T> get sink => new _EventSinkView<T>(this); |
| + StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); |
| /** |
| - * Whether a listener has existed and been cancelled. |
| + * Whether a listener has existed and been canceled. |
| * |
| * After this, adding more events will be ignored. |
| */ |
| - bool get _isCancelled => (_state & _STATE_CANCELLED) != 0; |
| + bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
| + |
| + /** Whether there is an active listener. */ |
| + bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0; |
| + |
| + /** Whether there has not been a listener yet. */ |
| + bool get _isInitialState => |
| + (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL; |
| bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| bool get isPaused => hasListener ? _subscription._isInputPaused |
| - : !_isCancelled; |
| + : !_isCanceled; |
| + |
| + bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; |
| + |
| + /** New events may not be added after close, or during addStream. */ |
| + bool get _mayAddEvent => (_state < _STATE_CLOSED); |
| + |
| + // Returns the pending events. |
| + // Pending events are events added before a subscription exists. |
| + // They are added to the subscription when it is created. |
| + // Pending events, if any, are kept in the _varData field until the |
| + // stream is listened to. |
| + // While adding a stream, pending events are moved into the |
| + // state object to allow the state object to use the _varData field. |
| + _PendingEvents get _pendingEvents { |
| + assert(_isInitialState); |
| + if (!_isAddingStream) { |
| + return _varData; |
| + } |
| + _StreamControllerAddStreamState state = _varData; |
| + return state.varData; |
| + } |
| - bool get hasListener => _subscription != null; |
| + // Returns the pending events, and creates the object if necessary. |
| + _StreamImplEvents _ensurePendingEvents() { |
| + assert(_isInitialState); |
| + if (!_isAddingStream) { |
| + if (_varData == null) _varData = new _StreamImplEvents(); |
| + return _varData; |
| + } |
| + _StreamControllerAddStreamState state = _varData; |
| + if (state.varData == null) state.varData = new _StreamImplEvents(); |
| + return state.varData; |
| + } |
| + |
| + // Get the current subscription. |
| + // If we are adding a stream, the subscription is moved into the state |
| + // object to allow the state object to use the _varData field. |
| + _ControllerSubscription get _subscription { |
| + assert(hasListener); |
| + if (_isAddingStream) { |
| + _StreamControllerAddStreamState addState = _varData; |
| + return addState.varData; |
| + } |
| + return _varData; |
| + } |
| /** |
| - * Send or queue a data event. |
| + * Creates an error describing why an event cannot be added. |
| + * |
| + * The reason, and therefore the error message, depends on the current state. |
| */ |
| - void add(T value) { |
| - if (isClosed) throw new StateError("Adding event after close"); |
| - if (_subscription != null) { |
| - _sendData(value); |
| - } else if (!_isCancelled) { |
| - _addPendingEvent(new _DelayedData<T>(value)); |
| + Error _badEventState() { |
| + if (isClosed) { |
| + return new StateError("Cannot add event after closing"); |
| } |
| + assert(_isAddingStream); |
| + return new StateError("Cannot add event while adding a stream"); |
| + } |
| + |
| + // StreamSink interface. |
| + Future addStream(Stream<T> source) { |
| + if (!_mayAddEvent) throw _badEventState(); |
| + if (_isCanceled) return new _FutureImpl.immediate(null); |
| + _StreamControllerAddStreamState addState = |
| + new _StreamControllerAddStreamState(this, _varData, source); |
| + _varData = addState; |
| + _state |= _STATE_ADDSTREAM; |
| + return addState.addStreamFuture; |
| + } |
| + |
| + Future get done => _ensureDoneFuture(); |
| + |
| + Future _ensureDoneFuture() { |
| + if (_doneFuture == null) { |
| + _doneFuture = new _FutureImpl(); |
| + if (_isCanceled) _doneFuture._setValue(null); |
| + } |
| + return _doneFuture; |
| + } |
| + |
| + /** |
| + * Send or enqueue a data event. |
| + */ |
| + void add(T value) { |
| + if (!_mayAddEvent) throw _badEventState(); |
| + _add(value); |
| } |
| /** |
| * Send or enqueue an error event. |
| */ |
| void addError(Object error, [Object stackTrace]) { |
| - if (isClosed) throw new StateError("Adding event after close"); |
| + if (!_mayAddEvent) throw _badEventState(); |
| if (stackTrace != null) { |
| // Force stack trace overwrite. Even if the error already contained |
| // a stack trace. |
| _attachStackTrace(error, stackTrace); |
| } |
| - if (_subscription != null) { |
| + if (hasListener) { |
| _sendError(error); |
| - } else if (!_isCancelled) { |
| - _addPendingEvent(new _DelayedError(error)); |
| + } else if (_isInitialState) { |
| + _ensurePendingEvents().add(new _DelayedError(error)); |
| } |
| } |
| @@ -263,60 +390,113 @@ abstract class _StreamController<T> implements StreamController<T>, |
| * The first time a controller is closed, a "done" event is sent to its |
| * stream. |
| */ |
| - void close() { |
| - if (isClosed) return; |
| + Future close() { |
| + if (isClosed) { |
| + assert(_doneFuture != null); // Was set when close was first called. |
| + return _doneFuture; |
| + } |
| + if (!_mayAddEvent) throw _badEventState(); |
| _state |= _STATE_CLOSED; |
| - if (_subscription != null) { |
| + _ensureDoneFuture(); |
| + if (hasListener) { |
| _sendDone(); |
| - } else if (!_isCancelled) { |
| - _addPendingEvent(const _DelayedDone()); |
| + } else if (_isInitialState) { |
| + _ensurePendingEvents().add(const _DelayedDone()); |
| } |
| + return _doneFuture; |
| } |
| - // EventDispatch interface |
| + // EventSink interface. Used by the [addStream] events. |
| - void _addPendingEvent(_DelayedEvent event) { |
| - if (_isCancelled) return; |
| - _StreamImplEvents events = _pendingEvents; |
| - if (events == null) { |
| - events = _pendingEvents = new _StreamImplEvents(); |
| + // Add data event, used both by the [addStream] events and by [add]. |
| + void _add(T value) { |
| + if (hasListener) { |
| + _sendData(value); |
| + } else if (_isInitialState) { |
| + _ensurePendingEvents().add(new _DelayedData<T>(value)); |
| } |
| - events.add(event); |
| } |
| - void _recordListen(_BufferingStreamSubscription<T> subscription) { |
| - assert(_subscription == null); |
| - _subscription = subscription; |
| - subscription._setPendingEvents(_pendingEvents); |
| - _pendingEvents = null; |
| + void _addError(Object error) { |
| + // Error from addStream. Stop the addStream and complete its future with the |
|
floitsch
2013/06/27 15:15:19
I don't think that's the right thing to do. Just p
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
|
| + // error. |
| + assert(_isAddingStream); |
| + _StreamControllerAddStreamState addState = _varData; |
| + _varData = addState.varData; |
| + _state &= ~_STATE_ADDSTREAM; |
| + addState.completeWithError(error); |
| + } |
| + |
| + void _close() { |
| + // End of addStream stream. |
| + assert(_isAddingStream); |
| + _StreamControllerAddStreamState addState = _varData; |
| + _varData = addState.varData; |
| + _state &= ~_STATE_ADDSTREAM; |
| + addState.complete(); |
| + } |
| + |
| + // _StreamControllerLifeCycle interface |
| + |
| + StreamSubscription<T> _subscribe(void onData(T data), |
| + void onError(Object error), |
| + void onDone(), |
| + bool cancelOnError) { |
| + if (!_isInitialState) { |
| + throw new StateError("Stream has already been listened to."); |
| + } |
| + _ControllerSubscription subscription = new _ControllerSubscription( |
| + this, onData, onError, onDone, cancelOnError); |
| + |
| + _PendingEvents pendingEvents = _pendingEvents; |
| + _state |= _STATE_SUBSCRIBED; |
| + if (_isAddingStream) { |
| + _StreamControllerAddStreamState addState = _varData; |
| + addState.varData = subscription; |
| + } else { |
| + _varData = subscription; |
| + } |
| + subscription._setPendingEvents(pendingEvents); |
| subscription._guardCallback(() { |
| _runGuarded(_onListen); |
| }); |
| + |
| + return subscription; |
| } |
| void _recordCancel(StreamSubscription<T> subscription) { |
| - assert(identical(_subscription, subscription)); |
| - _subscription = null; |
| - _state |= _STATE_CANCELLED; |
| + if (_isAddingStream) { |
| + _StreamControllerAddStreamState addState = _varData; |
| + addState.cancel(); |
| + } |
| + _varData = null; |
| + _state = |
| + (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
| _runGuarded(_onCancel); |
| + if (_doneFuture != null && _doneFuture._mayComplete) { |
| + _doneFuture._asyncSetValue(null); |
| + } |
| } |
| void _recordPause(StreamSubscription<T> subscription) { |
| + if (_isAddingStream) { |
| + _StreamControllerAddStreamState addState = _varData; |
| + addState.pause(); |
| + } |
| _runGuarded(_onPause); |
| } |
| void _recordResume(StreamSubscription<T> subscription) { |
| + if (_isAddingStream) { |
| + _StreamControllerAddStreamState addState = _varData; |
| + addState.resume(); |
| + } |
| _runGuarded(_onResume); |
| } |
| } |
| -class _SyncStreamController<T> extends _StreamController<T> { |
| - _SyncStreamController(void onListen(), |
| - void onPause(), |
| - void onResume(), |
| - void onCancel()) |
| - : super(onListen, onPause, onResume, onCancel); |
| - |
| +abstract class _SyncStreamControllerDispatch<T> |
| + implements _StreamController<T> { |
| void _sendData(T data) { |
| _subscription._add(data); |
| } |
| @@ -330,13 +510,8 @@ class _SyncStreamController<T> extends _StreamController<T> { |
| } |
| } |
| -class _AsyncStreamController<T> extends _StreamController<T> { |
| - _AsyncStreamController(void onListen(), |
| - void onPause(), |
| - void onResume(), |
| - void onCancel()) |
| - : super(onListen, onPause, onResume, onCancel); |
| - |
| +abstract class _AsyncStreamControllerDispatch<T> |
| + implements _StreamController<T> { |
| void _sendData(T data) { |
| _subscription._addPending(new _DelayedData(data)); |
| } |
| @@ -350,6 +525,48 @@ class _AsyncStreamController<T> extends _StreamController<T> { |
| } |
| } |
| +// TODO(lrn): Use common superclass for callback-controllers when VM supports |
| +// constructors in mixin superclasses. |
| + |
| +class _AsyncStreamController<T> extends _StreamController<T> |
| + with _AsyncStreamControllerDispatch<T> { |
| + final _NotificationHandler _onListen; |
| + final _NotificationHandler _onPause; |
| + final _NotificationHandler _onResume; |
| + final _NotificationHandler _onCancel; |
| + |
| + _AsyncStreamController(void this._onListen(), |
| + void this._onPause(), |
| + void this._onResume(), |
| + void this._onCancel()); |
| +} |
| + |
| +class _SyncStreamController<T> extends _StreamController<T> |
| + with _SyncStreamControllerDispatch<T> { |
| + final _NotificationHandler _onListen; |
| + final _NotificationHandler _onPause; |
| + final _NotificationHandler _onResume; |
| + final _NotificationHandler _onCancel; |
| + |
| + _SyncStreamController(void this._onListen(), |
| + void this._onPause(), |
| + void this._onResume(), |
| + void this._onCancel()); |
| +} |
| + |
| +abstract class _NoCallbacks { |
| + _NotificationHandler get _onListen => null; |
| + _NotificationHandler get _onPause => null; |
| + _NotificationHandler get _onResume => null; |
| + _NotificationHandler get _onCancel => null; |
| +} |
| + |
| +typedef _NoCallbackAsyncStreamController<T> = _StreamController<T> |
| + with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
| + |
| +typedef _NoCallbackSyncStreamController<T> = _StreamController<T> |
| + with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks; |
| + |
| typedef void _NotificationHandler(); |
| void _runGuarded(_NotificationHandler notificationHandler) { |
| @@ -363,7 +580,6 @@ void _runGuarded(_NotificationHandler notificationHandler) { |
| class _ControllerStream<T> extends _StreamImpl<T> { |
| _StreamControllerLifecycle<T> _controller; |
| - bool _hasListener = false; |
| _ControllerStream(this._controller); |
| @@ -371,17 +587,19 @@ class _ControllerStream<T> extends _StreamImpl<T> { |
| void onData(T data), |
| void onError(Object error), |
| void onDone(), |
| - bool cancelOnError) { |
| - if (_hasListener) { |
| - throw new StateError("The stream has already been listened to."); |
| - } |
| - _hasListener = true; |
| - return new _ControllerSubscription<T>( |
| - _controller, onData, onError, onDone, cancelOnError); |
| - } |
| + bool cancelOnError) => |
| + _controller._subscribe(onData, onError, onDone, cancelOnError); |
| - void _onListen(_BufferingStreamSubscription subscription) { |
| - _controller._recordListen(subscription); |
| + // Override == and hashCode so that new streams returned by the same |
| + // controller are considered equal. The controller returns a new stream |
| + // each time it's queried, but doesn't have to cache the result. |
| + |
| + int get hashCode => _controller.hashCode ^ 0x35323532; |
| + |
| + bool operator==(Object other) { |
| + if (other is! _ControllerStream) return false; |
| + _ControllerStream otherStream = other; |
| + return identical(otherStream._controller, this); |
| } |
| } |
| @@ -408,367 +626,68 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| } |
| } |
| -class _BroadcastStream<T> extends _StreamImpl<T> { |
| - _BroadcastStreamController _controller; |
| - |
| - _BroadcastStream(this._controller); |
| - |
| - bool get isBroadcast => true; |
| - |
| - StreamSubscription<T> _createSubscription( |
| - void onData(T data), |
| - void onError(Object error), |
| - void onDone(), |
| - bool cancelOnError) { |
| - return new _BroadcastSubscription<T>( |
| - _controller, onData, onError, onDone, cancelOnError); |
| - } |
| - |
| - void _onListen(_BufferingStreamSubscription subscription) { |
| - _controller._recordListen(subscription); |
| - } |
| -} |
| - |
| -abstract class _BroadcastSubscriptionLink { |
| - _BroadcastSubscriptionLink _next; |
| - _BroadcastSubscriptionLink _previous; |
| -} |
| - |
| -class _BroadcastSubscription<T> extends _ControllerSubscription<T> |
| - implements _BroadcastSubscriptionLink { |
| - static const int _STATE_EVENT_ID = 1; |
| - static const int _STATE_FIRING = 2; |
| - static const int _STATE_REMOVE_AFTER_FIRING = 4; |
| - int _eventState; |
| - |
| - _BroadcastSubscriptionLink _next; |
| - _BroadcastSubscriptionLink _previous; |
| - |
| - _BroadcastSubscription(_StreamControllerLifecycle controller, |
| - void onData(T data), |
| - void onError(Object error), |
| - void onDone(), |
| - bool cancelOnError) |
| - : super(controller, onData, onError, onDone, cancelOnError) { |
| - _next = _previous = this; |
| - } |
| - |
| - _BroadcastStreamController get _controller => super._controller; |
| - |
| - bool _expectsEvent(int eventId) { |
| - return (_eventState & _STATE_EVENT_ID) == eventId; |
| - } |
| - |
| - void _toggleEventId() { |
| - _eventState ^= _STATE_EVENT_ID; |
| - } |
| - |
| - bool get _isFiring => (_eventState & _STATE_FIRING) != 0; |
| - |
| - bool _setRemoveAfterFiring() { |
| - assert(_isFiring); |
| - _eventState |= _STATE_REMOVE_AFTER_FIRING; |
| - } |
| - |
| - bool get _removeAfterFiring => |
| - (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; |
| -} |
| - |
| - |
| -abstract class _BroadcastStreamController<T> |
| - implements StreamController<T>, |
| - _StreamControllerLifecycle<T>, |
| - _BroadcastSubscriptionLink, |
| - _EventDispatch<T> { |
| - static const int _STATE_INITIAL = 0; |
| - static const int _STATE_EVENT_ID = 1; |
| - static const int _STATE_FIRING = 2; |
| - static const int _STATE_CLOSED = 4; |
| - |
| - final _NotificationHandler _onListen; |
| - final _NotificationHandler _onCancel; |
| - |
| - // State of the controller. |
| - int _state; |
| - |
| - // Double-linked list of active listeners. |
| - _BroadcastSubscriptionLink _next; |
| - _BroadcastSubscriptionLink _previous; |
| - |
| - _BroadcastStreamController(this._onListen, this._onCancel) |
| - : _state = _STATE_INITIAL { |
| - _next = _previous = this; |
| - } |
| - |
| - // StreamController interface. |
| - |
| - Stream<T> get stream => new _BroadcastStream<T>(this); |
| - |
| - EventSink<T> get sink => new _EventSinkView<T>(this); |
| - |
| - bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| - |
| - /** |
| - * A broadcast controller is never paused. |
| - * |
| - * Each receiving stream may be paused individually, and they handle their |
| - * own buffering. |
| - */ |
| - bool get isPaused => false; |
| - |
| - /** Whether there are currently a subscriber on the [Stream]. */ |
| - bool get hasListener => !_isEmpty; |
| - |
| - /** Whether an event is being fired (sent to some, but not all, listeners). */ |
| - bool get _isFiring => (_state & _STATE_FIRING) != 0; |
| - |
| - // Linked list helpers |
| - |
| - bool get _isEmpty => identical(_next, this); |
| - |
| - /** Adds subscription to linked list of active listeners. */ |
| - void _addListener(_BroadcastSubscription<T> subscription) { |
| - _BroadcastSubscriptionLink previous = _previous; |
| - previous._next = subscription; |
| - _previous = subscription._previous; |
| - subscription._previous._next = this; |
| - subscription._previous = previous; |
| - subscription._eventState = (_state & _STATE_EVENT_ID); |
| - } |
| - |
| - void _removeListener(_BroadcastSubscription<T> subscription) { |
| - assert(identical(subscription._controller, this)); |
| - assert(!identical(subscription._next, subscription)); |
| - subscription._previous._next = subscription._next; |
| - subscription._next._previous = subscription._previous; |
| - subscription._next = subscription._previous = subscription; |
| - } |
| - |
| - // _StreamControllerLifecycle interface. |
| - |
| - void _recordListen(_BroadcastSubscription<T> subscription) { |
| - _addListener(subscription); |
| - if (identical(_next, _previous)) { |
| - // Only one listener, so it must be the first listener. |
| - _runGuarded(_onListen); |
| - } |
| - } |
| - |
| - void _recordCancel(_BroadcastSubscription<T> subscription) { |
| - if (subscription._isFiring) { |
| - subscription._setRemoveAfterFiring(); |
| - } else { |
| - _removeListener(subscription); |
| - // If we are currently firing an event, the empty-check is performed at |
| - // the end of the listener loop instead of here. |
| - if ((_state & _STATE_FIRING) == 0 && _isEmpty) { |
| - _callOnCancel(); |
| - } |
| - } |
| - } |
| - |
| - void _recordPause(StreamSubscription<T> subscription) {} |
| - void _recordResume(StreamSubscription<T> subscription) {} |
| - |
| - // EventSink interface. |
| - |
| - void add(T data) { |
| - if (isClosed) { |
| - throw new StateError("Cannot add new events after calling close()"); |
| - } |
| - _sendData(data); |
| - } |
| - |
| - void addError(Object error, [Object stackTrace]) { |
| - if (isClosed) { |
| - throw new StateError("Cannot add new events after calling close()"); |
| - } |
| - if (stackTrace != null) _attachStackTrace(error, stackTrace); |
| - _sendError(error); |
| - } |
| - void close() { |
| - if (isClosed) { |
| - throw new StateError("Cannot add new events after calling close()"); |
| - } |
| - _state |= _STATE_CLOSED; |
| - _sendDone(); |
| - } |
| - |
| - void _forEachListener( |
| - void action(_BufferingStreamSubscription<T> subscription)) { |
| - if (_isFiring) { |
| - throw new StateError( |
| - "Cannot fire new event. Controller is already firing an event"); |
| - } |
| - if (_isEmpty) return; |
| - |
| - // Get event id of this event. |
| - int id = (_state & _STATE_EVENT_ID); |
| - // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] |
| - // callbacks while firing, and we prevent reentrancy of this function. |
| - // |
| - // Set [_state]'s event id to the next event's id. |
| - // Any listeners added while firing this event will expect the next event, |
| - // not this one, and won't get notified. |
| - _state ^= _STATE_EVENT_ID | _STATE_FIRING; |
| - _BroadcastSubscriptionLink link = _next; |
| - while (!identical(link, this)) { |
| - _BroadcastSubscription<T> subscription = link; |
| - if (subscription._expectsEvent(id)) { |
| - subscription._eventState |= _BroadcastSubscription._STATE_FIRING; |
| - action(subscription); |
| - subscription._toggleEventId(); |
| - link = subscription._next; |
| - if (subscription._removeAfterFiring) { |
| - _removeListener(subscription); |
| - } |
| - subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; |
| - } else { |
| - link = subscription._next; |
| - } |
| - } |
| - _state &= ~_STATE_FIRING; |
| - |
| - if (_isEmpty) { |
| - _callOnCancel(); |
| - } |
| - } |
| - |
| - void _callOnCancel() { |
| - _runGuarded(_onCancel); |
| - } |
| +/** A class that exposes only the [StreamSink] interface of an object. */ |
| +class _StreamSinkWrapper<T> implements StreamSink<T> { |
| + StreamSink _target; |
|
floitsch
2013/06/27 15:15:19
final.
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
|
| + _StreamSinkWrapper(this._target); |
| + void add(T data) { _target.add(data); } |
| + void addError(Object error) { _target.addError(error); } |
| + Future close() => _target.close(); |
| + Future addStream(Stream<T> source) => _target.addStream(source); |
| + Future get done => _target.done; |
| } |
| -class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
| - _SyncBroadcastStreamController(void onListen(), void onCancel()) |
| - : super(onListen, onCancel); |
| +/** |
| + * Object containing the state used to handle [StreamController.addStream]. |
| + */ |
| +class _AddStreamState<T> { |
| + // [_FutureImpl] returned by call to addStream. |
| + _FutureImpl addStreamFuture; |
| - // EventDispatch interface. |
| + // Subscription on stream argument to addStream. |
| + StreamSubscription addSubscription; |
| - void _sendData(T data) { |
| - if (_isEmpty) return; |
| - _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| - subscription._add(data); |
| - }); |
| - } |
| + _AddStreamState(StreamSink controller, Stream source) |
| + : addStreamFuture = new _FutureImpl(), |
| + addSubscription = source.listen(controller._add, |
| + onError: controller._addError, |
| + onDone: controller._close, |
| + cancelOnError: true); |
| - void _sendError(Object error) { |
| - if (_isEmpty) return; |
| - _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| - subscription._addError(error); |
| - }); |
| + void pause() { |
| + addSubscription.pause(); |
| } |
| - void _sendDone() { |
| - if (_isEmpty) return; |
| - _forEachListener((_BroadcastSubscription<T> subscription) { |
| - subscription._close(); |
| - subscription._eventState |= |
| - _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING; |
| - }); |
| + void resume() { |
| + addSubscription.resume(); |
| } |
| -} |
| -class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
| - _AsyncBroadcastStreamController(void onListen(), void onCancel()) |
| - : super(onListen, onCancel); |
| - |
| - // EventDispatch interface. |
| - |
| - void _sendData(T data) { |
| - for (_BroadcastSubscriptionLink link = _next; |
| - !identical(link, this); |
| - link = link._next) { |
| - _BroadcastSubscription<T> subscription = link; |
| - subscription._addPending(new _DelayedData(data)); |
| - } |
| + void cancel() { |
| + addSubscription.cancel(); |
| + complete(); |
| } |
| - void _sendError(Object error) { |
| - for (_BroadcastSubscriptionLink link = _next; |
| - !identical(link, this); |
| - link = link._next) { |
| - _BroadcastSubscription<T> subscription = link; |
| - subscription._addPending(new _DelayedError(error)); |
| - } |
| + void completeWithError(Object error) { |
| + addStreamFuture._asyncSetError(error); |
| } |
| - void _sendDone() { |
| - for (_BroadcastSubscriptionLink link = _next; |
| - !identical(link, this); |
| - link = link._next) { |
| - _BroadcastSubscription<T> subscription = link; |
| - subscription._addPending(const _DelayedDone()); |
| - } |
| + void complete() { |
| + addStreamFuture._asyncSetValue(null); |
| } |
| } |
| -/** |
| - * Stream controller that is used by [Stream.asBroadcastStream]. |
| - * |
| - * This stream controller allows incoming events while it is firing |
| - * other events. This is handled by delaying the events until the |
| - * current event is done firing, and then fire the pending events. |
| - * |
| - * This class extends [_SyncBroadcastStreamController]. Events of |
| - * an "asBroadcastStream" stream are always initiated by events |
| - * on another stream, and it is fine to forward them synchronously. |
| - */ |
| -class _AsBroadcastStreamController<T> |
| - extends _SyncBroadcastStreamController<T> |
| - implements _EventDispatch<T> { |
| - _StreamImplEvents _pending; |
| - |
| - _AsBroadcastStreamController(void onListen(), void onCancel()) |
| - : super(onListen, onCancel); |
| - |
| - bool get _hasPending => _pending != null && ! _pending.isEmpty; |
| - |
| - void _addPendingEvent(_DelayedEvent event) { |
| - if (_pending == null) { |
| - _pending = new _StreamImplEvents(); |
| - } |
| - _pending.add(event); |
| - } |
| - |
| - void add(T data) { |
| - if (_isFiring) { |
| - _addPendingEvent(new _DelayedData<T>(data)); |
| - return; |
| - } |
| - super.add(data); |
| - while (_hasPending) { |
| - _pending.handleNext(this); |
| - } |
| - } |
| - |
| - void addError(Object error, [StackTrace stackTrace]) { |
| - if (_isFiring) { |
| - _addPendingEvent(new _DelayedError(error)); |
| - return; |
| - } |
| - super.addError(error, stackTrace); |
| - while (_hasPending) { |
| - _pending.handleNext(this); |
| - } |
| - } |
| - |
| - void close() { |
| - if (_isFiring) { |
| - _addPendingEvent(const _DelayedDone()); |
| - _state |= _STATE_CLOSED; |
| - return; |
| - } |
| - super.close(); |
| - assert(!_hasPending); |
| - } |
| - |
| - void _callOnCancel() { |
| - if (_hasPending) { |
| - _pending.clear(); |
| - _pending = null; |
| +class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { |
| + // The subscription or pending data of a _StreamController. |
| + // Stored here because we reuse the `_varData` field in the _StreamController |
| + // to store this state object. |
| + var varData; |
| + |
| + _StreamControllerAddStreamState(_StreamController controller, |
| + this.varData, |
| + Stream source) : super(controller, source) { |
| + if (controller.isPaused) { |
| + addSubscription.pause(); |
| } |
| - super._callOnCancel(); |
| } |
| } |