Chromium Code Reviews| Index: sdk/lib/async/broadcast_stream_controller.dart |
| diff --git a/sdk/lib/async/broadcast_stream_controller.dart b/sdk/lib/async/broadcast_stream_controller.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..5433851d08e51d5e63452345cf1c7d7c853b9d12 |
| --- /dev/null |
| +++ b/sdk/lib/async/broadcast_stream_controller.dart |
| @@ -0,0 +1,489 @@ |
| +// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +part of dart.async; |
| + |
| +class _BroadcastStream<T> extends _StreamImpl<T> { |
| + _BroadcastStreamController _controller; |
| + |
| + _BroadcastStream(this._controller); |
| + |
| + bool get isBroadcast => true; |
| + |
| + StreamSubscription<T> _createSubscription( |
| + void onData(T data), |
| + void onError(Object error), |
| + void onDone(), |
| + bool cancelOnError) => |
| + _controller._subscribe(onData, onError, onDone, cancelOnError); |
| +} |
| + |
| +abstract class _BroadcastSubscriptionLink { |
| + _BroadcastSubscriptionLink _next; |
| + _BroadcastSubscriptionLink _previous; |
| +} |
| + |
| +class _BroadcastSubscription<T> extends _ControllerSubscription<T> |
| + implements _BroadcastSubscriptionLink { |
| + static const int _STATE_EVENT_ID = 1; |
| + static const int _STATE_FIRING = 2; |
| + static const int _STATE_REMOVE_AFTER_FIRING = 4; |
| + // TODO(lrn): Use the _state field on _ControllerSubscription to |
| + // also store this state. Requires that the subscription implementation |
| + // does not assume that it's use of the state integer is the only use. |
| + int _eventState; |
| + |
| + _BroadcastSubscriptionLink _next; |
| + _BroadcastSubscriptionLink _previous; |
| + |
| + _BroadcastSubscription(_StreamControllerLifecycle controller, |
| + void onData(T data), |
| + void onError(Object error), |
| + void onDone(), |
| + bool cancelOnError) |
| + : super(controller, onData, onError, onDone, cancelOnError) { |
| + _next = _previous = this; |
| + } |
| + |
| + _BroadcastStreamController get _controller => super._controller; |
| + |
| + bool _expectsEvent(int eventId) { |
|
floitsch
2013/06/27 15:15:19
=> ?
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Will do.
Probably added the block to be able to d
|
| + return (_eventState & _STATE_EVENT_ID) == eventId; |
| + } |
| + |
| + void _toggleEventId() { |
|
floitsch
2013/06/27 15:15:19
=> ?
Lasse Reichstein Nielsen
2013/06/28 12:57:38
doesn't return a value.
|
| + _eventState ^= _STATE_EVENT_ID; |
| + } |
| + |
| + bool get _isFiring => (_eventState & _STATE_FIRING) != 0; |
| + |
| + bool _setRemoveAfterFiring() { |
| + assert(_isFiring); |
| + _eventState |= _STATE_REMOVE_AFTER_FIRING; |
| + } |
| + |
| + bool get _removeAfterFiring => |
|
floitsch
2013/06/27 15:15:19
_shouldRemoveAfterFiring
Lasse Reichstein Nielsen
2013/06/28 12:57:38
This is an imperative. It must be removed after fi
|
| + (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; |
| + |
| + void _onPause() { } |
| + |
| + void _onResume() { } |
| +} |
|
floitsch
2013/06/27 15:15:19
missing _onCancel() from the _ControllerSubscripti
Lasse Reichstein Nielsen
2013/06/28 12:57:38
It's inherited. We overwrite _onPause and _onResum
|
| + |
| + |
| +abstract class _BroadcastStreamController<T> |
| + implements StreamController<T>, |
| + _StreamControllerLifecycle<T>, |
| + _BroadcastSubscriptionLink, |
| + _EventSink<T>, |
| + _EventDispatch<T> { |
| + static const int _STATE_INITIAL = 0; |
| + static const int _STATE_EVENT_ID = 1; |
| + static const int _STATE_FIRING = 2; |
| + static const int _STATE_CLOSED = 4; |
| + static const int _STATE_ADDSTREAM = 8; |
| + |
| + final _NotificationHandler _onListen; |
| + final _NotificationHandler _onCancel; |
| + |
| + // State of the controller. |
| + int _state; |
| + |
| + // Double-linked list of active listeners. |
| + _BroadcastSubscriptionLink _next; |
| + _BroadcastSubscriptionLink _previous; |
| + |
| + // Extra state used during an [addStream] call. |
| + _AddStreamState<T> _addStreamState; |
| + |
| + /** |
| + * Future returned by [close] and [done]. |
| + * |
| + * The future is completed whenever the done event has been sent to all |
| + * relevant listeners. |
| + * This means when all listeners at the time when the done event was |
|
floitsch
2013/06/27 15:15:19
bad English sentence.
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Reworded.
|
| + * scheduled have been canceled (sending the done event makes them cancel, |
| + * but they can also be canceled before sending the event). |
| + * |
| + * To make this easier to handle, all listeners added after calling "close" |
| + * will never receive any events, so we don't remember them. That means that |
| + * this future can be completed whenever the controller [isClosed] and |
| + * [hasListener] is false. This is checked in [close] and [_callOnCancel]. |
| + */ |
| + _FutureImpl _doneFuture; |
| + |
| + _BroadcastStreamController(this._onListen, this._onCancel) |
| + : _state = _STATE_INITIAL { |
| + _next = _previous = this; |
| + } |
| + |
| + // StreamController interface. |
| + |
| + Stream<T> get stream => new _BroadcastStream<T>(this); |
| + |
| + StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); |
| + |
| + bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| + |
| + /** |
| + * A broadcast controller is never paused. |
| + * |
| + * Each receiving stream may be paused individually, and they handle their |
| + * own buffering. |
| + */ |
| + bool get isPaused => false; |
| + |
| + /** Whether there are currently one or more subscribers. */ |
| + bool get hasListener => !_isEmpty; |
| + |
| + /** Whether an event is being fired (sent to some, but not all, listeners). */ |
| + bool get _isFiring => (_state & _STATE_FIRING) != 0; |
| + |
| + bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; |
| + |
| + bool get _mayAddEvent => (_state < _STATE_CLOSED); |
| + |
| + _FutureImpl _ensureDoneFuture() { |
| + if (_doneFuture != null) return _doneFuture; |
| + return _doneFuture = new _FutureImpl(); |
| + } |
| + |
| + // Linked list helpers |
| + |
| + bool get _isEmpty => identical(_next, this); |
| + |
| + /** Adds subscription to linked list of active listeners. */ |
| + void _addListener(_BroadcastSubscription<T> subscription) { |
| + _BroadcastSubscriptionLink previous = _previous; |
| + previous._next = subscription; |
|
floitsch
2013/06/27 15:15:19
needs comments.
Either explain that you want to ac
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
|
| + _previous = subscription._previous; |
| + subscription._previous._next = this; |
| + subscription._previous = previous; |
| + subscription._eventState = (_state & _STATE_EVENT_ID); |
| + } |
| + |
| + void _removeListener(_BroadcastSubscription<T> subscription) { |
| + assert(identical(subscription._controller, this)); |
| + assert(!identical(subscription._next, subscription)); |
| + subscription._previous._next = subscription._next; |
|
floitsch
2013/06/27 15:15:19
please make this nicer to read:
var prev = sub.pre
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
|
| + subscription._next._previous = subscription._previous; |
| + subscription._next = subscription._previous = subscription; |
| + } |
| + |
| + // _StreamControllerLifecycle interface. |
| + |
| + StreamSubscription<T> _subscribe(void onData(T data), |
| + void onError(Object error), |
| + void onDone(), |
| + bool cancelOnError) { |
| + if (isClosed) { |
| + // No events will ever reach the new subscription, so we don't attach |
| + // it to anything. |
| + return new _DoneSubscription<T>(); |
|
floitsch
2013/06/27 15:15:19
Let's throw instead.
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
|
| + } |
| + StreamSubscription subscription = new _BroadcastSubscription<T>( |
| + this, onData, onError, onDone, cancelOnError); |
| + _addListener(subscription); |
| + if (identical(_next, _previous)) { |
| + // Only one listener, so it must be the first listener. |
| + _runGuarded(_onListen); |
| + } |
| + return subscription; |
| + } |
| + |
| + void _recordCancel(_BroadcastSubscription<T> subscription) { |
| + // If already removed by the stream, don't remove it again. |
| + if (identical(subscription._next, subscription)) return; |
| + assert(!identical(subscription._next, subscription)); |
| + if (subscription._isFiring) { |
| + subscription._setRemoveAfterFiring(); |
| + } else { |
| + assert(!identical(subscription._next, subscription)); |
| + _removeListener(subscription); |
| + // If we are currently firing an event, the empty-check is performed at |
| + // the end of the listener loop instead of here. |
| + if ((_state & _STATE_FIRING) == 0 && _isEmpty) { |
|
floitsch
2013/06/27 15:15:19
if (!_isFiring && _isEmpty)
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
|
| + _callOnCancel(); |
| + } |
| + } |
| + } |
| + |
| + void _recordPause(StreamSubscription<T> subscription) {} |
| + void _recordResume(StreamSubscription<T> subscription) {} |
| + |
| + // EventSink interface. |
| + |
| + Error _addEventError() { |
| + if (isClosed) { |
| + return new StateError("Cannot add new events after calling close"); |
| + } |
| + assert(_isAddingStream); |
| + return new StateError("Cannot add new events while doing an addStream"); |
| + } |
| + |
| + void add(T data) { |
| + if (!_mayAddEvent) throw _addEventError(); |
| + _sendData(data); |
| + } |
| + |
| + void addError(Object error, [Object stackTrace]) { |
| + if (!_mayAddEvent) throw _addEventError(); |
| + if (stackTrace != null) _attachStackTrace(error, stackTrace); |
| + _sendError(error); |
| + } |
| + |
| + Future close() { |
| + if (isClosed) { |
| + assert(_doneFuture != null); |
| + return _doneFuture; |
| + } |
| + if (!_mayAddEvent) throw _addEventError(); |
| + _state |= _STATE_CLOSED; |
| + Future doneFuture = _ensureDoneFuture(); |
| + _sendDone(); |
| + return doneFuture; |
| + } |
| + |
| + Future get done => _ensureDoneFuture(); |
| + |
| + Future addStream(Stream<T> stream) { |
| + if (!_mayAddEvent) throw _addEventError(); |
| + _state |= _STATE_ADDSTREAM; |
| + _addStreamState = new _AddStreamState(this, stream); |
| + return _addStreamState.addStreamFuture; |
| + } |
| + |
| + // _EventSink interface, called from AddStramState. |
|
floitsch
2013/06/27 15:15:19
AddStreamState
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Done.
|
| + void _add(T data) { |
| + _sendData(data); |
| + } |
| + |
| + void _addError(Object error) { |
| + assert(_isAddingStream); |
|
floitsch
2013/06/27 15:15:19
Why is an error fatal? Isn't it just passed throug
Lasse Reichstein Nielsen
2013/06/28 12:57:38
Let's pass it through, the controller can handle i
|
| + _AddStreamState addState = _addStreamState; |
| + _addStreamState = null; |
| + _state &= ~_STATE_ADDSTREAM; |
| + addState.completeWithError(error); |
| + } |
| + |
| + void _close() { |
| + assert(_isAddingStream); |
| + _AddStreamState addState = _addStreamState; |
| + _addStreamState = null; |
| + _state &= ~_STATE_ADDSTREAM; |
| + addState.complete(); |
| + } |
| + |
| + // Event handling. |
| + void _forEachListener( |
| + void action(_BufferingStreamSubscription<T> subscription)) { |
| + if (_isFiring) { |
| + throw new StateError( |
| + "Cannot fire new event. Controller is already firing an event"); |
| + } |
| + if (_isEmpty) return; |
| + |
| + // Get event id of this event. |
| + int id = (_state & _STATE_EVENT_ID); |
| + // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel] |
| + // callbacks while firing, and we prevent reentrancy of this function. |
| + // |
| + // Set [_state]'s event id to the next event's id. |
| + // Any listeners added while firing this event will expect the next event, |
| + // not this one, and won't get notified. |
| + _state ^= _STATE_EVENT_ID | _STATE_FIRING; |
| + _BroadcastSubscriptionLink link = _next; |
| + while (!identical(link, this)) { |
| + _BroadcastSubscription<T> subscription = link; |
| + if (subscription._expectsEvent(id)) { |
| + subscription._eventState |= _BroadcastSubscription._STATE_FIRING; |
| + action(subscription); |
| + subscription._toggleEventId(); |
| + link = subscription._next; |
| + if (subscription._removeAfterFiring) { |
| + _removeListener(subscription); |
| + } |
| + subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING; |
| + } else { |
| + link = subscription._next; |
| + } |
| + } |
| + _state &= ~_STATE_FIRING; |
| + |
| + if (_isEmpty) { |
| + _callOnCancel(); |
| + } |
| + } |
| + |
| + void _callOnCancel() { |
| + assert(_isEmpty); |
| + if (isClosed && _doneFuture._mayComplete) { |
| + // When closed, _doneFuture is not null. |
| + _doneFuture._asyncSetValue(null); |
| + } |
| + _runGuarded(_onCancel); |
| + } |
| +} |
| + |
| +class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
| + _SyncBroadcastStreamController(void onListen(), void onCancel()) |
| + : super(onListen, onCancel); |
| + |
| + // EventDispatch interface. |
| + |
| + void _sendData(T data) { |
| + if (_isEmpty) return; |
| + _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| + subscription._add(data); |
| + }); |
| + } |
| + |
| + void _sendError(Object error) { |
| + if (_isEmpty) return; |
| + _forEachListener((_BufferingStreamSubscription<T> subscription) { |
| + subscription._addError(error); |
| + }); |
| + } |
| + |
| + void _sendDone() { |
| + if (!_isEmpty) { |
| + _forEachListener((_BroadcastSubscription<T> subscription) { |
| + subscription._close(); |
| + }); |
| + } else { |
| + assert(_doneFuture != null); |
| + assert(_doneFuture._mayComplete); |
| + _doneFuture._asyncSetValue(null); |
| + } |
| + } |
| +} |
| + |
| +class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> { |
| + _AsyncBroadcastStreamController(void onListen(), void onCancel()) |
| + : super(onListen, onCancel); |
| + |
| + // EventDispatch interface. |
| + |
| + void _sendData(T data) { |
| + for (_BroadcastSubscriptionLink link = _next; |
| + !identical(link, this); |
| + link = link._next) { |
| + _BroadcastSubscription<T> subscription = link; |
| + subscription._addPending(new _DelayedData(data)); |
| + } |
| + } |
| + |
| + void _sendError(Object error) { |
| + for (_BroadcastSubscriptionLink link = _next; |
| + !identical(link, this); |
| + link = link._next) { |
| + _BroadcastSubscription<T> subscription = link; |
| + subscription._addPending(new _DelayedError(error)); |
| + } |
| + } |
| + |
| + void _sendDone() { |
| + if (!_isEmpty) { |
| + for (_BroadcastSubscriptionLink link = _next; |
| + !identical(link, this); |
| + link = link._next) { |
| + _BroadcastSubscription<T> subscription = link; |
| + subscription._addPending(const _DelayedDone()); |
| + } |
| + } else { |
| + assert(_doneFuture != null); |
| + assert(_doneFuture._mayComplete); |
| + _doneFuture._asyncSetValue(null); |
| + } |
| + } |
| +} |
| + |
| +/** |
| + * Stream controller that is used by [Stream.asBroadcastStream]. |
| + * |
| + * This stream controller allows incoming events while it is firing |
| + * other events. This is handled by delaying the events until the |
| + * current event is done firing, and then fire the pending events. |
| + * |
| + * This class extends [_SyncBroadcastStreamController]. Events of |
| + * an "asBroadcastStream" stream are always initiated by events |
| + * on another stream, and it is fine to forward them synchronously. |
| + */ |
| +class _AsBroadcastStreamController<T> |
| + extends _SyncBroadcastStreamController<T> |
| + implements _EventDispatch<T> { |
| + _StreamImplEvents _pending; |
| + |
| + _AsBroadcastStreamController(void onListen(), void onCancel()) |
| + : super(onListen, onCancel); |
| + |
| + bool get _hasPending => _pending != null && ! _pending.isEmpty; |
| + |
| + void _addPendingEvent(_DelayedEvent event) { |
| + if (_pending == null) { |
| + _pending = new _StreamImplEvents(); |
| + } |
| + _pending.add(event); |
| + } |
| + |
| + void add(T data) { |
| + if (!isClosed && _isFiring) { |
| + _addPendingEvent(new _DelayedData<T>(data)); |
| + return; |
| + } |
| + super.add(data); |
| + while (_hasPending) { |
| + _pending.handleNext(this); |
| + } |
| + } |
| + |
| + void addError(Object error, [StackTrace stackTrace]) { |
| + if (!isClosed && _isFiring) { |
| + _addPendingEvent(new _DelayedError(error)); |
| + return; |
| + } |
| + super.addError(error, stackTrace); |
| + while (_hasPending) { |
| + _pending.handleNext(this); |
| + } |
| + } |
| + |
| + void close() { |
| + if (!isClosed && _isFiring) { |
| + _addPendingEvent(const _DelayedDone()); |
| + _state |= _STATE_CLOSED; |
| + return; |
| + } |
| + super.close(); |
| + assert(!_hasPending); |
| + } |
| + |
| + void _callOnCancel() { |
| + if (_hasPending) { |
| + _pending.clear(); |
| + _pending = null; |
| + } |
| + super._callOnCancel(); |
| + } |
| +} |
| + |
| +// A subscription that never receives any events. |
| +// It can simulate pauses, but otherwise does nothing. |
| +class _DoneSubscription<T> implements StreamSubscription<T> { |
| + int _pauseCount = 0; |
| + void onData(void handleData(T data)) {} |
| + void onError(void handleErrr(Object error)) {} |
| + void onDone(void handleDone()) {} |
| + void pause([Future resumeSignal]) { |
| + if (resumeSignal != null) resumeSignal.then(_resume); |
| + _pauseCount++; |
| + } |
| + void resume() { _resume(null); } |
| + void _resume(_) { |
| + if (_pauseCount > 0) _pauseCount--; |
| + } |
| + void cancel() {} |
| + bool get isPaused => _pauseCount > 0; |
| + Future asFuture(Object value) => new _FutureImpl(); |
| +} |