| Index: sdk/lib/async/stream_controller.dart
|
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
|
| index 6ff3ee2417b7d41939bab032546af7abf7140b41..501b588e19ed63777cddc004e528d35290e44c52 100644
|
| --- a/sdk/lib/async/stream_controller.dart
|
| +++ b/sdk/lib/async/stream_controller.dart
|
| @@ -47,7 +47,24 @@ part of dart.async;
|
| * of view, the stream is completely inert when has completed.
|
| */
|
| class StreamController<T> extends EventSink<T> {
|
| - final _StreamImpl<T> stream;
|
| + static const int _STATE_OPEN = 0;
|
| + static const int _STATE_CANCELLED = 1;
|
| + static const int _STATE_CLOSED = 2;
|
| +
|
| + final _NotificationHandler _onListen;
|
| + final _NotificationHandler _onPause;
|
| + final _NotificationHandler _onResume;
|
| + final _NotificationHandler _onCancel;
|
| + _StreamImpl<T> _stream;
|
| +
|
| + // An active subscription on the stream, or null if no subscripton is active.
|
| + _ControllerSubscription<T> _subscription;
|
| +
|
| + // Whether we have sent a "done" event.
|
| + int _state = _STATE_OPEN;
|
| +
|
| + // Events added to the stream before it has an active subscription.
|
| + _PendingEvents _pendingEvents = null;
|
|
|
| /**
|
| * A controller with a [stream] that supports only one single subscriber.
|
| @@ -59,7 +76,7 @@ class StreamController<T> extends EventSink<T> {
|
| * paused. [onResume] is called when the stream resumed.
|
| *
|
| * The [onListen] callback is called when the stream
|
| - * receives its listener. [onCancel] when the listener cancels
|
| + * receives its listener and [onCancel] when the listener ends
|
| * its subscription.
|
| *
|
| * If the stream is canceled before the controller needs new data the
|
| @@ -69,8 +86,14 @@ class StreamController<T> extends EventSink<T> {
|
| void onPause(),
|
| void onResume(),
|
| void onCancel()})
|
| - : stream = new _SingleControllerStream<T>(
|
| - onListen, onPause, onResume, onCancel);
|
| + : _onListen = onListen,
|
| + _onPause = onPause,
|
| + _onResume = onResume,
|
| + _onCancel = onCancel {
|
| + _stream = new _ControllerStream<T>(this);
|
| + }
|
| +
|
| + Stream<T> get stream => _stream;
|
|
|
| /**
|
| * Returns a view of this object that only exposes the [EventSink] interface.
|
| @@ -78,76 +101,165 @@ class StreamController<T> extends EventSink<T> {
|
| EventSink<T> get sink => new _EventSinkView<T>(this);
|
|
|
| /**
|
| + * Whether a listener has existed and been cancelled.
|
| + *
|
| + * After this, adding more events will be ignored.
|
| + */
|
| + bool get _isCancelled => (_state & _STATE_CANCELLED) != 0;
|
| +
|
| + /**
|
| * Whether the stream is closed for adding more events.
|
| *
|
| * If true, the "done" event might not have fired yet, but it has been
|
| * scheduled, and it is too late to add more events.
|
| */
|
| - bool get isClosed => stream._isClosed;
|
| + bool get isClosed => (_state & _STATE_CLOSED) != 0;
|
|
|
| - /** Whether one or more active subscribers have requested a pause. */
|
| - bool get isPaused => stream._isInputPaused;
|
| + /** Whether the subscription is active and paused. */
|
| + bool get isPaused => _subscription != null && _subscription._isInputPaused;
|
|
|
| - /** Whether there are currently any subscribers on this [Stream]. */
|
| - bool get hasListener => stream._hasListener;
|
| + /** Whether there are currently a subscriber on the [Stream]. */
|
| + bool get hasListener => _subscription != null;
|
|
|
| /**
|
| * Send or queue a data event.
|
| */
|
| - void add(T value) => stream._add(value);
|
| + void add(T value) {
|
| + if (isClosed) throw new StateError("Adding event after close");
|
| + if (_subscription != null) {
|
| + _subscription._add(value);
|
| + } else if (!_isCancelled) {
|
| + _addPendingEvent(new _DelayedData<T>(value));
|
| + }
|
| + }
|
|
|
| /**
|
| * Send or enqueue an error event.
|
| - *
|
| - * If a subscription has requested to be unsubscribed on errors,
|
| - * it will be unsubscribed after receiving this event.
|
| */
|
| void addError(Object error, [Object stackTrace]) {
|
| + if (isClosed) throw new StateError("Adding event after close");
|
| if (stackTrace != null) {
|
| // Force stack trace overwrite. Even if the error already contained
|
| // a stack trace.
|
| _attachStackTrace(error, stackTrace);
|
| }
|
| - stream._addError(error);
|
| + if (_subscription != null) {
|
| + _subscription._addError(error);
|
| + } else if (!_isCancelled) {
|
| + _addPendingEvent(new _DelayedError(error));
|
| + }
|
| }
|
|
|
| /**
|
| - * Send or enqueue a "done" message.
|
| + * Closes this controller.
|
| + *
|
| + * After closing, no further events may be added using [add] or [addError].
|
| *
|
| - * The "done" message should be sent at most once by a stream, and it
|
| - * should be the last message sent.
|
| + * You are allowed to close the controller more than once, but only the first
|
| + * call has any effect.
|
| + *
|
| + * The first time a controller is closed, a "done" event is sent to its
|
| + * stream.
|
| */
|
| - void close() { stream._close(); }
|
| + void close() {
|
| + if (isClosed) return;
|
| + _state |= _STATE_CLOSED;
|
| + if (_subscription != null) {
|
| + _subscription._close();
|
| + } else if (!_isCancelled) {
|
| + _addPendingEvent(const _DelayedDone());
|
| + }
|
| + }
|
| +
|
| + void _addPendingEvent(_DelayedEvent event) {
|
| + if (_isCancelled) return;
|
| + _StreamImplEvents events = _pendingEvents;
|
| + if (events == null) {
|
| + events = _pendingEvents = new _StreamImplEvents();
|
| + }
|
| + events.add(event);
|
| + }
|
| +
|
| + void _recordListen(_BufferingStreamSubscription subscription) {
|
| + assert(_subscription == null);
|
| + _subscription = subscription;
|
| + subscription._setPendingEvents(_pendingEvents);
|
| + _pendingEvents = null;
|
| + subscription._guardCallback(() {
|
| + _runGuarded(_onListen);
|
| + });
|
| + }
|
| +
|
| + void _recordCancel() {
|
| + _subscription = null;
|
| + _state |= _STATE_CANCELLED;
|
| + _runGuarded(_onCancel);
|
| + }
|
| +
|
| + void _recordPause() {
|
| + _runGuarded(_onPause);
|
| + }
|
| +
|
| + void _recordResume() {
|
| + _runGuarded(_onResume);
|
| + }
|
| }
|
|
|
| typedef void _NotificationHandler();
|
|
|
| -class _SingleControllerStream<T> extends _SingleStreamImpl<T> {
|
| - _NotificationHandler _onListen;
|
| - _NotificationHandler _onPause;
|
| - _NotificationHandler _onResume;
|
| - _NotificationHandler _onCancel;
|
| -
|
| - // TODO(floitsch): share this code with _MultiControllerStream.
|
| - _runGuarded(_NotificationHandler notificationHandler) {
|
| - if (notificationHandler == null) return;
|
| - try {
|
| - notificationHandler();
|
| - } catch (e, s) {
|
| - _throwDelayed(e, s);
|
| +void _runGuarded(_NotificationHandler notificationHandler) {
|
| + if (notificationHandler == null) return;
|
| + try {
|
| + notificationHandler();
|
| + } catch (e, s) {
|
| + _throwDelayed(e, s);
|
| + }
|
| +}
|
| +
|
| +class _ControllerStream<T> extends _StreamImpl<T> {
|
| + StreamController _controller;
|
| + bool _hasListener = false;
|
| +
|
| + _ControllerStream(this._controller);
|
| +
|
| + StreamSubscription<T> _createSubscription(
|
| + void onData(T data),
|
| + void onError(Object error),
|
| + void onDone(),
|
| + bool cancelOnError) {
|
| + if (_hasListener) {
|
| + throw new StateError("The stream has already been listened to.");
|
| }
|
| + _hasListener = true;
|
| + return new _ControllerSubscription<T>(
|
| + _controller, onData, onError, onDone, cancelOnError);
|
| + }
|
| +
|
| + void _onListen(_BufferingStreamSubscription subscription) {
|
| + _controller._recordListen(subscription);
|
| }
|
| +}
|
| +
|
| +class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
|
| + final StreamController _controller;
|
|
|
| - _SingleControllerStream(this._onListen,
|
| - this._onPause,
|
| - this._onResume,
|
| - this._onCancel);
|
| + _ControllerSubscription(StreamController controller,
|
| + void onData(T data),
|
| + void onError(Object error),
|
| + void onDone(),
|
| + bool cancelOnError)
|
| + : _controller = controller,
|
| + super(onData, onError, onDone, cancelOnError);
|
| +
|
| + void _onCancel() {
|
| + _controller._recordCancel();
|
| + }
|
|
|
| - void _onSubscriptionStateChange() {
|
| - _runGuarded(_hasListener ? _onListen : _onCancel);
|
| + void _onPause() {
|
| + _controller._recordPause();
|
| }
|
|
|
| - void _onPauseStateChange() {
|
| - _runGuarded(_isPaused ? _onPause : _onResume);
|
| + void _onResume() {
|
| + _controller._recordResume();
|
| }
|
| }
|
|
|