Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(256)

Unified Diff: sdk/lib/async/stream_controller.dart

Issue 16240008: Make StreamController be a StreamSink, not just an EventSink. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Complete rewrite. StreamController is now itself a StreamSink. Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sdk/lib/async/stream_controller.dart
diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
index 2cdbeaf240c3553337eae7f2a74a83f3f27b1f66..7a2c75bb966273ff9271c252ead1b78aa2eb6e42 100644
--- a/sdk/lib/async/stream_controller.dart
+++ b/sdk/lib/async/stream_controller.dart
@@ -46,7 +46,7 @@ part of dart.async;
* the stream at all, and won't trigger callbacks. From the controller's point
* of view, the stream is completely inert when has completed.
*/
-abstract class StreamController<T> implements EventSink<T> {
+abstract class StreamController<T> implements StreamSink<T> {
/** The stream that this controller is controlling. */
Stream<T> get stream;
@@ -75,10 +75,17 @@ abstract class StreamController<T> implements EventSink<T> {
void onPause(),
void onResume(),
void onCancel(),
- bool sync: false})
- => sync
+ bool sync: false}) {
+ if (onListen == null && onPause == null &&
+ onResume == null && onCancel == null) {
+ return sync
+ ? new _NoCallbackSyncStreamController<T>()
+ : new _NoCallbackAsyncStreamController<T>();
+ }
+ return sync
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
+ }
/**
* A controller where [stream] can be listened to more than once.
@@ -123,14 +130,14 @@ abstract class StreamController<T> implements EventSink<T> {
}
/**
- * Returns a view of this object that only exposes the [EventSink] interface.
+ * Returns a view of this object that only exposes the [StreamSink] interface.
*/
- EventSink<T> get sink;
+ StreamSink<T> get sink;
/**
* Whether the stream is closed for adding more events.
*
- * If true, the "done" event might not have fired yet, but it has been
+ * If true, the "done" event might not have _ yet, but it has been
floitsch 2013/06/27 15:15:19 undo change?
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Done.
* scheduled, and it is too late to add more events.
*/
bool get isClosed;
@@ -162,7 +169,10 @@ abstract class StreamController<T> implements EventSink<T> {
abstract class _StreamControllerLifecycle<T> {
- void _recordListen(StreamSubscription<T> subscription) {}
+ StreamSubscription<T> _subscribe(void onData(T data),
+ void onError(Object error),
+ void onDone(),
+ bool cancelOnError);
void _recordPause(StreamSubscription<T> subscription) {}
void _recordResume(StreamSubscription<T> subscription) {}
void _recordCancel(StreamSubscription<T> subscription) {}
@@ -175,80 +185,197 @@ abstract class _StreamControllerLifecycle<T> {
*/
abstract class _StreamController<T> implements StreamController<T>,
_StreamControllerLifecycle<T>,
+ _EventSink<T>,
_EventDispatch<T> {
- static const int _STATE_OPEN = 0;
- static const int _STATE_CANCELLED = 1;
- static const int _STATE_CLOSED = 2;
+ // The states are bit-flags. More than one can be set at a time.
+ //
+ // The "subscription state" goes through the states:
+ // initial -> subscribed -> canceled.
+ // These are mutually exclusive.
+ // The "closed" state records whether the [close] method has been called
+ // on the controller. This can be done at any time. If done before
+ // subscription, the done event is queued. If done after cancel, the done
+ // event is ignored (just as any other event after a cancel).
+
+ /** The controller is in its initial state with no subscription. */
+ static const int _STATE_INITIAL = 0;
+ /** The controller has a subscription, but hasn't been closed or canceled. */
+ static const int _STATE_SUBSCRIBED = 1;
+ /** The subscription is canceled. */
+ static const int _STATE_CANCELED = 2;
+ /** Mask for the subscription state. */
+ static const int _STATE_SUBSCRIPTION_MASK = 3;
+
+ // The following state relate to the controller, not the subscription.
+ // If closed, adding more events is not allowed.
+ // If executing an [addStream], now events are not allowed either, but will
floitsch 2013/06/27 15:15:19 -now- Move description of _STATE_ADDSTREAM below.
Lasse Reichstein Nielsen 2013/06/28 12:57:38 now -> new. Reworded and reordered.
+ // be added by the stream.
+ /** The controller is closed due to calling [close]. */
+ static const int _STATE_CLOSED = 4;
+ /** The controller is in the middle of an [addStream] call. */
+ static const int _STATE_ADDSTREAM = 8;
- final _NotificationHandler _onListen;
- final _NotificationHandler _onPause;
- final _NotificationHandler _onResume;
- final _NotificationHandler _onCancel;
- _StreamImpl<T> _stream;
+ /**
+ * Field containing different data depending on the current subscription
+ * state.
+ *
+ * If [_state] is [_STATE_INITIAL], the field may contain a [_PendingEvents]
+ * for events added to the controller before a subscription.
+ *
+ * While [_state] is [_STATE_SUBSCRIBED], the field contains the subscription.
+ *
+ * When [_state] is [_STATE_CANCELED] the field is currently not used.
+ */
+ var _varData;
- // An active subscription on the stream, or null if no subscripton is active.
- _ControllerSubscription<T> _subscription;
+ /** Current state of the controller. */
+ int _state = _STATE_INITIAL;
- // Whether we have sent a "done" event.
- int _state = _STATE_OPEN;
+ /**
+ * Future completed when the stream sends its last event.
+ *
+ * This is also the future returned by [close].
+ */
+ // TODO(lrn): Could this be stored in the varData field too, if it's not
+ // accessed until the call to "close"? Then we need to special case if it's
+ // accessed earlier, or if close is called before subscribing.
+ _FutureImpl _doneFuture;
- // Events added to the stream before it has an active subscription.
- _PendingEvents _pendingEvents = null;
+ _StreamController();
- _StreamController(this._onListen,
- this._onPause,
- this._onResume,
- this._onCancel) {
- _stream = new _ControllerStream<T>(this);
- }
+ _NotificationHandler get _onListen;
+ _NotificationHandler get _onPause;
+ _NotificationHandler get _onResume;
+ _NotificationHandler get _onCancel;
- Stream<T> get stream => _stream;
+ // Return a new stream every time. The streams are equal, but not identical.
+ Stream<T> get stream => new _ControllerStream(this);
/**
- * Returns a view of this object that only exposes the [EventSink] interface.
+ * Returns a view of this object that only exposes the [StreamSink] interface.
*/
- EventSink<T> get sink => new _EventSinkView<T>(this);
+ StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
/**
- * Whether a listener has existed and been cancelled.
+ * Whether a listener has existed and been canceled.
*
* After this, adding more events will be ignored.
*/
- bool get _isCancelled => (_state & _STATE_CANCELLED) != 0;
+ bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
+
+ /** Whether there is an active listener. */
+ bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;
+
+ /** Whether there has not been a listener yet. */
+ bool get _isInitialState =>
+ (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL;
bool get isClosed => (_state & _STATE_CLOSED) != 0;
bool get isPaused => hasListener ? _subscription._isInputPaused
- : !_isCancelled;
+ : !_isCanceled;
+
+ bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
+
+ /** New events may not be added after close, or during addStream. */
+ bool get _mayAddEvent => (_state < _STATE_CLOSED);
+
+ // Returns the pending events.
+ // Pending events are events added before a subscription exists.
+ // They are added to the subscription when it is created.
+ // Pending events, if any, are kept in the _varData field until the
+ // stream is listened to.
+ // While adding a stream, pending events are moved into the
+ // state object to allow the state object to use the _varData field.
+ _PendingEvents get _pendingEvents {
+ assert(_isInitialState);
+ if (!_isAddingStream) {
+ return _varData;
+ }
+ _StreamControllerAddStreamState state = _varData;
+ return state.varData;
+ }
- bool get hasListener => _subscription != null;
+ // Returns the pending events, and creates the object if necessary.
+ _StreamImplEvents _ensurePendingEvents() {
+ assert(_isInitialState);
+ if (!_isAddingStream) {
+ if (_varData == null) _varData = new _StreamImplEvents();
+ return _varData;
+ }
+ _StreamControllerAddStreamState state = _varData;
+ if (state.varData == null) state.varData = new _StreamImplEvents();
+ return state.varData;
+ }
+
+ // Get the current subscription.
+ // If we are adding a stream, the subscription is moved into the state
+ // object to allow the state object to use the _varData field.
+ _ControllerSubscription get _subscription {
+ assert(hasListener);
+ if (_isAddingStream) {
+ _StreamControllerAddStreamState addState = _varData;
+ return addState.varData;
+ }
+ return _varData;
+ }
/**
- * Send or queue a data event.
+ * Creates an error describing why an event cannot be added.
+ *
+ * The reason, and therefore the error message, depends on the current state.
*/
- void add(T value) {
- if (isClosed) throw new StateError("Adding event after close");
- if (_subscription != null) {
- _sendData(value);
- } else if (!_isCancelled) {
- _addPendingEvent(new _DelayedData<T>(value));
+ Error _badEventState() {
+ if (isClosed) {
+ return new StateError("Cannot add event after closing");
}
+ assert(_isAddingStream);
+ return new StateError("Cannot add event while adding a stream");
+ }
+
+ // StreamSink interface.
+ Future addStream(Stream<T> source) {
+ if (!_mayAddEvent) throw _badEventState();
+ if (_isCanceled) return new _FutureImpl.immediate(null);
+ _StreamControllerAddStreamState addState =
+ new _StreamControllerAddStreamState(this, _varData, source);
+ _varData = addState;
+ _state |= _STATE_ADDSTREAM;
+ return addState.addStreamFuture;
+ }
+
+ Future get done => _ensureDoneFuture();
+
+ Future _ensureDoneFuture() {
+ if (_doneFuture == null) {
+ _doneFuture = new _FutureImpl();
+ if (_isCanceled) _doneFuture._setValue(null);
+ }
+ return _doneFuture;
+ }
+
+ /**
+ * Send or enqueue a data event.
+ */
+ void add(T value) {
+ if (!_mayAddEvent) throw _badEventState();
+ _add(value);
}
/**
* Send or enqueue an error event.
*/
void addError(Object error, [Object stackTrace]) {
- if (isClosed) throw new StateError("Adding event after close");
+ if (!_mayAddEvent) throw _badEventState();
if (stackTrace != null) {
// Force stack trace overwrite. Even if the error already contained
// a stack trace.
_attachStackTrace(error, stackTrace);
}
- if (_subscription != null) {
+ if (hasListener) {
_sendError(error);
- } else if (!_isCancelled) {
- _addPendingEvent(new _DelayedError(error));
+ } else if (_isInitialState) {
+ _ensurePendingEvents().add(new _DelayedError(error));
}
}
@@ -263,60 +390,113 @@ abstract class _StreamController<T> implements StreamController<T>,
* The first time a controller is closed, a "done" event is sent to its
* stream.
*/
- void close() {
- if (isClosed) return;
+ Future close() {
+ if (isClosed) {
+ assert(_doneFuture != null); // Was set when close was first called.
+ return _doneFuture;
+ }
+ if (!_mayAddEvent) throw _badEventState();
_state |= _STATE_CLOSED;
- if (_subscription != null) {
+ _ensureDoneFuture();
+ if (hasListener) {
_sendDone();
- } else if (!_isCancelled) {
- _addPendingEvent(const _DelayedDone());
+ } else if (_isInitialState) {
+ _ensurePendingEvents().add(const _DelayedDone());
}
+ return _doneFuture;
}
- // EventDispatch interface
+ // EventSink interface. Used by the [addStream] events.
- void _addPendingEvent(_DelayedEvent event) {
- if (_isCancelled) return;
- _StreamImplEvents events = _pendingEvents;
- if (events == null) {
- events = _pendingEvents = new _StreamImplEvents();
+ // Add data event, used both by the [addStream] events and by [add].
+ void _add(T value) {
+ if (hasListener) {
+ _sendData(value);
+ } else if (_isInitialState) {
+ _ensurePendingEvents().add(new _DelayedData<T>(value));
}
- events.add(event);
}
- void _recordListen(_BufferingStreamSubscription<T> subscription) {
- assert(_subscription == null);
- _subscription = subscription;
- subscription._setPendingEvents(_pendingEvents);
- _pendingEvents = null;
+ void _addError(Object error) {
+ // Error from addStream. Stop the addStream and complete its future with the
floitsch 2013/06/27 15:15:19 I don't think that's the right thing to do. Just p
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Done.
+ // error.
+ assert(_isAddingStream);
+ _StreamControllerAddStreamState addState = _varData;
+ _varData = addState.varData;
+ _state &= ~_STATE_ADDSTREAM;
+ addState.completeWithError(error);
+ }
+
+ void _close() {
+ // End of addStream stream.
+ assert(_isAddingStream);
+ _StreamControllerAddStreamState addState = _varData;
+ _varData = addState.varData;
+ _state &= ~_STATE_ADDSTREAM;
+ addState.complete();
+ }
+
+ // _StreamControllerLifeCycle interface
+
+ StreamSubscription<T> _subscribe(void onData(T data),
+ void onError(Object error),
+ void onDone(),
+ bool cancelOnError) {
+ if (!_isInitialState) {
+ throw new StateError("Stream has already been listened to.");
+ }
+ _ControllerSubscription subscription = new _ControllerSubscription(
+ this, onData, onError, onDone, cancelOnError);
+
+ _PendingEvents pendingEvents = _pendingEvents;
+ _state |= _STATE_SUBSCRIBED;
+ if (_isAddingStream) {
+ _StreamControllerAddStreamState addState = _varData;
+ addState.varData = subscription;
+ } else {
+ _varData = subscription;
+ }
+ subscription._setPendingEvents(pendingEvents);
subscription._guardCallback(() {
_runGuarded(_onListen);
});
+
+ return subscription;
}
void _recordCancel(StreamSubscription<T> subscription) {
- assert(identical(_subscription, subscription));
- _subscription = null;
- _state |= _STATE_CANCELLED;
+ if (_isAddingStream) {
+ _StreamControllerAddStreamState addState = _varData;
+ addState.cancel();
+ }
+ _varData = null;
+ _state =
+ (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
_runGuarded(_onCancel);
+ if (_doneFuture != null && _doneFuture._mayComplete) {
+ _doneFuture._asyncSetValue(null);
+ }
}
void _recordPause(StreamSubscription<T> subscription) {
+ if (_isAddingStream) {
+ _StreamControllerAddStreamState addState = _varData;
+ addState.pause();
+ }
_runGuarded(_onPause);
}
void _recordResume(StreamSubscription<T> subscription) {
+ if (_isAddingStream) {
+ _StreamControllerAddStreamState addState = _varData;
+ addState.resume();
+ }
_runGuarded(_onResume);
}
}
-class _SyncStreamController<T> extends _StreamController<T> {
- _SyncStreamController(void onListen(),
- void onPause(),
- void onResume(),
- void onCancel())
- : super(onListen, onPause, onResume, onCancel);
-
+abstract class _SyncStreamControllerDispatch<T>
+ implements _StreamController<T> {
void _sendData(T data) {
_subscription._add(data);
}
@@ -330,13 +510,8 @@ class _SyncStreamController<T> extends _StreamController<T> {
}
}
-class _AsyncStreamController<T> extends _StreamController<T> {
- _AsyncStreamController(void onListen(),
- void onPause(),
- void onResume(),
- void onCancel())
- : super(onListen, onPause, onResume, onCancel);
-
+abstract class _AsyncStreamControllerDispatch<T>
+ implements _StreamController<T> {
void _sendData(T data) {
_subscription._addPending(new _DelayedData(data));
}
@@ -350,6 +525,48 @@ class _AsyncStreamController<T> extends _StreamController<T> {
}
}
+// TODO(lrn): Use common superclass for callback-controllers when VM supports
+// constructors in mixin superclasses.
+
+class _AsyncStreamController<T> extends _StreamController<T>
+ with _AsyncStreamControllerDispatch<T> {
+ final _NotificationHandler _onListen;
+ final _NotificationHandler _onPause;
+ final _NotificationHandler _onResume;
+ final _NotificationHandler _onCancel;
+
+ _AsyncStreamController(void this._onListen(),
+ void this._onPause(),
+ void this._onResume(),
+ void this._onCancel());
+}
+
+class _SyncStreamController<T> extends _StreamController<T>
+ with _SyncStreamControllerDispatch<T> {
+ final _NotificationHandler _onListen;
+ final _NotificationHandler _onPause;
+ final _NotificationHandler _onResume;
+ final _NotificationHandler _onCancel;
+
+ _SyncStreamController(void this._onListen(),
+ void this._onPause(),
+ void this._onResume(),
+ void this._onCancel());
+}
+
+abstract class _NoCallbacks {
+ _NotificationHandler get _onListen => null;
+ _NotificationHandler get _onPause => null;
+ _NotificationHandler get _onResume => null;
+ _NotificationHandler get _onCancel => null;
+}
+
+typedef _NoCallbackAsyncStreamController<T> = _StreamController<T>
+ with _AsyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
+
+typedef _NoCallbackSyncStreamController<T> = _StreamController<T>
+ with _SyncStreamControllerDispatch/*<T>*/, _NoCallbacks;
+
typedef void _NotificationHandler();
void _runGuarded(_NotificationHandler notificationHandler) {
@@ -363,7 +580,6 @@ void _runGuarded(_NotificationHandler notificationHandler) {
class _ControllerStream<T> extends _StreamImpl<T> {
_StreamControllerLifecycle<T> _controller;
- bool _hasListener = false;
_ControllerStream(this._controller);
@@ -371,17 +587,19 @@ class _ControllerStream<T> extends _StreamImpl<T> {
void onData(T data),
void onError(Object error),
void onDone(),
- bool cancelOnError) {
- if (_hasListener) {
- throw new StateError("The stream has already been listened to.");
- }
- _hasListener = true;
- return new _ControllerSubscription<T>(
- _controller, onData, onError, onDone, cancelOnError);
- }
+ bool cancelOnError) =>
+ _controller._subscribe(onData, onError, onDone, cancelOnError);
- void _onListen(_BufferingStreamSubscription subscription) {
- _controller._recordListen(subscription);
+ // Override == and hashCode so that new streams returned by the same
+ // controller are considered equal. The controller returns a new stream
+ // each time it's queried, but doesn't have to cache the result.
+
+ int get hashCode => _controller.hashCode ^ 0x35323532;
+
+ bool operator==(Object other) {
+ if (other is! _ControllerStream) return false;
+ _ControllerStream otherStream = other;
+ return identical(otherStream._controller, this);
}
}
@@ -408,367 +626,68 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
}
}
-class _BroadcastStream<T> extends _StreamImpl<T> {
- _BroadcastStreamController _controller;
-
- _BroadcastStream(this._controller);
-
- bool get isBroadcast => true;
-
- StreamSubscription<T> _createSubscription(
- void onData(T data),
- void onError(Object error),
- void onDone(),
- bool cancelOnError) {
- return new _BroadcastSubscription<T>(
- _controller, onData, onError, onDone, cancelOnError);
- }
-
- void _onListen(_BufferingStreamSubscription subscription) {
- _controller._recordListen(subscription);
- }
-}
-
-abstract class _BroadcastSubscriptionLink {
- _BroadcastSubscriptionLink _next;
- _BroadcastSubscriptionLink _previous;
-}
-
-class _BroadcastSubscription<T> extends _ControllerSubscription<T>
- implements _BroadcastSubscriptionLink {
- static const int _STATE_EVENT_ID = 1;
- static const int _STATE_FIRING = 2;
- static const int _STATE_REMOVE_AFTER_FIRING = 4;
- int _eventState;
-
- _BroadcastSubscriptionLink _next;
- _BroadcastSubscriptionLink _previous;
-
- _BroadcastSubscription(_StreamControllerLifecycle controller,
- void onData(T data),
- void onError(Object error),
- void onDone(),
- bool cancelOnError)
- : super(controller, onData, onError, onDone, cancelOnError) {
- _next = _previous = this;
- }
-
- _BroadcastStreamController get _controller => super._controller;
-
- bool _expectsEvent(int eventId) {
- return (_eventState & _STATE_EVENT_ID) == eventId;
- }
-
- void _toggleEventId() {
- _eventState ^= _STATE_EVENT_ID;
- }
-
- bool get _isFiring => (_eventState & _STATE_FIRING) != 0;
-
- bool _setRemoveAfterFiring() {
- assert(_isFiring);
- _eventState |= _STATE_REMOVE_AFTER_FIRING;
- }
-
- bool get _removeAfterFiring =>
- (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0;
-}
-
-
-abstract class _BroadcastStreamController<T>
- implements StreamController<T>,
- _StreamControllerLifecycle<T>,
- _BroadcastSubscriptionLink,
- _EventDispatch<T> {
- static const int _STATE_INITIAL = 0;
- static const int _STATE_EVENT_ID = 1;
- static const int _STATE_FIRING = 2;
- static const int _STATE_CLOSED = 4;
-
- final _NotificationHandler _onListen;
- final _NotificationHandler _onCancel;
-
- // State of the controller.
- int _state;
-
- // Double-linked list of active listeners.
- _BroadcastSubscriptionLink _next;
- _BroadcastSubscriptionLink _previous;
-
- _BroadcastStreamController(this._onListen, this._onCancel)
- : _state = _STATE_INITIAL {
- _next = _previous = this;
- }
-
- // StreamController interface.
-
- Stream<T> get stream => new _BroadcastStream<T>(this);
-
- EventSink<T> get sink => new _EventSinkView<T>(this);
-
- bool get isClosed => (_state & _STATE_CLOSED) != 0;
-
- /**
- * A broadcast controller is never paused.
- *
- * Each receiving stream may be paused individually, and they handle their
- * own buffering.
- */
- bool get isPaused => false;
-
- /** Whether there are currently a subscriber on the [Stream]. */
- bool get hasListener => !_isEmpty;
-
- /** Whether an event is being fired (sent to some, but not all, listeners). */
- bool get _isFiring => (_state & _STATE_FIRING) != 0;
-
- // Linked list helpers
-
- bool get _isEmpty => identical(_next, this);
-
- /** Adds subscription to linked list of active listeners. */
- void _addListener(_BroadcastSubscription<T> subscription) {
- _BroadcastSubscriptionLink previous = _previous;
- previous._next = subscription;
- _previous = subscription._previous;
- subscription._previous._next = this;
- subscription._previous = previous;
- subscription._eventState = (_state & _STATE_EVENT_ID);
- }
-
- void _removeListener(_BroadcastSubscription<T> subscription) {
- assert(identical(subscription._controller, this));
- assert(!identical(subscription._next, subscription));
- subscription._previous._next = subscription._next;
- subscription._next._previous = subscription._previous;
- subscription._next = subscription._previous = subscription;
- }
-
- // _StreamControllerLifecycle interface.
-
- void _recordListen(_BroadcastSubscription<T> subscription) {
- _addListener(subscription);
- if (identical(_next, _previous)) {
- // Only one listener, so it must be the first listener.
- _runGuarded(_onListen);
- }
- }
-
- void _recordCancel(_BroadcastSubscription<T> subscription) {
- if (subscription._isFiring) {
- subscription._setRemoveAfterFiring();
- } else {
- _removeListener(subscription);
- // If we are currently firing an event, the empty-check is performed at
- // the end of the listener loop instead of here.
- if ((_state & _STATE_FIRING) == 0 && _isEmpty) {
- _callOnCancel();
- }
- }
- }
-
- void _recordPause(StreamSubscription<T> subscription) {}
- void _recordResume(StreamSubscription<T> subscription) {}
-
- // EventSink interface.
-
- void add(T data) {
- if (isClosed) {
- throw new StateError("Cannot add new events after calling close()");
- }
- _sendData(data);
- }
-
- void addError(Object error, [Object stackTrace]) {
- if (isClosed) {
- throw new StateError("Cannot add new events after calling close()");
- }
- if (stackTrace != null) _attachStackTrace(error, stackTrace);
- _sendError(error);
- }
- void close() {
- if (isClosed) {
- throw new StateError("Cannot add new events after calling close()");
- }
- _state |= _STATE_CLOSED;
- _sendDone();
- }
-
- void _forEachListener(
- void action(_BufferingStreamSubscription<T> subscription)) {
- if (_isFiring) {
- throw new StateError(
- "Cannot fire new event. Controller is already firing an event");
- }
- if (_isEmpty) return;
-
- // Get event id of this event.
- int id = (_state & _STATE_EVENT_ID);
- // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel]
- // callbacks while firing, and we prevent reentrancy of this function.
- //
- // Set [_state]'s event id to the next event's id.
- // Any listeners added while firing this event will expect the next event,
- // not this one, and won't get notified.
- _state ^= _STATE_EVENT_ID | _STATE_FIRING;
- _BroadcastSubscriptionLink link = _next;
- while (!identical(link, this)) {
- _BroadcastSubscription<T> subscription = link;
- if (subscription._expectsEvent(id)) {
- subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
- action(subscription);
- subscription._toggleEventId();
- link = subscription._next;
- if (subscription._removeAfterFiring) {
- _removeListener(subscription);
- }
- subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
- } else {
- link = subscription._next;
- }
- }
- _state &= ~_STATE_FIRING;
-
- if (_isEmpty) {
- _callOnCancel();
- }
- }
-
- void _callOnCancel() {
- _runGuarded(_onCancel);
- }
+/** A class that exposes only the [StreamSink] interface of an object. */
+class _StreamSinkWrapper<T> implements StreamSink<T> {
+ StreamSink _target;
floitsch 2013/06/27 15:15:19 final.
Lasse Reichstein Nielsen 2013/06/28 12:57:38 Done.
+ _StreamSinkWrapper(this._target);
+ void add(T data) { _target.add(data); }
+ void addError(Object error) { _target.addError(error); }
+ Future close() => _target.close();
+ Future addStream(Stream<T> source) => _target.addStream(source);
+ Future get done => _target.done;
}
-class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
- _SyncBroadcastStreamController(void onListen(), void onCancel())
- : super(onListen, onCancel);
+/**
+ * Object containing the state used to handle [StreamController.addStream].
+ */
+class _AddStreamState<T> {
+ // [_FutureImpl] returned by call to addStream.
+ _FutureImpl addStreamFuture;
- // EventDispatch interface.
+ // Subscription on stream argument to addStream.
+ StreamSubscription addSubscription;
- void _sendData(T data) {
- if (_isEmpty) return;
- _forEachListener((_BufferingStreamSubscription<T> subscription) {
- subscription._add(data);
- });
- }
+ _AddStreamState(StreamSink controller, Stream source)
+ : addStreamFuture = new _FutureImpl(),
+ addSubscription = source.listen(controller._add,
+ onError: controller._addError,
+ onDone: controller._close,
+ cancelOnError: true);
- void _sendError(Object error) {
- if (_isEmpty) return;
- _forEachListener((_BufferingStreamSubscription<T> subscription) {
- subscription._addError(error);
- });
+ void pause() {
+ addSubscription.pause();
}
- void _sendDone() {
- if (_isEmpty) return;
- _forEachListener((_BroadcastSubscription<T> subscription) {
- subscription._close();
- subscription._eventState |=
- _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING;
- });
+ void resume() {
+ addSubscription.resume();
}
-}
-class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
- _AsyncBroadcastStreamController(void onListen(), void onCancel())
- : super(onListen, onCancel);
-
- // EventDispatch interface.
-
- void _sendData(T data) {
- for (_BroadcastSubscriptionLink link = _next;
- !identical(link, this);
- link = link._next) {
- _BroadcastSubscription<T> subscription = link;
- subscription._addPending(new _DelayedData(data));
- }
+ void cancel() {
+ addSubscription.cancel();
+ complete();
}
- void _sendError(Object error) {
- for (_BroadcastSubscriptionLink link = _next;
- !identical(link, this);
- link = link._next) {
- _BroadcastSubscription<T> subscription = link;
- subscription._addPending(new _DelayedError(error));
- }
+ void completeWithError(Object error) {
+ addStreamFuture._asyncSetError(error);
}
- void _sendDone() {
- for (_BroadcastSubscriptionLink link = _next;
- !identical(link, this);
- link = link._next) {
- _BroadcastSubscription<T> subscription = link;
- subscription._addPending(const _DelayedDone());
- }
+ void complete() {
+ addStreamFuture._asyncSetValue(null);
}
}
-/**
- * Stream controller that is used by [Stream.asBroadcastStream].
- *
- * This stream controller allows incoming events while it is firing
- * other events. This is handled by delaying the events until the
- * current event is done firing, and then fire the pending events.
- *
- * This class extends [_SyncBroadcastStreamController]. Events of
- * an "asBroadcastStream" stream are always initiated by events
- * on another stream, and it is fine to forward them synchronously.
- */
-class _AsBroadcastStreamController<T>
- extends _SyncBroadcastStreamController<T>
- implements _EventDispatch<T> {
- _StreamImplEvents _pending;
-
- _AsBroadcastStreamController(void onListen(), void onCancel())
- : super(onListen, onCancel);
-
- bool get _hasPending => _pending != null && ! _pending.isEmpty;
-
- void _addPendingEvent(_DelayedEvent event) {
- if (_pending == null) {
- _pending = new _StreamImplEvents();
- }
- _pending.add(event);
- }
-
- void add(T data) {
- if (_isFiring) {
- _addPendingEvent(new _DelayedData<T>(data));
- return;
- }
- super.add(data);
- while (_hasPending) {
- _pending.handleNext(this);
- }
- }
-
- void addError(Object error, [StackTrace stackTrace]) {
- if (_isFiring) {
- _addPendingEvent(new _DelayedError(error));
- return;
- }
- super.addError(error, stackTrace);
- while (_hasPending) {
- _pending.handleNext(this);
- }
- }
-
- void close() {
- if (_isFiring) {
- _addPendingEvent(const _DelayedDone());
- _state |= _STATE_CLOSED;
- return;
- }
- super.close();
- assert(!_hasPending);
- }
-
- void _callOnCancel() {
- if (_hasPending) {
- _pending.clear();
- _pending = null;
+class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {
+ // The subscription or pending data of a _StreamController.
+ // Stored here because we reuse the `_varData` field in the _StreamController
+ // to store this state object.
+ var varData;
+
+ _StreamControllerAddStreamState(_StreamController controller,
+ this.varData,
+ Stream source) : super(controller, source) {
+ if (controller.isPaused) {
+ addSubscription.pause();
}
- super._callOnCancel();
}
}

Powered by Google App Engine
This is Rietveld 408576698