| Index: sdk/lib/async/broadcast_stream_controller.dart
|
| diff --git a/sdk/lib/async/broadcast_stream_controller.dart b/sdk/lib/async/broadcast_stream_controller.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..8f4b019b315fbd8a3c91cb3fb1b52c4850133cb1
|
| --- /dev/null
|
| +++ b/sdk/lib/async/broadcast_stream_controller.dart
|
| @@ -0,0 +1,492 @@
|
| +// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +part of dart.async;
|
| +
|
| +class _BroadcastStream<T> extends _StreamImpl<T> {
|
| + _BroadcastStreamController _controller;
|
| +
|
| + _BroadcastStream(this._controller);
|
| +
|
| + bool get isBroadcast => true;
|
| +
|
| + StreamSubscription<T> _createSubscription(
|
| + void onData(T data),
|
| + void onError(Object error),
|
| + void onDone(),
|
| + bool cancelOnError) =>
|
| + _controller._subscribe(onData, onError, onDone, cancelOnError);
|
| +}
|
| +
|
| +abstract class _BroadcastSubscriptionLink {
|
| + _BroadcastSubscriptionLink _next;
|
| + _BroadcastSubscriptionLink _previous;
|
| +}
|
| +
|
| +class _BroadcastSubscription<T> extends _ControllerSubscription<T>
|
| + implements _BroadcastSubscriptionLink {
|
| + static const int _STATE_EVENT_ID = 1;
|
| + static const int _STATE_FIRING = 2;
|
| + static const int _STATE_REMOVE_AFTER_FIRING = 4;
|
| + // TODO(lrn): Use the _state field on _ControllerSubscription to
|
| + // also store this state. Requires that the subscription implementation
|
| + // does not assume that it's use of the state integer is the only use.
|
| + int _eventState;
|
| +
|
| + _BroadcastSubscriptionLink _next;
|
| + _BroadcastSubscriptionLink _previous;
|
| +
|
| + _BroadcastSubscription(_StreamControllerLifecycle controller,
|
| + void onData(T data),
|
| + void onError(Object error),
|
| + void onDone(),
|
| + bool cancelOnError)
|
| + : super(controller, onData, onError, onDone, cancelOnError) {
|
| + _next = _previous = this;
|
| + }
|
| +
|
| + _BroadcastStreamController get _controller => super._controller;
|
| +
|
| + bool _expectsEvent(int eventId) =>
|
| + (_eventState & _STATE_EVENT_ID) == eventId;
|
| +
|
| +
|
| + void _toggleEventId() {
|
| + _eventState ^= _STATE_EVENT_ID;
|
| + }
|
| +
|
| + bool get _isFiring => (_eventState & _STATE_FIRING) != 0;
|
| +
|
| + bool _setRemoveAfterFiring() {
|
| + assert(_isFiring);
|
| + _eventState |= _STATE_REMOVE_AFTER_FIRING;
|
| + }
|
| +
|
| + bool get _removeAfterFiring =>
|
| + (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0;
|
| +
|
| + // The controller._recordPause doesn't do anything for a broadcast controller,
|
| + // so we don't bother calling it.
|
| + void _onPause() { }
|
| +
|
| + // The controller._recordResume doesn't do anything for a broadcast
|
| + // controller, so we don't bother calling it.
|
| + void _onResume() { }
|
| +
|
| + // _onCancel is inherited.
|
| +}
|
| +
|
| +
|
| +abstract class _BroadcastStreamController<T>
|
| + implements StreamController<T>,
|
| + _StreamControllerLifecycle<T>,
|
| + _BroadcastSubscriptionLink,
|
| + _EventSink<T>,
|
| + _EventDispatch<T> {
|
| + static const int _STATE_INITIAL = 0;
|
| + static const int _STATE_EVENT_ID = 1;
|
| + static const int _STATE_FIRING = 2;
|
| + static const int _STATE_CLOSED = 4;
|
| + static const int _STATE_ADDSTREAM = 8;
|
| +
|
| + final _NotificationHandler _onListen;
|
| + final _NotificationHandler _onCancel;
|
| +
|
| + // State of the controller.
|
| + int _state;
|
| +
|
| + // Double-linked list of active listeners.
|
| + _BroadcastSubscriptionLink _next;
|
| + _BroadcastSubscriptionLink _previous;
|
| +
|
| + // Extra state used during an [addStream] call.
|
| + _AddStreamState<T> _addStreamState;
|
| +
|
| + /**
|
| + * Future returned by [close] and [done].
|
| + *
|
| + * The future is completed whenever the done event has been sent to all
|
| + * relevant listeners.
|
| + * The relevant listeners are the ones that were listening when [close] was
|
| + * called. When all of these have been canceled (sending the done event makes
|
| + * them cancel, but they can also be canceled before sending the event),
|
| + * this future completes.
|
| + *
|
| + * Any attempt to listen after calling [close] will throw, so there won't
|
| + * be any further listeners.
|
| + */
|
| + _FutureImpl _doneFuture;
|
| +
|
| + _BroadcastStreamController(this._onListen, this._onCancel)
|
| + : _state = _STATE_INITIAL {
|
| + _next = _previous = this;
|
| + }
|
| +
|
| + // StreamController interface.
|
| +
|
| + Stream<T> get stream => new _BroadcastStream<T>(this);
|
| +
|
| + StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);
|
| +
|
| + bool get isClosed => (_state & _STATE_CLOSED) != 0;
|
| +
|
| + /**
|
| + * A broadcast controller is never paused.
|
| + *
|
| + * Each receiving stream may be paused individually, and they handle their
|
| + * own buffering.
|
| + */
|
| + bool get isPaused => false;
|
| +
|
| + /** Whether there are currently one or more subscribers. */
|
| + bool get hasListener => !_isEmpty;
|
| +
|
| + /** Whether an event is being fired (sent to some, but not all, listeners). */
|
| + bool get _isFiring => (_state & _STATE_FIRING) != 0;
|
| +
|
| + bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;
|
| +
|
| + bool get _mayAddEvent => (_state < _STATE_CLOSED);
|
| +
|
| + _FutureImpl _ensureDoneFuture() {
|
| + if (_doneFuture != null) return _doneFuture;
|
| + return _doneFuture = new _FutureImpl();
|
| + }
|
| +
|
| + // Linked list helpers
|
| +
|
| + bool get _isEmpty => identical(_next, this);
|
| +
|
| + /** Adds subscription to linked list of active listeners. */
|
| + void _addListener(_BroadcastSubscription<T> subscription) {
|
| + assert(identical(subscription._next, subscription));
|
| + // Insert in linked list just before `this`.
|
| + subscription._previous = _previous;
|
| + subscription._next = this;
|
| + this._previous._next = subscription;
|
| + this._previous = subscription;
|
| + subscription._eventState = (_state & _STATE_EVENT_ID);
|
| + }
|
| +
|
| + void _removeListener(_BroadcastSubscription<T> subscription) {
|
| + assert(identical(subscription._controller, this));
|
| + assert(!identical(subscription._next, subscription));
|
| + _BroadcastSubscriptionLink previous = subscription._previous;
|
| + _BroadcastSubscriptionLink next = subscription._next;
|
| + previous._next = next;
|
| + next._previous = previous;
|
| + subscription._next = subscription._previous = subscription;
|
| + }
|
| +
|
| + // _StreamControllerLifecycle interface.
|
| +
|
| + StreamSubscription<T> _subscribe(void onData(T data),
|
| + void onError(Object error),
|
| + void onDone(),
|
| + bool cancelOnError) {
|
| + if (isClosed) {
|
| + throw new StateError("Subscribing to closed stream");
|
| + }
|
| + StreamSubscription subscription = new _BroadcastSubscription<T>(
|
| + this, onData, onError, onDone, cancelOnError);
|
| + _addListener(subscription);
|
| + if (identical(_next, _previous)) {
|
| + // Only one listener, so it must be the first listener.
|
| + _runGuarded(_onListen);
|
| + }
|
| + return subscription;
|
| + }
|
| +
|
| + void _recordCancel(_BroadcastSubscription<T> subscription) {
|
| + // If already removed by the stream, don't remove it again.
|
| + if (identical(subscription._next, subscription)) return;
|
| + assert(!identical(subscription._next, subscription));
|
| + if (subscription._isFiring) {
|
| + subscription._setRemoveAfterFiring();
|
| + } else {
|
| + assert(!identical(subscription._next, subscription));
|
| + _removeListener(subscription);
|
| + // If we are currently firing an event, the empty-check is performed at
|
| + // the end of the listener loop instead of here.
|
| + if (!_isFiring && _isEmpty) {
|
| + _callOnCancel();
|
| + }
|
| + }
|
| + }
|
| +
|
| + void _recordPause(StreamSubscription<T> subscription) {}
|
| + void _recordResume(StreamSubscription<T> subscription) {}
|
| +
|
| + // EventSink interface.
|
| +
|
| + Error _addEventError() {
|
| + if (isClosed) {
|
| + return new StateError("Cannot add new events after calling close");
|
| + }
|
| + assert(_isAddingStream);
|
| + return new StateError("Cannot add new events while doing an addStream");
|
| + }
|
| +
|
| + void add(T data) {
|
| + if (!_mayAddEvent) throw _addEventError();
|
| + _sendData(data);
|
| + }
|
| +
|
| + void addError(Object error, [Object stackTrace]) {
|
| + if (!_mayAddEvent) throw _addEventError();
|
| + if (stackTrace != null) _attachStackTrace(error, stackTrace);
|
| + _sendError(error);
|
| + }
|
| +
|
| + Future close() {
|
| + if (isClosed) {
|
| + assert(_doneFuture != null);
|
| + return _doneFuture;
|
| + }
|
| + if (!_mayAddEvent) throw _addEventError();
|
| + _state |= _STATE_CLOSED;
|
| + Future doneFuture = _ensureDoneFuture();
|
| + _sendDone();
|
| + return doneFuture;
|
| + }
|
| +
|
| + Future get done => _ensureDoneFuture();
|
| +
|
| + Future addStream(Stream<T> stream) {
|
| + if (!_mayAddEvent) throw _addEventError();
|
| + _state |= _STATE_ADDSTREAM;
|
| + _addStreamState = new _AddStreamState(this, stream);
|
| + return _addStreamState.addStreamFuture;
|
| + }
|
| +
|
| + // _EventSink interface, called from AddStreamState.
|
| + void _add(T data) {
|
| + _sendData(data);
|
| + }
|
| +
|
| + void _addError(Object error) {
|
| + assert(_isAddingStream);
|
| + _sendError(error);
|
| + }
|
| +
|
| + void _close() {
|
| + assert(_isAddingStream);
|
| + _AddStreamState addState = _addStreamState;
|
| + _addStreamState = null;
|
| + _state &= ~_STATE_ADDSTREAM;
|
| + addState.complete();
|
| + }
|
| +
|
| + // Event handling.
|
| + void _forEachListener(
|
| + void action(_BufferingStreamSubscription<T> subscription)) {
|
| + if (_isFiring) {
|
| + throw new StateError(
|
| + "Cannot fire new event. Controller is already firing an event");
|
| + }
|
| + if (_isEmpty) return;
|
| +
|
| + // Get event id of this event.
|
| + int id = (_state & _STATE_EVENT_ID);
|
| + // Start firing (set the _STATE_FIRING bit). We don't do [_onCancel]
|
| + // callbacks while firing, and we prevent reentrancy of this function.
|
| + //
|
| + // Set [_state]'s event id to the next event's id.
|
| + // Any listeners added while firing this event will expect the next event,
|
| + // not this one, and won't get notified.
|
| + _state ^= _STATE_EVENT_ID | _STATE_FIRING;
|
| + _BroadcastSubscriptionLink link = _next;
|
| + while (!identical(link, this)) {
|
| + _BroadcastSubscription<T> subscription = link;
|
| + if (subscription._expectsEvent(id)) {
|
| + subscription._eventState |= _BroadcastSubscription._STATE_FIRING;
|
| + action(subscription);
|
| + subscription._toggleEventId();
|
| + link = subscription._next;
|
| + if (subscription._removeAfterFiring) {
|
| + _removeListener(subscription);
|
| + }
|
| + subscription._eventState &= ~_BroadcastSubscription._STATE_FIRING;
|
| + } else {
|
| + link = subscription._next;
|
| + }
|
| + }
|
| + _state &= ~_STATE_FIRING;
|
| +
|
| + if (_isEmpty) {
|
| + _callOnCancel();
|
| + }
|
| + }
|
| +
|
| + void _callOnCancel() {
|
| + assert(_isEmpty);
|
| + if (isClosed && _doneFuture._mayComplete) {
|
| + // When closed, _doneFuture is not null.
|
| + _doneFuture._asyncSetValue(null);
|
| + }
|
| + _runGuarded(_onCancel);
|
| + }
|
| +}
|
| +
|
| +class _SyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
|
| + _SyncBroadcastStreamController(void onListen(), void onCancel())
|
| + : super(onListen, onCancel);
|
| +
|
| + // EventDispatch interface.
|
| +
|
| + void _sendData(T data) {
|
| + if (_isEmpty) return;
|
| + _forEachListener((_BufferingStreamSubscription<T> subscription) {
|
| + subscription._add(data);
|
| + });
|
| + }
|
| +
|
| + void _sendError(Object error) {
|
| + if (_isEmpty) return;
|
| + _forEachListener((_BufferingStreamSubscription<T> subscription) {
|
| + subscription._addError(error);
|
| + });
|
| + }
|
| +
|
| + void _sendDone() {
|
| + if (!_isEmpty) {
|
| + _forEachListener((_BroadcastSubscription<T> subscription) {
|
| + subscription._close();
|
| + });
|
| + } else {
|
| + assert(_doneFuture != null);
|
| + assert(_doneFuture._mayComplete);
|
| + _doneFuture._asyncSetValue(null);
|
| + }
|
| + }
|
| +}
|
| +
|
| +class _AsyncBroadcastStreamController<T> extends _BroadcastStreamController<T> {
|
| + _AsyncBroadcastStreamController(void onListen(), void onCancel())
|
| + : super(onListen, onCancel);
|
| +
|
| + // EventDispatch interface.
|
| +
|
| + void _sendData(T data) {
|
| + for (_BroadcastSubscriptionLink link = _next;
|
| + !identical(link, this);
|
| + link = link._next) {
|
| + _BroadcastSubscription<T> subscription = link;
|
| + subscription._addPending(new _DelayedData(data));
|
| + }
|
| + }
|
| +
|
| + void _sendError(Object error) {
|
| + for (_BroadcastSubscriptionLink link = _next;
|
| + !identical(link, this);
|
| + link = link._next) {
|
| + _BroadcastSubscription<T> subscription = link;
|
| + subscription._addPending(new _DelayedError(error));
|
| + }
|
| + }
|
| +
|
| + void _sendDone() {
|
| + if (!_isEmpty) {
|
| + for (_BroadcastSubscriptionLink link = _next;
|
| + !identical(link, this);
|
| + link = link._next) {
|
| + _BroadcastSubscription<T> subscription = link;
|
| + subscription._addPending(const _DelayedDone());
|
| + }
|
| + } else {
|
| + assert(_doneFuture != null);
|
| + assert(_doneFuture._mayComplete);
|
| + _doneFuture._asyncSetValue(null);
|
| + }
|
| + }
|
| +}
|
| +
|
| +/**
|
| + * Stream controller that is used by [Stream.asBroadcastStream].
|
| + *
|
| + * This stream controller allows incoming events while it is firing
|
| + * other events. This is handled by delaying the events until the
|
| + * current event is done firing, and then fire the pending events.
|
| + *
|
| + * This class extends [_SyncBroadcastStreamController]. Events of
|
| + * an "asBroadcastStream" stream are always initiated by events
|
| + * on another stream, and it is fine to forward them synchronously.
|
| + */
|
| +class _AsBroadcastStreamController<T>
|
| + extends _SyncBroadcastStreamController<T>
|
| + implements _EventDispatch<T> {
|
| + _StreamImplEvents _pending;
|
| +
|
| + _AsBroadcastStreamController(void onListen(), void onCancel())
|
| + : super(onListen, onCancel);
|
| +
|
| + bool get _hasPending => _pending != null && ! _pending.isEmpty;
|
| +
|
| + void _addPendingEvent(_DelayedEvent event) {
|
| + if (_pending == null) {
|
| + _pending = new _StreamImplEvents();
|
| + }
|
| + _pending.add(event);
|
| + }
|
| +
|
| + void add(T data) {
|
| + if (!isClosed && _isFiring) {
|
| + _addPendingEvent(new _DelayedData<T>(data));
|
| + return;
|
| + }
|
| + super.add(data);
|
| + while (_hasPending) {
|
| + _pending.handleNext(this);
|
| + }
|
| + }
|
| +
|
| + void addError(Object error, [StackTrace stackTrace]) {
|
| + if (!isClosed && _isFiring) {
|
| + _addPendingEvent(new _DelayedError(error));
|
| + return;
|
| + }
|
| + super.addError(error, stackTrace);
|
| + while (_hasPending) {
|
| + _pending.handleNext(this);
|
| + }
|
| + }
|
| +
|
| + void close() {
|
| + if (!isClosed && _isFiring) {
|
| + _addPendingEvent(const _DelayedDone());
|
| + _state |= _STATE_CLOSED;
|
| + return;
|
| + }
|
| + super.close();
|
| + assert(!_hasPending);
|
| + }
|
| +
|
| + void _callOnCancel() {
|
| + if (_hasPending) {
|
| + _pending.clear();
|
| + _pending = null;
|
| + }
|
| + super._callOnCancel();
|
| + }
|
| +}
|
| +
|
| +// A subscription that never receives any events.
|
| +// It can simulate pauses, but otherwise does nothing.
|
| +class _DoneSubscription<T> implements StreamSubscription<T> {
|
| + int _pauseCount = 0;
|
| + void onData(void handleData(T data)) {}
|
| + void onError(void handleErrr(Object error)) {}
|
| + void onDone(void handleDone()) {}
|
| + void pause([Future resumeSignal]) {
|
| + if (resumeSignal != null) resumeSignal.then(_resume);
|
| + _pauseCount++;
|
| + }
|
| + void resume() { _resume(null); }
|
| + void _resume(_) {
|
| + if (_pauseCount > 0) _pauseCount--;
|
| + }
|
| + void cancel() {}
|
| + bool get isPaused => _pauseCount > 0;
|
| + Future asFuture(Object value) => new _FutureImpl();
|
| +}
|
|
|