Chromium Code Reviews| Index: sdk/lib/async/stream_controller.dart |
| diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart |
| index b057565063d99e65b75af216e42711cc74670f6f..030c5906f8f0f43599f019348738fa5f1bc3d830 100644 |
| --- a/sdk/lib/async/stream_controller.dart |
| +++ b/sdk/lib/async/stream_controller.dart |
| @@ -69,8 +69,11 @@ abstract class StreamController<T> implements EventSink<T> { |
| factory StreamController({void onListen(), |
| void onPause(), |
| void onResume(), |
| - void onCancel()}) |
| - => new _StreamControllerImpl<T>(onListen, onPause, onResume, onCancel); |
| + void onCancel(), |
| + bool sync: false}) |
| + => sync |
| + ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
| + : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
| /** |
| * A controller where [stream] can be listened to more than once. |
| @@ -92,8 +95,12 @@ abstract class StreamController<T> implements EventSink<T> { |
| * If a listener is added again later, after the [onCancel] was called, |
| * the [onListen] will be called again. |
| */ |
| - factory StreamController.broadcast({void onListen(), void onCancel()}) { |
| - return new _MultiplexStreamController<T>(onListen, onCancel); |
| + factory StreamController.broadcast({void onListen(), |
| + void onCancel(), |
| + bool sync: false}) { |
| + return sync |
| + ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
| + : new _AsyncBroadcastStreamController<T>(onListen, onCancel); |
| } |
| /** |
| @@ -147,8 +154,9 @@ abstract class _StreamControllerLifecycle<T> { |
| * |
| * Controls a stream that only supports a single controller. |
| */ |
| -class _StreamControllerImpl<T> implements StreamController<T>, |
| - _StreamControllerLifecycle<T> { |
| +abstract class _StreamController<T> implements StreamController<T>, |
| + _StreamControllerLifecycle<T>, |
| + _EventDispatch<T> { |
| static const int _STATE_OPEN = 0; |
| static const int _STATE_CANCELLED = 1; |
| static const int _STATE_CLOSED = 2; |
| @@ -168,10 +176,10 @@ class _StreamControllerImpl<T> implements StreamController<T>, |
| // Events added to the stream before it has an active subscription. |
| _PendingEvents _pendingEvents = null; |
| - _StreamControllerImpl(this._onListen, |
| - this._onPause, |
| - this._onResume, |
| - this._onCancel) { |
| + _StreamController(this._onListen, |
| + this._onPause, |
| + this._onResume, |
| + this._onCancel) { |
| _stream = new _ControllerStream<T>(this); |
| } |
| @@ -202,7 +210,7 @@ class _StreamControllerImpl<T> implements StreamController<T>, |
| void add(T value) { |
| if (isClosed) throw new StateError("Adding event after close"); |
| if (_subscription != null) { |
| - _subscription._add(value); |
| + _sendData(value); |
| } else if (!_isCancelled) { |
| _addPendingEvent(new _DelayedData<T>(value)); |
| } |
| @@ -219,7 +227,7 @@ class _StreamControllerImpl<T> implements StreamController<T>, |
| _attachStackTrace(error, stackTrace); |
| } |
| if (_subscription != null) { |
| - _subscription._addError(error); |
| + _sendError(error); |
| } else if (!_isCancelled) { |
| _addPendingEvent(new _DelayedError(error)); |
| } |
| @@ -240,12 +248,14 @@ class _StreamControllerImpl<T> implements StreamController<T>, |
| if (isClosed) return; |
| _state |= _STATE_CLOSED; |
| if (_subscription != null) { |
| - _subscription._close(); |
| + _sendDone(); |
| } else if (!_isCancelled) { |
| _addPendingEvent(const _DelayedDone()); |
| } |
| } |
| + // EventDispatch interface |
| + |
| void _addPendingEvent(_DelayedEvent event) { |
| if (_isCancelled) return; |
| _StreamImplEvents events = _pendingEvents; |
| @@ -281,6 +291,52 @@ class _StreamControllerImpl<T> implements StreamController<T>, |
| } |
| } |
| +class _SyncStreamController<T> extends _StreamController<T> { |
| + _SyncStreamController(void onListen(), |
| + void onPause(), |
| + void onResume(), |
| + void onCancel()) |
| + : super(onListen, onPause, onResume, onCancel); |
| + |
| + void _sendData(T data) { |
| + _subscription._add(data); |
| + } |
| + |
| + void _sendError(Object error) { |
| + _subscription._addError(error); |
| + } |
| + |
| + void _sendDone() { |
| + _subscription._close(); |
| + } |
| +} |
| + |
| +class _AsyncStreamController<T> extends _StreamController<T> { |
| + _AsyncStreamController(void onListen(), |
| + void onPause(), |
| + void onResume(), |
| + void onCancel()) |
| + : super(onListen, onPause, onResume, onCancel); |
| + |
| + void _sendData(T data) { |
| + runAsync(() { |
|
floitsch
2013/05/30 12:13:48
Try to use addPending instead.
Lasse Reichstein Nielsen
2013/05/31 05:51:59
Done.
|
| + if (_subscription == null) return; |
| + _subscription._add(data); |
| + }); |
| + } |
| + |
| + void _sendError(Object error) { |
| + runAsync(() { |
| + if (_subscription == null) return; |
| + _subscription._addError(error); |
| + }); |
| + } |
| + |
| + void _sendDone() { |
| + runAsync(_subscription._close); |
| + } |
| +} |
| + |
| typedef void _NotificationHandler(); |
| void _runGuarded(_NotificationHandler notificationHandler) { |
| @@ -339,10 +395,10 @@ class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| } |
| } |
| -class _MultiplexStream<T> extends _StreamImpl<T> { |
| - _MultiplexStreamController _controller; |
| +class _BroadcastStream<T> extends _StreamImpl<T> { |
| + _BroadcastStreamController _controller; |
| - _MultiplexStream(this._controller); |
| + _BroadcastStream(this._controller); |
| bool get isBroadcast => true; |
| @@ -351,7 +407,7 @@ class _MultiplexStream<T> extends _StreamImpl<T> { |
| void onError(Object error), |
| void onDone(), |
| bool cancelOnError) { |
| - return new _MultiplexSubscription<T>( |
| + return new _BroadcastSubscription<T>( |
| _controller, onData, onError, onDone, cancelOnError); |
| } |
| @@ -360,22 +416,22 @@ class _MultiplexStream<T> extends _StreamImpl<T> { |
| } |
| } |
| -abstract class _MultiplexSubscriptionLink { |
| - _MultiplexSubscriptionLink _next; |
| - _MultiplexSubscriptionLink _previous; |
| +abstract class _BroadcastSubscriptionLink { |
| + _BroadcastSubscriptionLink _next; |
| + _BroadcastSubscriptionLink _previous; |
| } |
| -class _MultiplexSubscription<T> extends _ControllerSubscription<T> |
| - implements _MultiplexSubscriptionLink { |
| +class _BroadcastSubscription<T> extends _ControllerSubscription<T> |
| + implements _BroadcastSubscriptionLink { |
| static const int _STATE_EVENT_ID = 1; |
| static const int _STATE_FIRING = 2; |
| static const int _STATE_REMOVE_AFTER_FIRING = 4; |
| int _eventState; |
| - _MultiplexSubscriptionLink _next; |
| - _MultiplexSubscriptionLink _previous; |
| + _BroadcastSubscriptionLink _next; |
| + _BroadcastSubscriptionLink _previous; |
| - _MultiplexSubscription(_StreamControllerLifecycle controller, |
| + _BroadcastSubscription(_StreamControllerLifecycle controller, |
| void onData(T data), |
| void onError(Object error), |
| void onDone(), |
| @@ -384,7 +440,7 @@ class _MultiplexSubscription<T> extends _ControllerSubscription<T> |
| _next = _previous = this; |
| } |
| - _MultiplexStreamController get _controller => super._controller; |
| + _BroadcastStreamController get _controller => super._controller; |
| bool _expectsEvent(int eventId) { |
| return (_eventState & _STATE_EVENT_ID) == eventId; |
| @@ -406,9 +462,11 @@ class _MultiplexSubscription<T> extends _ControllerSubscription<T> |
| } |
| -class _MultiplexStreamController<T> implements StreamController<T>, |
| - _StreamControllerLifecycle<T>, |
| - _MultiplexSubscriptionLink { |
| +abstract class _BroadcastStreamController<T> |
| + implements StreamController<T>, |
| + _StreamControllerLifecycle<T>, |
| + _BroadcastSubscriptionLink, |
| + _EventDispatch<T> { |
| static const int _STATE_INITIAL = 0; |
| static const int _STATE_EVENT_ID = 1; |
| static const int _STATE_FIRING = 2; |
| @@ -421,24 +479,24 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
| int _state; |
| // Double-linked list of active listeners. |
| - _MultiplexSubscriptionLink _next; |
| - _MultiplexSubscriptionLink _previous; |
| + _BroadcastSubscriptionLink _next; |
| + _BroadcastSubscriptionLink _previous; |
| - _MultiplexStreamController(this._onListen, this._onCancel) |
| + _BroadcastStreamController(this._onListen, this._onCancel) |
| : _state = _STATE_INITIAL { |
| _next = _previous = this; |
| } |
| // StreamController interface. |
| - Stream<T> get stream => new _MultiplexStream<T>(this); |
| + Stream<T> get stream => new _BroadcastStream<T>(this); |
| EventSink<T> get sink => new _EventSinkView<T>(this); |
| bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| /** |
| - * A multiplex controller is never paused. |
| + * A broadcast controller is never paused. |
| * |
| * Each receiving stream may be paused individually, and they handle their |
| * own buffering. |
| @@ -456,8 +514,8 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
| bool get _isEmpty => identical(_next, this); |
| /** Adds subscription to linked list of active listeners. */ |
| - void _addListener(_MultiplexSubscription<T> subscription) { |
| - _MultiplexSubscriptionLink previous = _previous; |
| + void _addListener(_BroadcastSubscription<T> subscription) { |
| + _BroadcastSubscriptionLink previous = _previous; |
| previous._next = subscription; |
| _previous = subscription._previous; |
| subscription._previous._next = this; |
| @@ -465,7 +523,7 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
| subscription._eventState = (_state & _STATE_EVENT_ID); |
| } |
| - void _removeListener(_MultiplexSubscription<T> subscription) { |
| + void _removeListener(_BroadcastSubscription<T> subscription) { |
| assert(identical(subscription._controller, this)); |
| assert(!identical(subscription._next, subscription)); |
| subscription._previous._next = subscription._next; |
| @@ -475,7 +533,7 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
| // _StreamControllerLifecycle interface. |
| - void _recordListen(_MultiplexSubscription<T> subscription) { |
| + void _recordListen(_BroadcastSubscription<T> subscription) { |
| _addListener(subscription); |
| if (identical(_next, _previous)) { |
| // Only one listener, so it must be the first listener. |
| @@ -483,7 +541,7 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
| } |
| } |
| - void _recordCancel(_MultiplexSubscription<T> subscription) { |
| + void _recordCancel(_BroadcastSubscription<T> subscription) { |
| if (subscription._isFiring) { |
| subscription._setRemoveAfterFiring(); |
| } else { |
| @@ -524,31 +582,6 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
| _sendDone(); |
| } |
| - // EventDispatch interface. |
| - |
| - void _sendData(T data) { |
| - if (_isEmpty) return; |
| - _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| - subscription._add(data); |
| - }); |
| - } |
| - |
| - void _sendError(Object error) { |
| - if (_isEmpty) return; |
| - _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| - subscription._addError(error); |
| - }); |
| - } |
| - |
| - void _sendDone() { |
| - if (_isEmpty) return; |
| - _forEachListener((_MultiplexSubscription<T> subscription) { |
| - subscription._close(); |
| - subscription._eventState |= |
| - _MultiplexSubscription._STATE_REMOVE_AFTER_FIRING; |
| - }); |
| - } |
| - |
| void _forEachListener( |
| void action(_BufferingStreamSubscription<T> subscription)) { |
| if (_isFiring) { |
| @@ -566,18 +599,18 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
| // Any listeners added while firing this event will expect the next event, |
| // not this one, and won't get notified. |
| _state ^= _STATE_EVENT_ID | _STATE_FIRING; |
| - _MultiplexSubscriptionLink link = _next; |
| + _BroadcastSubscriptionLink link = _next; |
| while (!identical(link, this)) { |
| - _MultiplexSubscription<T> subscription = link; |
| + _BroadcastSubscription<T> subscription = link; |
| if (subscription._expectsEvent(id)) { |
| - subscription._eventState |= _MultiplexSubscription._STATE_FIRING; |
| + subscription._eventState |= _BroadcastSubscription._STATE_FIRING; |
| action(subscription); |
| subscription._toggleEventId(); |
| link = subscription._next; |
| if (subscription._removeAfterFiring) { |
| _removeListener(subscription); |
| } |
| - subscription._eventState &= ~_MultiplexSubscription._STATE_FIRING; |
| + subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; |
| } else { |
| link = subscription._next; |
| } |
| @@ -594,12 +627,89 @@ class _MultiplexStreamController<T> implements StreamController<T>, |
| } |
| } |
| -class _BufferingMultiplexStreamController<T> |
| - extends _MultiplexStreamController<T> |
| +class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
| + _SyncBroadcastStreamController(void onListen(), void onCancel()) |
| + : super(onListen, onCancel); |
| + |
| + // EventDispatch interface. |
| + |
| + void _sendData(T data) { |
| + if (_isEmpty) return; |
| + _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| + subscription._add(data); |
| + }); |
| + } |
| + |
| + void _sendError(Object error) { |
| + if (_isEmpty) return; |
| + _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| + subscription._addError(error); |
| + }); |
| + } |
| + |
| + void _sendDone() { |
| + if (_isEmpty) return; |
| + _forEachListener((_BroadcastSubscription<T> subscription) { |
| + subscription._close(); |
| + subscription._eventState |= |
| + _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING; |
| + }); |
| + } |
| +} |
| + |
| +class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
| + _AsyncBroadcastStreamController(void onListen(), void onCancel()) |
| + : super(onListen, onCancel); |
| + |
| + // EventDispatch interface. |
| + |
| + void _sendData(T data) { |
| + runAsync(() { |
| + if (_isEmpty) return; |
| + _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| + subscription._add(data); |
| + }); |
| + }); |
| + } |
| + |
| + void _sendError(Object error) { |
| + runAsync(() { |
| + if (_isEmpty) return; |
| + _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| + subscription._addError(error); |
| + }); |
| + }); |
| + } |
| + |
| + void _sendDone() { |
| + runAsync(() { |
| + if (_isEmpty) return; |
| + _forEachListener((_BroadcastSubscription<T> subscription) { |
| + subscription._close(); |
| + subscription._eventState |= |
| + _BroadcastSubscription._STATE_REMOVE_AFTER_FIRING; |
| + }); |
| + }); |
| + } |
| +} |
| + |
| +/** |
| + * Stream controller that is used by [Stream.asBroadcastStream]. |
| + * |
| + * This stream controller allows incoming events while it is firing |
| + * other events. This is handled by delaying the events until the |
| + * current event is done firing, and then fire the pending events. |
| + * |
| + * This class extends [_SyncBroadcastStreamController]. Events of |
| + * an "asBroadcastStream" stream are always initiated by events |
| + * on another stream, and it is fine to forward them synchronously. |
| + */ |
| +class _AsBroadcastStreamController<T> |
| + extends _SyncBroadcastStreamController<T> |
| implements _EventDispatch<T> { |
| _StreamImplEvents _pending; |
| - _BufferingMultiplexStreamController(void onListen(), void onCancel()) |
| + _AsBroadcastStreamController(void onListen(), void onCancel()) |
| : super(onListen, onCancel); |
| bool get _hasPending => _pending != null && ! _pending.isEmpty; |
| @@ -649,6 +759,5 @@ class _BufferingMultiplexStreamController<T> |
| _pending = null; |
| } |
| super._callOnCancel(); |
| - |
| } |
| } |