| Index: sdk/lib/async/stream_controller.dart
|
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart
|
| index b057565063d99e65b75af216e42711cc74670f6f..abfd6485859eb1872c5a52f4413f3df712d2d73e 100644
|
| --- a/sdk/lib/async/stream_controller.dart
|
| +++ b/sdk/lib/async/stream_controller.dart
|
| @@ -53,6 +53,11 @@ abstract class StreamController<T> implements EventSink<T> {
|
| /**
|
| * A controller with a [stream] that supports only one single subscriber.
|
| *
|
| + * If [sync] is true, events may be passed directly to the stream's listener
|
| + * during an [add], [addError] or [close] call. If [sync] is false, the event
|
| + * will be passed to the listener at a later time, after the code creating
|
| + * the event has returned.
|
| + *
|
| * The controller will buffer all incoming events until the subscriber is
|
| * registered.
|
| *
|
| @@ -69,8 +74,11 @@ abstract class StreamController<T> implements EventSink<T> {
|
| factory StreamController({void onListen(),
|
| void onPause(),
|
| void onResume(),
|
| - void onCancel()})
|
| - => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel);
|
| + void onCancel(),
|
| + bool sync: false})
|
| + => sync
|
| + ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
|
| + : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
|
|
|
| /**
|
| * A controller where [stream] can be listened to more than once.
|
| @@ -83,17 +91,35 @@ abstract class StreamController<T> implements EventSink<T> {
|
| * It is not allowed to call [add], [addError], or [close] before a previous
|
| * call has returned.
|
| *
|
| + * If [sync] is true, events may be passed directly to the stream's listener
|
| + * during an [add], [addError] or [close] call. If [sync] is false, the event
|
| + * will be passed to the listener at a later time, after the code creating
|
| + * the event has returned.
|
| + *
|
| * Each listener is handled independently, and if they pause, only the pausing
|
| * listener is affected. A paused listener will buffer events internally until
|
| * unpaused or canceled.
|
| *
|
| + * If [sync] is false, no guarantees are given with regard to when
|
| + * multiple listeners get the events, except that each listener will get
|
| + * all events in the correct order. If two events are sent on an async
|
| + * controller with two listeners, one of the listeners may get both events
|
| + * before the other listener gets any.
|
| + * A listener must be subscribed both when the event is initiated (that is,
|
| + * when [add] is called) and when the event is later delivered, in order to
|
| + * get the event.
|
| + *
|
| * The [onListen] callback is called when the first listener is subscribed,
|
| * and the [onCancel] is called when there are no longer any active listeners.
|
| * If a listener is added again later, after the [onCancel] was called,
|
| * the [onListen] will be called again.
|
| */
|
| - factory StreamController.broadcast({void onListen(), void onCancel()}) {
|
| - return new _MultiplexStreamController<T>(onListen, onCancel);
|
| + factory StreamController.broadcast({void onListen(),
|
| + void onCancel(),
|
| + bool sync: false}) {
|
| + return sync
|
| + ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
|
| + : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
|
| }
|
|
|
| /**
|
| @@ -147,8 +173,9 @@ abstract class _StreamControllerLifecycle<T> {
|
| *
|
| * Controls a stream that only supports a single controller.
|
| */
|
| -class _StreamControllerImpl<T> implements StreamController<T>,
|
| - _StreamControllerLifecycle<T> {
|
| +abstract class _StreamController<T> implements StreamController<T>,
|
| + _StreamControllerLifecycle<T>,
|
| + _EventDispatch<T> {
|
| static const int _STATE_OPEN = 0;
|
| static const int _STATE_CANCELLED = 1;
|
| static const int _STATE_CLOSED = 2;
|
| @@ -168,10 +195,10 @@ class _StreamControllerImpl<T> implements StreamController<T>,
|
| // Events added to the stream before it has an active subscription.
|
| _PendingEvents _pendingEvents = null;
|
|
|
| - _StreamControllerImpl(this._onListen,
|
| - this._onPause,
|
| - this._onResume,
|
| - this._onCancel) {
|
| + _StreamController(this._onListen,
|
| + this._onPause,
|
| + this._onResume,
|
| + this._onCancel) {
|
| _stream = new _ControllerStream<T>(this);
|
| }
|
|
|
| @@ -202,7 +229,7 @@ class _StreamControllerImpl<T> implements StreamController<T>,
|
| void add(T value) {
|
| if (isClosed) throw new StateError("Adding event after close");
|
| if (_subscription != null) {
|
| - _subscription._add(value);
|
| + _sendData(value);
|
| } else if (!_isCancelled) {
|
| _addPendingEvent(new _DelayedData<T>(value));
|
| }
|
| @@ -219,7 +246,7 @@ class _StreamControllerImpl<T> implements StreamController<T>,
|
| _attachStackTrace(error, stackTrace);
|
| }
|
| if (_subscription != null) {
|
| - _subscription._addError(error);
|
| + _sendError(error);
|
| } else if (!_isCancelled) {
|
| _addPendingEvent(new _DelayedError(error));
|
| }
|
| @@ -240,12 +267,14 @@ class _StreamControllerImpl<T> implements StreamController<T>,
|
| if (isClosed) return;
|
| _state |= _STATE_CLOSED;
|
| if (_subscription != null) {
|
| - _subscription._close();
|
| + _sendDone();
|
| } else if (!_isCancelled) {
|
| _addPendingEvent(const _DelayedDone());
|
| }
|
| }
|
|
|
| + // EventDispatch interface
|
| +
|
| void _addPendingEvent(_DelayedEvent event) {
|
| if (_isCancelled) return;
|
| _StreamImplEvents events = _pendingEvents;
|
| @@ -281,6 +310,46 @@ class _StreamControllerImpl<T> implements StreamController<T>,
|
| }
|
| }
|
|
|
| +class _SyncStreamController<T> extends _StreamController<T> {
|
| + _SyncStreamController(void onListen(),
|
| + void onPause(),
|
| + void onResume(),
|
| + void onCancel())
|
| + : super(onListen, onPause, onResume, onCancel);
|
| +
|
| + void _sendData(T data) {
|
| + _subscription._add(data);
|
| + }
|
| +
|
| + void _sendError(Object error) {
|
| + _subscription._addError(error);
|
| + }
|
| +
|
| + void _sendDone() {
|
| + _subscription._close();
|
| + }
|
| +}
|
| +
|
| +class _AsyncStreamController<T> extends _StreamController<T> {
|
| + _AsyncStreamController(void onListen(),
|
| + void onPause(),
|
| + void onResume(),
|
| + void onCancel())
|
| + : super(onListen, onPause, onResume, onCancel);
|
| +
|
| + void _sendData(T data) {
|
| + _subscription._addPending(new _DelayedData(data));
|
| + }
|
| +
|
| + void _sendError(Object error) {
|
| + _subscription._addPending(new _DelayedError(error));
|
| + }
|
| +
|
| + void _sendDone() {
|
| + _subscription._addPending(const _DelayedDone());
|
| + }
|
| +}
|
| +
|
| typedef void _NotificationHandler();
|
|
|
| void _runGuarded(_NotificationHandler notificationHandler) {
|
| @@ -339,10 +408,10 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
|
| }
|
| }
|
|
|
| -class _MultiplexStream<T> extends _StreamImpl<T> {
|
| - _MultiplexStreamController _controller;
|
| +class _BroadcastStream<T> extends _StreamImpl<T> {
|
| + _BroadcastStreamController _controller;
|
|
|
| - _MultiplexStream(this._controller);
|
| + _BroadcastStream(this._controller);
|
|
|
| bool get isBroadcast => true;
|
|
|
| @@ -351,7 +420,7 @@ class _MultiplexStream<T> extends _StreamImpl<T> {
|
| void onError(Object error),
|
| void onDone(),
|
| bool cancelOnError) {
|
| - return new _MultiplexSubscription<T>(
|
| + return new _BroadcastSubscription<T>(
|
| _controller, onData, onError, onDone, cancelOnError);
|
| }
|
|
|
| @@ -360,22 +429,22 @@ class _MultiplexStream<T> extends _StreamImpl<T> {
|
| }
|
| }
|
|
|
| -abstract class _MultiplexSubscriptionLink {
|
| - _MultiplexSubscriptionLink _next;
|
| - _MultiplexSubscriptionLink _previous;
|
| +abstract class _BroadcastSubscriptionLink {
|
| + _BroadcastSubscriptionLink _next;
|
| + _BroadcastSubscriptionLink _previous;
|
| }
|
|
|
| -class _MultiplexSubscription<T> extends _ControllerSubscription<T>
|
| - implements _MultiplexSubscriptionLink {
|
| +class _BroadcastSubscription<T> extends _ControllerSubscription<T>
|
| + implements _BroadcastSubscriptionLink {
|
| static const int _STATE_EVENT_ID = 1;
|
| static const int _STATE_FIRING = 2;
|
| static const int _STATE_REMOVE_AFTER_FIRING = 4;
|
| int _eventState;
|
|
|
| - _MultiplexSubscriptionLink _next;
|
| - _MultiplexSubscriptionLink _previous;
|
| + _BroadcastSubscriptionLink _next;
|
| + _BroadcastSubscriptionLink _previous;
|
|
|
| - _MultiplexSubscription(_StreamControllerLifecycle controller,
|
| + _BroadcastSubscription(_StreamControllerLifecycle controller,
|
| void onData(T data),
|
| void onError(Object error),
|
| void onDone(),
|
| @@ -384,7 +453,7 @@ class _MultiplexSubscription<T> extends _ControllerSubscription<T>
|
| _next = _previous = this;
|
| }
|
|
|
| - _MultiplexStreamController get _controller => super._controller;
|
| + _BroadcastStreamController get _controller => super._controller;
|
|
|
| bool _expectsEvent(int eventId) {
|
| return (_eventState & _STATE_EVENT_ID) == eventId;
|
| @@ -406,9 +475,11 @@ class _MultiplexSubscription<T> extends _ControllerSubscription<T>
|
| }
|
|
|
|
|
| -class _MultiplexStreamController<T> implements StreamController<T>,
|
| - _StreamControllerLifecycle<T>,
|
| - _MultiplexSubscriptionLink {
|
| +abstract class _BroadcastStreamController<T>
|
| + implements StreamController<T>,
|
| + _StreamControllerLifecycle<T>,
|
| + _BroadcastSubscriptionLink,
|
| + _EventDispatch<T> {
|
| static const int _STATE_INITIAL = 0;
|
| static const int _STATE_EVENT_ID = 1;
|
| static const int _STATE_FIRING = 2;
|
| @@ -421,24 +492,24 @@ class _MultiplexStreamController<T> implements StreamController<T>,
|
| int _state;
|
|
|
| // Double-linked list of active listeners.
|
| - _MultiplexSubscriptionLink _next;
|
| - _MultiplexSubscriptionLink _previous;
|
| + _BroadcastSubscriptionLink _next;
|
| + _BroadcastSubscriptionLink _previous;
|
|
|
| - _MultiplexStreamController(this._onListen, this._onCancel)
|
| + _BroadcastStreamController(this._onListen, this._onCancel)
|
| : _state = _STATE_INITIAL {
|
| _next = _previous = this;
|
| }
|
|
|
| // StreamController interface.
|
|
|
| - Stream<T> get stream => new _MultiplexStream<T>(this);
|
| + Stream<T> get stream => new _BroadcastStream<T>(this);
|
|
|
| EventSink<T> get sink => new _EventSinkView<T>(this);
|
|
|
| bool get isClosed => (_state & _STATE_CLOSED) != 0;
|
|
|
| /**
|
| - * A multiplex controller is never paused.
|
| + * A broadcast controller is never paused.
|
| *
|
| * Each receiving stream may be paused individually, and they handle their
|
| * own buffering.
|
| @@ -456,8 +527,8 @@ class _MultiplexStreamController<T> implements StreamController<T>,
|
| bool get _isEmpty => identical(_next, this);
|
|
|
| /** Adds subscription to linked list of active listeners. */
|
| - void _addListener(_MultiplexSubscription<T> subscription) {
|
| - _MultiplexSubscriptionLink previous = _previous;
|
| + void _addListener(_BroadcastSubscription<T> subscription) {
|
| + _BroadcastSubscriptionLink previous = _previous;
|
| previous._next = subscription;
|
| _previous = subscription._previous;
|
| subscription._previous._next = this;
|
| @@ -465,7 +536,7 @@ class _MultiplexStreamController<T> implements StreamController<T>,
|
| subscription._eventState = (_state & _STATE_EVENT_ID);
|
| }
|
|
|
| - void _removeListener(_MultiplexSubscription<T> subscription) {
|
| + void _removeListener(_BroadcastSubscription<T> subscription) {
|
| assert(identical(subscription._controller, this));
|
| assert(!identical(subscription._next, subscription));
|
| subscription._previous._next = subscription._next;
|
| @@ -475,7 +546,7 @@ class _MultiplexStreamController<T> implements StreamController<T>,
|
|
|
| // _StreamControllerLifecycle interface.
|
|
|
| - void _recordListen(_MultiplexSubscription<T> subscription) {
|
| + void _recordListen(_BroadcastSubscription<T> subscription) {
|
| _addListener(subscription);
|
| if (identical(_next, _previous)) {
|
| // Only one listener, so it must be the first listener.
|
| @@ -483,7 +554,7 @@ class _MultiplexStreamController<T> implements StreamController<T>,
|
| }
|
| }
|
|
|
| - void _recordCancel(_MultiplexSubscription<T> subscription) {
|
| + void _recordCancel(_BroadcastSubscription<T> subscription) {
|
| if (subscription._isFiring) {
|
| subscription._setRemoveAfterFiring();
|
| } else {
|
| @@ -524,31 +595,6 @@ class _MultiplexStreamController<T> implements StreamController<T>,
|
| _sendDone();
|
| }
|
|
|
| - // EventDispatch interface.
|
| -
|
| - void _sendData(T data) {
|
| - if (_isEmpty) return;
|
| - _forEachListener((_BufferingStreamSubscription<T> subscription) {
|
| - subscription._add(data);
|
| - });
|
| - }
|
| -
|
| - void _sendError(Object error) {
|
| - if (_isEmpty) return;
|
| - _forEachListener((_BufferingStreamSubscription<T> subscription) {
|
| - subscription._addError(error);
|
| - });
|
| - }
|
| -
|
| - void _sendDone() {
|
| - if (_isEmpty) return;
|
| - _forEachListener((_MultiplexSubscription<T> subscription) {
|
| - subscription._close();
|
| - subscription._eventState |=
|
| - _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING;
|
| - });
|
| - }
|
| -
|
| void _forEachListener(
|
| void action(_BufferingStreamSubscription<T> subscription)) {
|
| if (_isFiring) {
|
| @@ -566,18 +612,18 @@ class _MultiplexStreamController<T> implements StreamController<T>,
|
| // Any listeners added while firing this event will expect the next event,
|
| // not this one, and won't get notified.
|
| _state ^= _STATE_EVENT_ID | _STATE_FIRING;
|
| - _MultiplexSubscriptionLink link = _next;
|
| + _BroadcastSubscriptionLink link = _next;
|
| while (!identical(link, this)) {
|
| - _MultiplexSubscription<T> subscription = link;
|
| + _BroadcastSubscription<T> subscription = link;
|
| if (subscription._expectsEvent(id)) {
|
| - subscription._eventState |= _MultiplexSubscription._STATE_FIRING;
|
| + subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
|
| action(subscription);
|
| subscription._toggleEventId();
|
| link = subscription._next;
|
| if (subscription._removeAfterFiring) {
|
| _removeListener(subscription);
|
| }
|
| - subscription._eventState &= ~_MultiplexSubscription._STATE_FIRING;
|
| + subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
|
| } else {
|
| link = subscription._next;
|
| }
|
| @@ -594,12 +640,87 @@ class _MultiplexStreamController<T> implements StreamController<T>,
|
| }
|
| }
|
|
|
| -class _BufferingMultiplexStreamController<T>
|
| - extends _MultiplexStreamController<T>
|
| +class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
|
| + _SyncBroadcastStreamController(void onListen(), void onCancel())
|
| + : super(onListen, onCancel);
|
| +
|
| + // EventDispatch interface.
|
| +
|
| + void _sendData(T data) {
|
| + if (_isEmpty) return;
|
| + _forEachListener((_BufferingStreamSubscription<T> subscription) {
|
| + subscription._add(data);
|
| + });
|
| + }
|
| +
|
| + void _sendError(Object error) {
|
| + if (_isEmpty) return;
|
| + _forEachListener((_BufferingStreamSubscription<T> subscription) {
|
| + subscription._addError(error);
|
| + });
|
| + }
|
| +
|
| + void _sendDone() {
|
| + if (_isEmpty) return;
|
| + _forEachListener((_BroadcastSubscription<T> subscription) {
|
| + subscription._close();
|
| + subscription._eventState |=
|
| + _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING;
|
| + });
|
| + }
|
| +}
|
| +
|
| +class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
|
| + _AsyncBroadcastStreamController(void onListen(), void onCancel())
|
| + : super(onListen, onCancel);
|
| +
|
| + // EventDispatch interface.
|
| +
|
| + void _sendData(T data) {
|
| + for (_BroadcastSubscriptionLink link = _next;
|
| + !identical(link, this);
|
| + link = link._next) {
|
| + _BroadcastSubscription<T> subscription = link;
|
| + subscription._addPending(new _DelayedData(data));
|
| + }
|
| + }
|
| +
|
| + void _sendError(Object error) {
|
| + for (_BroadcastSubscriptionLink link = _next;
|
| + !identical(link, this);
|
| + link = link._next) {
|
| + _BroadcastSubscription<T> subscription = link;
|
| + subscription._addPending(new _DelayedError(error));
|
| + }
|
| + }
|
| +
|
| + void _sendDone() {
|
| + for (_BroadcastSubscriptionLink link = _next;
|
| + !identical(link, this);
|
| + link = link._next) {
|
| + _BroadcastSubscription<T> subscription = link;
|
| + subscription._addPending(const _DelayedDone());
|
| + }
|
| + }
|
| +}
|
| +
|
| +/**
|
| + * Stream controller that is used by [Stream.asBroadcastStream].
|
| + *
|
| + * This stream controller allows incoming events while it is firing
|
| + * other events. This is handled by delaying the events until the
|
| + * current event is done firing, and then fire the pending events.
|
| + *
|
| + * This class extends [_SyncBroadcastStreamController]. Events of
|
| + * an "asBroadcastStream" stream are always initiated by events
|
| + * on another stream, and it is fine to forward them synchronously.
|
| + */
|
| +class _AsBroadcastStreamController<T>
|
| + extends _SyncBroadcastStreamController<T>
|
| implements _EventDispatch<T> {
|
| _StreamImplEvents _pending;
|
|
|
| - _BufferingMultiplexStreamController(void onListen(), void onCancel())
|
| + _AsBroadcastStreamController(void onListen(), void onCancel())
|
| : super(onListen, onCancel);
|
|
|
| bool get _hasPending => _pending != null && ! _pending.isEmpty;
|
| @@ -649,6 +770,5 @@ class _BufferingMultiplexStreamController<T>
|
| _pending = null;
|
| }
|
| super._callOnCancel();
|
| -
|
| }
|
| }
|
|
|