| Index: sdk/lib/async/stream_impl.dart
|
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
|
| index 8fa1848d14ac0261b2943dfc41c4ba314d457162..3d704856e73355adadbc6cfe424f7140eef2450c 100644
|
| --- a/sdk/lib/async/stream_impl.dart
|
| +++ b/sdk/lib/async/stream_impl.dart
|
| @@ -4,57 +4,6 @@
|
|
|
| part of dart.async;
|
|
|
| -// States shared by single/multi stream implementations.
|
| -
|
| -// Completion state of the stream.
|
| -/// Initial and default state where the stream can receive and send events.
|
| -const int _STREAM_OPEN = 0;
|
| -/// The stream has received a request to complete, but hasn't done so yet.
|
| -/// No further events can be added to the stream.
|
| -const int _STREAM_CLOSED = 1;
|
| -/// The stream has completed and will no longer receive or send events.
|
| -/// Also counts as closed. The stream must not be paused when it's completed.
|
| -/// Always used in conjunction with [_STREAM_CLOSED].
|
| -const int _STREAM_COMPLETE = 2;
|
| -
|
| -/// Bit that alternates between events, and listeners are updated to the
|
| -/// current value when they are notified of the event.
|
| -const int _STREAM_EVENT_ID = 4;
|
| -const int _STREAM_EVENT_ID_SHIFT = 2;
|
| -
|
| -// The activity state of the stream: What is it currently doing.
|
| -/// Bit set while firing and clear while not.
|
| -const int _STREAM_FIRING = 8;
|
| -/// Bit set while calling a pause-state or subscription-state change callback.
|
| -const int _STREAM_CALLBACK = 16;
|
| -
|
| -// The pause state of the stream.
|
| -/// Bit set when resuming with pending events. Cleared after all pending events
|
| -/// have been transmitted. Means that the controller still considers the
|
| -/// stream paused, even if the listener doesn't.
|
| -const int _STREAM_PENDING_RESUME = 32;
|
| -/// The count of times a stream has paused is stored in the
|
| -/// state, shifted by this amount.
|
| -const int _STREAM_PAUSE_COUNT_SHIFT = 6;
|
| -
|
| -// States for listeners.
|
| -
|
| -/// The listener is currently not subscribed to its source stream.
|
| -const int _LISTENER_UNSUBSCRIBED = 0;
|
| -/// The listener is actively subscribed to its source stream.
|
| -const int _LISTENER_SUBSCRIBED = 1;
|
| -/// The listener is subscribed until it has been notified of the current event.
|
| -/// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED].
|
| -const int _LISTENER_PENDING_UNSUBSCRIBE = 2;
|
| -
|
| -/// Bit that contains the last sent event's "id bit".
|
| -const int _LISTENER_EVENT_ID = 4;
|
| -const int _LISTENER_EVENT_ID_SHIFT = 2;
|
| -
|
| -/// The count of times a listener has paused is stored in the
|
| -/// state, shifted by this amount.
|
| -const int _LISTENER_PAUSE_COUNT_SHIFT = 3;
|
| -
|
| /** Throws the given error in the next cycle. */
|
| _throwDelayed(var error, [Object stackTrace]) {
|
| // We are going to reach the top-level here, but there might be a global
|
| @@ -69,915 +18,526 @@ _throwDelayed(var error, [Object stackTrace]) {
|
| });
|
| }
|
|
|
| +/** Abstract and private interface for a place to put events. */
|
| +abstract class _EventSink<T> {
|
| + void _add(T data);
|
| + void _addError(Object error);
|
| + void _close();
|
| +}
|
|
|
| -// -------------------------------------------------------------------
|
| -// Common base class for single and multi-subscription streams.
|
| -// -------------------------------------------------------------------
|
| -abstract class _StreamImpl<T> extends Stream<T> {
|
| - /** Current state of the stream. */
|
| - int _state = _STREAM_OPEN;
|
| -
|
| - /**
|
| - * List of pending events.
|
| - *
|
| - * If events are added to the stream (using [_add], [_addError] or [_done])
|
| - * while the stream is paused, or while another event is firing, events will
|
| - * stored here.
|
| - * Also supports scheduling the events for later execution.
|
| - */
|
| - _PendingEvents _pendingEvents;
|
| -
|
| - // ------------------------------------------------------------------
|
| - // Stream interface.
|
| -
|
| - StreamSubscription<T> listen(void onData(T data),
|
| - { void onError(error),
|
| - void onDone(),
|
| - bool cancelOnError }) {
|
| - if (_isComplete) {
|
| - return new _DoneSubscription(onDone);
|
| - }
|
| - if (onData == null) onData = _nullDataHandler;
|
| - if (onError == null) onError = _nullErrorHandler;
|
| - if (onDone == null) onDone = _nullDoneHandler;
|
| - cancelOnError = identical(true, cancelOnError);
|
| - _StreamSubscriptionImpl subscription =
|
| - _createSubscription(onData, onError, onDone, cancelOnError);
|
| - _addListener(subscription);
|
| - return subscription;
|
| - }
|
| -
|
| - // ------------------------------------------------------------------
|
| - // EventSink interface-like methods for sending events into the stream.
|
| - // It's the responsibility of the caller to ensure that the stream is not
|
| - // paused when adding events. If the stream is paused, the events will be
|
| - // queued, but it's better to not send events at all.
|
| +/**
|
| + * Abstract and private interface for a place to send events.
|
| + *
|
| + * Used by event buffering to finally dispatch the pending event, where
|
| + * [_EventSink] is where the event first enters the stream subscription,
|
| + * and may yet be buffered.
|
| + */
|
| +abstract class _EventDispatch<T> {
|
| + void _sendData(T data);
|
| + void _sendError(Object error);
|
| + void _sendDone();
|
| +}
|
|
|
| +/**
|
| + * Default implementation of stream subscription of buffering events.
|
| + *
|
| + * The only public methods are those of [StreamSubscription], so instances of
|
| + * [_BufferingStreamSubscription] can be returned directly as a
|
| + * [StreamSubscription] without exposing internal functionality.
|
| + *
|
| + * The [StreamController] is a public facing version of [Stream] and this class,
|
| + * with some methods made public.
|
| + *
|
| + * The user interface of [_BufferingStreamSubscription] are the following
|
| + * methods:
|
| + * * [_add]: Add a data event to the stream.
|
| + * * [_addError]: Add an error event to the stream.
|
| + * * [_close]: Request to close the stream.
|
| + * * [_onCancel]: Called when the subscription will provide no more events,
|
| + * either due to being actively canceled, or after sending a done event.
|
| + * * [_onPause]: Called when the subscription wants the event source to pause.
|
| + * * [_onResume]: Called when allowing new events after a pause.
|
| + * The user should not add new events when the subscription requests a paused,
|
| + * but if it happens anyway, the subscription will enqueue the events just as
|
| + * when new events arrive while still firing an old event.
|
| + */
|
| +class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| + _EventSink<T>,
|
| + _EventDispatch<T> {
|
| + /** The `cancelOnError` flag from the `listen` call. */
|
| + static const int _STATE_CANCEL_ON_ERROR = 1;
|
| /**
|
| - * Send or queue a data event.
|
| + * Whether the "done" event has been received.
|
| + * No further events are accepted after this.
|
| */
|
| - void _add(T value) {
|
| - if (_isClosed) throw new StateError("Sending on closed stream");
|
| - if (!_mayFireState) {
|
| - // Not the time to send events.
|
| - _addPendingEvent(new _DelayedData<T>(value));
|
| - return;
|
| - }
|
| - if (_hasPendingEvent) {
|
| - _addPendingEvent(new _DelayedData<T>(value));
|
| - } else {
|
| - _sendData(value);
|
| - }
|
| - _handlePendingEvents();
|
| - }
|
| -
|
| + static const int _STATE_CLOSED = 2;
|
| /**
|
| - * Send or enqueue an error event.
|
| + * Set if the input has been asked not to send events.
|
| *
|
| - * If a subscription has requested to be unsubscribed on errors,
|
| - * it will be unsubscribed after receiving this event.
|
| + * This is not the same as being paused, since the input will remain paused
|
| + * after a call to [resume] if there are pending events.
|
| */
|
| - void _addError(error) {
|
| - if (_isClosed) throw new StateError("Sending on closed stream");
|
| - if (!_mayFireState) {
|
| - // Not the time to send events.
|
| - _addPendingEvent(new _DelayedError(error));
|
| - return;
|
| - }
|
| - if (_hasPendingEvent) {
|
| - _addPendingEvent(new _DelayedError(error));
|
| - } else {
|
| - _sendError(error);
|
| - }
|
| - _handlePendingEvents();
|
| - }
|
| -
|
| + static const int _STATE_INPUT_PAUSED = 4;
|
| /**
|
| - * Send or enqueue a "done" message.
|
| + * Whether the subscription has been canceled.
|
| *
|
| - * The "done" message should be sent at most once by a stream, and it
|
| - * should be the last message sent.
|
| + * Set by calling [cancel], or by handling a "done" event, or an "error" event
|
| + * when `cancelOnError` is true.
|
| */
|
| - void _close() {
|
| - if (_isClosed) return;
|
| - _state |= _STREAM_CLOSED;
|
| - if (!_mayFireState) {
|
| - // Not the time to send events.
|
| - _addPendingEvent(const _DelayedDone());
|
| - return;
|
| - }
|
| - if (_hasPendingEvent) {
|
| - _addPendingEvent(new _DelayedDone());
|
| - _handlePendingEvents();
|
| - } else {
|
| - _sendDone();
|
| - assert(_isComplete);
|
| - assert(!_hasPendingEvent);
|
| - }
|
| - }
|
| -
|
| - // -------------------------------------------------------------------
|
| - // Internal implementation.
|
| -
|
| - // State predicates.
|
| -
|
| - // Lifecycle state.
|
| - /** Whether the stream is in the default, open, state for events. */
|
| - bool get _isOpen => (_state & (_STREAM_CLOSED | _STREAM_COMPLETE)) == 0;
|
| -
|
| - /** Whether the stream has been closed (a done event requested). */
|
| - bool get _isClosed => (_state & _STREAM_CLOSED) != 0;
|
| -
|
| - /** Whether the stream is completed. */
|
| - bool get _isComplete => (_state & _STREAM_COMPLETE) != 0;
|
| -
|
| - // Pause state.
|
| -
|
| - /** Whether one or more active subscribers have requested a pause. */
|
| - bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT);
|
| + static const int _STATE_CANCELED = 8;
|
| + static const int _STATE_IN_CALLBACK = 16;
|
| + static const int _STATE_HAS_PENDING = 32;
|
| + static const int _STATE_PAUSE_COUNT = 64;
|
| + static const int _STATE_PAUSE_COUNT_SHIFT = 6;
|
| +
|
| + /* Event handlers provided in constructor. */
|
| + /* TODO(7733): Fix Function->_DataHandler<T> when dart2js understands
|
| + * parameterized function types. */
|
| + Function _onData;
|
| + _ErrorHandler _onError;
|
| + _DoneHandler _onDone;
|
|
|
| - /** How many times the stream has been paused. */
|
| - int get _pauseCount => _state >> _STREAM_PAUSE_COUNT_SHIFT;
|
| + /** Bit vector based on state-constants above. */
|
| + int _state;
|
|
|
| /**
|
| - * Whether a controller thinks the stream is paused.
|
| + * Queue of pending events.
|
| *
|
| - * When this changes, a pause-state change callback is performed.
|
| - *
|
| - * It may differ from [_isPaused] if there are pending events
|
| - * in the queue when the listeners resume. The controller won't
|
| - * be informed until all queued events have been fired.
|
| + * Is created when necessary, or set in constructor for preconfigured events.
|
| */
|
| - bool get _isInputPaused => _state >= (_STREAM_PENDING_RESUME);
|
| -
|
| - /** Whether we have a pending resume scheduled. */
|
| - bool get _hasPendingResume => (_state & _STREAM_PENDING_RESUME) != 0;
|
| -
|
| -
|
| - // Action state. If the stream makes a call-out to external code,
|
| - // this state tracks it and avoids reentrancy problems.
|
| -
|
| - /** Whether the stream is not currently firing or calling a callback. */
|
| - bool get _isInactive => (_state & (_STREAM_CALLBACK | _STREAM_FIRING)) == 0;
|
| + _PendingEvents _pending;
|
|
|
| - /** Whether we are currently executing a state-chance callback. */
|
| - bool get _isInCallback => (_state & _STREAM_CALLBACK) != 0;
|
| -
|
| - /** Whether we are currently firing an event. */
|
| - bool get _isFiring => (_state & _STREAM_FIRING) != 0;
|
| -
|
| - /** Check whether the pending event queue is non-empty */
|
| - bool get _hasPendingEvent =>
|
| - _pendingEvents != null && !_pendingEvents.isEmpty;
|
| + _BufferingStreamSubscription(this._onData,
|
| + this._onError,
|
| + this._onDone,
|
| + bool cancelOnError)
|
| + : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
|
| + assert(_onData != null);
|
| + assert(_onError != null);
|
| + assert(_onDone != null);
|
| + }
|
|
|
| /**
|
| - * The bit representing the current or last event fired.
|
| + * Sets the subscription's pending events object.
|
| *
|
| - * This bit matches a bit on listeners that have received the corresponding
|
| - * event. It is toggled for each new event being fired.
|
| + * This can only be done once. The pending events object is used for the
|
| + * rest of the subscription's life cycle.
|
| */
|
| - int get _currentEventIdBit =>
|
| - (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT;
|
| -
|
| - /** Whether there is currently a subscriber on this [Stream]. */
|
| - bool get _hasListener;
|
| -
|
| -
|
| - /** Whether the state bits allow firing. */
|
| - bool get _mayFireState {
|
| - // The state allows firing unless:
|
| - // - it's currently firing
|
| - // - it's currently in a callback
|
| - // - it's paused
|
| - const int mask =
|
| - _STREAM_FIRING |
|
| - _STREAM_CALLBACK |
|
| - ~((1 << _STREAM_PAUSE_COUNT_SHIFT) - 1);
|
| - return (_state & mask) == 0;
|
| - }
|
| -
|
| - // State modification.
|
| -
|
| - /** Record an increases in the number of times the listener has paused. */
|
| - void _incrementPauseCount(_StreamListener<T> listener) {
|
| - listener._incrementPauseCount();
|
| - _state &= ~_STREAM_PENDING_RESUME;
|
| - _updatePauseCount(1);
|
| - }
|
| -
|
| - /** Record a decrease in the number of times the listener has paused. */
|
| - void _decrementPauseCount(_StreamListener<T> listener) {
|
| - assert(_isPaused);
|
| - listener._decrementPauseCount();
|
| - _updatePauseCount(-1);
|
| - }
|
| -
|
| - /** Update the stream's own pause count only. */
|
| - void _updatePauseCount(int by) {
|
| - int oldState = _state;
|
| - // We can't just _state += by << _STREAM_PAUSE_COUNT_SHIFT, since dart2js
|
| - // converts the result of the left-shift to a positive number.
|
| - if (by >= 0) {
|
| - _state = oldState + (by << _STREAM_PAUSE_COUNT_SHIFT);
|
| - } else {
|
| - _state = oldState - ((-by) << _STREAM_PAUSE_COUNT_SHIFT);
|
| + void _setPendingEvents(_PendingEvents pendingEvents) {
|
| + assert(_pending == null);
|
| + if (pendingEvents == null) return;
|
| + _pending = pendingEvents;
|
| + if (!pendingEvents.isEmpty) {
|
| + _state |= _STATE_HAS_PENDING;
|
| + _pending.schedule(this);
|
| }
|
| - assert(_state >= 0);
|
| - assert((_state >> _STREAM_PAUSE_COUNT_SHIFT) ==
|
| - (oldState >> _STREAM_PAUSE_COUNT_SHIFT) + by);
|
| }
|
|
|
| - void _setClosed() {
|
| - assert(!_isClosed);
|
| - _state |= _STREAM_CLOSED;
|
| + /**
|
| + * Extracts the pending events from a canceled stream.
|
| + *
|
| + * This can only be done during the [_onCancel] method call. After that,
|
| + * any remaining pending events will be cleared.
|
| + */
|
| + _PendingEvents _extractPending() {
|
| + assert(_isCanceled);
|
| + _PendingEvents events = _pending;
|
| + _pending = null;
|
| + return events;
|
| }
|
|
|
| - void _setComplete() {
|
| - assert(_isClosed);
|
| - _state = _state |_STREAM_COMPLETE;
|
| + // StreamSubscription interface.
|
| +
|
| + void onData(void handleData(T event)) {
|
| + if (handleData == null) handleData = _nullDataHandler;
|
| + _onData = handleData;
|
| }
|
|
|
| - void _startFiring() {
|
| - assert(!_isFiring);
|
| - assert(!_isInCallback);
|
| - assert(_hasListener);
|
| - assert(!_isPaused);
|
| - // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID
|
| - // bit. All current subscribers will now have a _LISTENER_EVENT_ID
|
| - // that doesn't match _STREAM_EVENT_ID, and they will receive the
|
| - // event being fired.
|
| - _state ^= _STREAM_FIRING | _STREAM_EVENT_ID;
|
| + void onError(void handleError(error)) {
|
| + if (handleError == null) handleError = _nullErrorHandler;
|
| + _onError = handleError;
|
| }
|
|
|
| - void _endFiring(bool wasInputPaused) {
|
| - assert(_isFiring);
|
| - _state ^= _STREAM_FIRING;
|
| - // Had listeners, or we wouldn't have fired.
|
| - _checkCallbacks(true, wasInputPaused);
|
| + void onDone(void handleDone()) {
|
| + if (handleDone == null) handleDone = _nullDoneHandler;
|
| + _onDone = handleDone;
|
| }
|
|
|
| - /**
|
| - * Record that a listener wants a pause from events.
|
| - *
|
| - * This methods is called from [_StreamListener.pause()].
|
| - * Subclasses can override this method, along with [isPaused] and
|
| - * [createSubscription], if they want to do a different handling of paused
|
| - * subscriptions, e.g., a filtering stream pausing its own source if all its
|
| - * subscribers are paused.
|
| - */
|
| - void _pause(_StreamListener<T> listener, Future resumeSignal) {
|
| - assert(identical(listener._source, this));
|
| - if (!listener._isSubscribed) {
|
| - throw new StateError("Subscription has been canceled.");
|
| - }
|
| - assert(!_isComplete); // There can be no subscribers when complete.
|
| - bool wasInputPaused = _isInputPaused;
|
| + void pause([Future resumeSignal]) {
|
| + if (_isCanceled) return;
|
| bool wasPaused = _isPaused;
|
| - _incrementPauseCount(listener);
|
| - if (resumeSignal != null) {
|
| - resumeSignal.whenComplete(() { this._resume(listener, true); });
|
| - }
|
| - if (!wasPaused && _hasPendingEvent && _pendingEvents.isScheduled) {
|
| - _pendingEvents.cancelSchedule();
|
| - }
|
| - if (_isInactive && !wasInputPaused) {
|
| - _checkCallbacks(true, false);
|
| - if (!_isPaused && _hasPendingEvent) {
|
| - _schedulePendingEvents();
|
| - }
|
| - }
|
| + bool wasInputPaused = _isInputPaused;
|
| + // Increment pause count and mark input paused (if it isn't already).
|
| + _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
|
| + if (resumeSignal != null) resumeSignal.whenComplete(resume);
|
| + if (!wasPaused && _pending != null) _pending.cancelSchedule();
|
| + if (!wasInputPaused && !_inCallback) _guardCallback(_onPause);
|
| }
|
|
|
| - /** Stops pausing due to one request from the given listener. */
|
| - void _resume(_StreamListener<T> listener, bool fromEvent) {
|
| - if (!listener.isPaused) return;
|
| - assert(listener._isSubscribed);
|
| - assert(_isPaused);
|
| - _decrementPauseCount(listener);
|
| - if (!_isPaused) {
|
| - if (_hasPendingEvent) {
|
| - _state |= _STREAM_PENDING_RESUME;
|
| - // Controller's pause state hasn't changed.
|
| - // If we can fire events now, fire any pending events right away.
|
| - if (_isInactive) {
|
| - if (fromEvent) {
|
| - _handlePendingEvents();
|
| - } else {
|
| - _schedulePendingEvents();
|
| - }
|
| - }
|
| - } else if (_isInactive) {
|
| - _checkCallbacks(true, true);
|
| - if (!_isPaused && _hasPendingEvent) {
|
| - if (fromEvent) {
|
| - _handlePendingEvents();
|
| - } else {
|
| - _schedulePendingEvents();
|
| - }
|
| + void resume() {
|
| + if (_isCanceled) return;
|
| + if (_isPaused) {
|
| + _decrementPauseCount();
|
| + if (!_isPaused) {
|
| + if (_hasPending && !_pending.isEmpty) {
|
| + // Input is still paused.
|
| + _pending.schedule(this);
|
| + } else {
|
| + assert(_mayResumeInput);
|
| + _state &= ~_STATE_INPUT_PAUSED;
|
| + if (!_inCallback) _guardCallback(_onResume);
|
| }
|
| }
|
| }
|
| }
|
|
|
| - /** Schedule pending events to be executed. */
|
| - void _schedulePendingEvents() {
|
| - assert(_hasPendingEvent);
|
| - _pendingEvents.schedule(this);
|
| + void cancel() {
|
| + if (_isCanceled) return;
|
| + _cancel();
|
| + if (!_inCallback) {
|
| + // otherwise checkState will be called after firing or callback completes.
|
| + _state |= _STATE_IN_CALLBACK;
|
| + _onCancel();
|
| + _pending = null;
|
| + _state &= ~_STATE_IN_CALLBACK;
|
| + }
|
| }
|
|
|
| - /** Create a subscription object. Called by [subcribe]. */
|
| - _StreamSubscriptionImpl<T> _createSubscription(
|
| - void onData(T data),
|
| - void onError(error),
|
| - void onDone(),
|
| - bool cancelOnError);
|
| + Future asFuture([var futureValue]) {
|
| + _FutureImpl<T> result = new _FutureImpl<T>();
|
|
|
| - /**
|
| - * Adds a listener to this stream.
|
| - */
|
| - void _addListener(_StreamSubscriptionImpl subscription);
|
| + // Overwrite the onDone and onError handlers.
|
| + _onDone = () { result._setValue(futureValue); };
|
| + _onError = (error) {
|
| + cancel();
|
| + result._setError(error);
|
| + };
|
|
|
| - /**
|
| - * Handle a cancel requested from a [_StreamSubscriptionImpl].
|
| - *
|
| - * This method is called from [_StreamSubscriptionImpl.cancel].
|
| - *
|
| - * If an event is currently firing, the cancel is delayed
|
| - * until after the subscribers have received the event.
|
| - */
|
| - void _cancel(_StreamSubscriptionImpl subscriber);
|
| + return result;
|
| + }
|
|
|
| - /**
|
| - * Iterate over all current subscribers and perform an action on each.
|
| - *
|
| - * Subscribers added during the iteration will not be visited.
|
| - * Subscribers unsubscribed during the iteration will only be removed
|
| - * after they have been acted on.
|
| - *
|
| - * Any change in the pause state is only reported after all subscribers have
|
| - * received the event.
|
| - *
|
| - * The [action] must not throw, or the controller will be left in an
|
| - * invalid state.
|
| - *
|
| - * This method must not be called while [isFiring] is true.
|
| - */
|
| - void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription));
|
| + // State management.
|
|
|
| - /**
|
| - * Checks whether the subscription/pause state has changed.
|
| - *
|
| - * Calls the appropriate callback if the state has changed from the
|
| - * provided one. Repeats calling callbacks as long as the call changes
|
| - * the state.
|
| - */
|
| - void _checkCallbacks(bool hadListener, bool wasPaused) {
|
| - assert(!_isFiring);
|
| - // Will be handled after the current callback.
|
| - if (_isInCallback) return;
|
| - if (_hasPendingResume && !_hasPendingEvent) {
|
| - _state ^= _STREAM_PENDING_RESUME;
|
| - }
|
| - _state |= _STREAM_CALLBACK;
|
| - while (true) {
|
| - bool hasListener = _hasListener;
|
| - bool isPaused = _isInputPaused;
|
| - if (hadListener != hasListener) {
|
| - _onSubscriptionStateChange();
|
| - } else if (isPaused != wasPaused) {
|
| - _onPauseStateChange();
|
| - } else {
|
| - _state ^= _STREAM_CALLBACK;
|
| - return;
|
| - }
|
| - wasPaused = isPaused;
|
| - hadListener = hasListener;
|
| + bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
|
| + bool get _isClosed => (_state & _STATE_CLOSED) != 0;
|
| + bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
|
| + bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
|
| + bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
|
| + bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
|
| + bool get _canFire => _state < _STATE_IN_CALLBACK;
|
| + bool get _mayResumeInput =>
|
| + !_isPaused && (_pending == null || _pending.isEmpty);
|
| + bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0;
|
| +
|
| + bool get isPaused => _isPaused;
|
| +
|
| + void _cancel() {
|
| + _state |= _STATE_CANCELED;
|
| + if (_hasPending) {
|
| + _pending.cancelSchedule();
|
| }
|
| }
|
|
|
| /**
|
| - * Called when the first subscriber requests a pause or the last a resume.
|
| + * Increment the pause count.
|
| *
|
| - * Read [isPaused] to see the new state.
|
| + * Also marks input as paused.
|
| */
|
| - void _onPauseStateChange() {}
|
| -
|
| - /**
|
| - * Called when the first listener subscribes or the last unsubscribes.
|
| - *
|
| - * Read [hasListener] to see what the new state is.
|
| - */
|
| - void _onSubscriptionStateChange() {}
|
| + void _incrementPauseCount() {
|
| + _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
|
| + }
|
|
|
| /**
|
| - * Add a pending event at the end of the pending event queue.
|
| + * Decrements the pause count.
|
| *
|
| - * Schedules events if currently not paused and inside a callback.
|
| + * Does not automatically unpause the input (call [_onResume]) when
|
| + * the pause count reaches zero. This is handled elsewhere, and only
|
| + * if there are no pending events buffered.
|
| */
|
| - void _addPendingEvent(_DelayedEvent event) {
|
| - if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents();
|
| - _StreamImplEvents events = _pendingEvents;
|
| - events.add(event);
|
| - if (_isPaused || _isFiring) return;
|
| - if (_isInCallback) {
|
| - _schedulePendingEvents();
|
| - return;
|
| - }
|
| + void _decrementPauseCount() {
|
| + assert(_isPaused);
|
| + _state -= _STATE_PAUSE_COUNT;
|
| }
|
|
|
| - /** Fire any pending events until the pending event queue is empty. */
|
| - void _handlePendingEvents() {
|
| - assert(_isInactive);
|
| - if (!_hasPendingEvent) return;
|
| - _PendingEvents events = _pendingEvents;
|
| - do {
|
| - if (_isPaused) return;
|
| - if (events.isScheduled) events.cancelSchedule();
|
| - events.handleNext(this);
|
| - } while (!events.isEmpty);
|
| - }
|
| + // _EventSink interface.
|
|
|
| - /**
|
| - * Send a data event directly to each subscriber.
|
| - */
|
| - _sendData(T value) {
|
| - assert(!_isPaused);
|
| - assert(!_isComplete);
|
| - if (!_hasListener) return;
|
| - _forEachSubscriber((subscriber) {
|
| - try {
|
| - subscriber._sendData(value);
|
| - } catch (e, s) {
|
| - _throwDelayed(e, s);
|
| - }
|
| - });
|
| + void _add(T data) {
|
| + assert(!_isClosed);
|
| + if (_isCanceled) return;
|
| + if (_canFire) {
|
| + _sendData(data);
|
| + } else {
|
| + _addPending(new _DelayedData(data));
|
| + }
|
| }
|
|
|
| - /**
|
| - * Sends an error event directly to each subscriber.
|
| - */
|
| - void _sendError(error) {
|
| - assert(!_isPaused);
|
| - assert(!_isComplete);
|
| - if (!_hasListener) return;
|
| - _forEachSubscriber((subscriber) {
|
| - try {
|
| - subscriber._sendError(error);
|
| - } catch (e, s) {
|
| - _throwDelayed(e, s);
|
| - }
|
| - });
|
| + void _addError(Object error) {
|
| + if (_isCanceled) return;
|
| + if (_canFire) {
|
| + _sendError(error); // Reports cancel after sending.
|
| + } else {
|
| + _addPending(new _DelayedError(error));
|
| + }
|
| }
|
|
|
| - /**
|
| - * Sends the "done" message directly to each subscriber.
|
| - * This automatically stops further subscription and
|
| - * unsubscribes all subscribers.
|
| - */
|
| - void _sendDone() {
|
| - assert(!_isPaused);
|
| - assert(_isClosed);
|
| - _setComplete();
|
| - if (!_hasListener) return;
|
| - _forEachSubscriber((subscriber) {
|
| - _cancel(subscriber);
|
| - try {
|
| - subscriber._sendDone();
|
| - } catch (e, s) {
|
| - _throwDelayed(e, s);
|
| - }
|
| - });
|
| - assert(!_hasListener);
|
| + void _close() {
|
| + assert(!_isClosed);
|
| + if (_isCanceled) return;
|
| + _state |= _STATE_CLOSED;
|
| + if (_canFire) {
|
| + _sendDone();
|
| + } else {
|
| + _addPending(const _DelayedDone());
|
| + }
|
| }
|
| -}
|
| -
|
| -// -------------------------------------------------------------------
|
| -// Default implementation of a stream with a single subscriber.
|
| -// -------------------------------------------------------------------
|
| -/**
|
| - * Default implementation of stream capable of sending events to one subscriber.
|
| - *
|
| - * Any class needing to implement [Stream] can either directly extend this
|
| - * class, or extend [Stream] and delegate the subscribe method to an instance
|
| - * of this class.
|
| - *
|
| - * The only public methods are those of [Stream], so instances of
|
| - * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing
|
| - * internal functionality.
|
| - *
|
| - * The [StreamController] is a public facing version of this class, with
|
| - * some methods made public.
|
| - *
|
| - * The user interface of [_SingleStreamImpl] are the following methods:
|
| - * * [_add]: Add a data event to the stream.
|
| - * * [_addError]: Add an error event to the stream.
|
| - * * [_close]: Request to close the stream.
|
| - * * [_onSubscriberStateChange]: Called when receiving the first subscriber or
|
| - * when losing the last subscriber.
|
| - * * [_onPauseStateChange]: Called when entering or leaving paused mode.
|
| - * * [_hasListener]: Test whether there are currently any subscribers.
|
| - * * [_isInputPaused]: Test whether the stream is currently paused.
|
| - * The user should not add new events while the stream is paused, but if it
|
| - * happens anyway, the stream will enqueue the events just as when new events
|
| - * arrive while still firing an old event.
|
| - */
|
| -class _SingleStreamImpl<T> extends _StreamImpl<T> {
|
| - _StreamListener _subscriber = null;
|
|
|
| - /** Whether there is currently a subscriber on this [Stream]. */
|
| - bool get _hasListener => _subscriber != null;
|
| -
|
| - // -------------------------------------------------------------------
|
| - // Internal implementation.
|
| -
|
| - _SingleStreamImpl() {
|
| - // Start out paused.
|
| - _updatePauseCount(1);
|
| + // Hooks called when the input is paused, unpaused or canceled.
|
| + // These must not throw. If overwritten to call user code, include suitable
|
| + // try/catch wrapping and send any errors to [_throwDelayed].
|
| + void _onPause() {
|
| + assert(_isInputPaused);
|
| }
|
|
|
| - /**
|
| - * Create the new subscription object.
|
| - */
|
| - _StreamSubscriptionImpl<T> _createSubscription(
|
| - void onData(T data),
|
| - void onError(error),
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| - return new _StreamSubscriptionImpl<T>(
|
| - this, onData, onError, onDone, cancelOnError);
|
| + void _onResume() {
|
| + assert(!_isInputPaused);
|
| }
|
|
|
| - void _addListener(_StreamListener subscription) {
|
| - assert(!_isComplete);
|
| - if (_hasListener) {
|
| - throw new StateError("Stream already has subscriber.");
|
| - }
|
| - assert(_pauseCount == 1);
|
| - _updatePauseCount(-1);
|
| - _subscriber = subscription;
|
| - subscription._setSubscribed(0);
|
| - if (_isInactive) {
|
| - _checkCallbacks(false, true);
|
| - if (!_isPaused && _hasPendingEvent) {
|
| - _schedulePendingEvents();
|
| - }
|
| - }
|
| + void _onCancel() {
|
| + assert(_isCanceled);
|
| }
|
|
|
| + // Handle pending events.
|
| +
|
| /**
|
| - * Handle a cancel requested from a [_StreamSubscriptionImpl].
|
| + * Add a pending event.
|
| *
|
| - * This method is called from [_StreamSubscriptionImpl.cancel].
|
| + * If the subscription is not paused, this also schedules a firing
|
| + * of pending events later (if necessary).
|
| */
|
| - void _cancel(_StreamListener subscriber) {
|
| - assert(identical(subscriber._source, this));
|
| - // We allow unsubscribing the currently firing subscription during
|
| - // the event firing, because it is indistinguishable from delaying it since
|
| - // that event has already received the event.
|
| - if (!identical(_subscriber, subscriber)) {
|
| - // You may unsubscribe more than once, only the first one counts.
|
| - return;
|
| - }
|
| - _subscriber = null;
|
| - // Unsubscribing a paused subscription also cancels its pauses.
|
| - int resumeCount = subscriber._setUnsubscribed();
|
| - // Keep being paused while there is no subscriber and the stream is not
|
| - // complete.
|
| - _updatePauseCount(_isComplete ? -resumeCount : -resumeCount + 1);
|
| - if (_isInactive) {
|
| - _checkCallbacks(true, resumeCount > 0);
|
| - if (!_isPaused && _hasPendingEvent) {
|
| - _schedulePendingEvents();
|
| + void _addPending(_DelayedEvent event) {
|
| + _StreamImplEvents pending = _pending;
|
| + if (_pending == null) pending = _pending = new _StreamImplEvents();
|
| + pending.add(event);
|
| + if (!_hasPending) {
|
| + _state |= _STATE_HAS_PENDING;
|
| + if (!_isPaused) {
|
| + _pending.schedule(this);
|
| }
|
| }
|
| }
|
|
|
| - void _forEachSubscriber(
|
| - void action(_StreamListener<T> subscription)) {
|
| + /* _EventDispatch interface. */
|
| +
|
| + void _sendData(T data) {
|
| + assert(!_isCanceled);
|
| assert(!_isPaused);
|
| + assert(!_inCallback);
|
| bool wasInputPaused = _isInputPaused;
|
| - _StreamListener subscription = _subscriber;
|
| - assert(subscription != null);
|
| - _startFiring();
|
| - action(subscription);
|
| - _endFiring(wasInputPaused);
|
| - }
|
| -}
|
| -
|
| -// -------------------------------------------------------------------
|
| -// Default implementation of a stream with subscribers.
|
| -// -------------------------------------------------------------------
|
| -
|
| -/**
|
| - * Default implementation of stream capable of sending events to subscribers.
|
| - *
|
| - * Any class needing to implement [Stream] can either directly extend this
|
| - * class, or extend [Stream] and delegate the subscribe method to an instance
|
| - * of this class.
|
| - *
|
| - * The only public methods are those of [Stream], so instances of
|
| - * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing
|
| - * internal functionality.
|
| - *
|
| - * The [StreamController] is a public facing version of this class, with
|
| - * some methods made public.
|
| - *
|
| - * The user interface of [_MultiStreamImpl] are the following methods:
|
| - * * [_add]: Add a data event to the stream.
|
| - * * [_addError]: Add an error event to the stream.
|
| - * * [_close]: Request to close the stream.
|
| - * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or
|
| - * when losing the last subscriber.
|
| - * * [_onPauseStateChange]: Called when entering or leaving paused mode.
|
| - * * [_hasListener]: Test whether there are currently any subscribers.
|
| - * * [_isPaused]: Test whether the stream is currently paused.
|
| - * The user should not add new events while the stream is paused, but if it
|
| - * happens anyway, the stream will enqueue the events just as when new events
|
| - * arrive while still firing an old event.
|
| - */
|
| -class _MultiStreamImpl<T> extends _StreamImpl<T>
|
| - implements _InternalLinkList {
|
| - // Link list implementation (mixin when possible).
|
| - _InternalLink _nextLink;
|
| - _InternalLink _previousLink;
|
| -
|
| - _MultiStreamImpl() {
|
| - _nextLink = _previousLink = this;
|
| + _state |= _STATE_IN_CALLBACK;
|
| + try {
|
| + _onData(data);
|
| + } catch (e, s) {
|
| + _throwDelayed(e, s);
|
| + }
|
| + _state &= ~_STATE_IN_CALLBACK;
|
| + _checkState(wasInputPaused);
|
| }
|
|
|
| - bool get isBroadcast => true;
|
| -
|
| - Stream<T> asBroadcastStream() => this;
|
| -
|
| - // ------------------------------------------------------------------
|
| - // Helper functions that can be overridden in subclasses.
|
| -
|
| - /** Whether there are currently any subscribers on this [Stream]. */
|
| - bool get _hasListener => !_InternalLinkList.isEmpty(this);
|
| -
|
| - /**
|
| - * Create the new subscription object.
|
| - */
|
| - _StreamListener<T> _createSubscription(
|
| - void onData(T data),
|
| - void onError(error),
|
| - void onDone(),
|
| - bool cancelOnError) {
|
| - return new _StreamSubscriptionImpl<T>(
|
| - this, onData, onError, onDone, cancelOnError);
|
| + void _sendError(var error) {
|
| + assert(!_isCanceled);
|
| + assert(!_isPaused);
|
| + assert(!_inCallback);
|
| + bool wasInputPaused = _isInputPaused;
|
| + _state |= _STATE_IN_CALLBACK;
|
| + try {
|
| + _onError(error);
|
| + } catch (e, s) {
|
| + _throwDelayed(e, s);
|
| + }
|
| + _state &= ~_STATE_IN_CALLBACK;
|
| + if (_cancelOnError) {
|
| + _cancel();
|
| + }
|
| + _checkState(wasInputPaused);
|
| }
|
|
|
| - // -------------------------------------------------------------------
|
| - // Internal implementation.
|
| -
|
| - /**
|
| - * Iterate over all current subscribers and perform an action on each.
|
| - *
|
| - * The set of subscribers cannot be modified during this iteration.
|
| - * All attempts to add or unsubscribe subscribers will be delayed until
|
| - * after the iteration is complete.
|
| - *
|
| - * The [action] must not throw, or the controller will be left in an
|
| - * invalid state.
|
| - *
|
| - * This method must not be called while [isFiring] is true.
|
| - */
|
| - void _forEachSubscriber(
|
| - void action(_StreamListener<T> subscription)) {
|
| - assert(!_isFiring);
|
| - if (!_hasListener) return;
|
| - bool wasInputPaused = _isInputPaused;
|
| - _startFiring();
|
| - _InternalLink cursor = this._nextLink;
|
| - while (!identical(cursor, this)) {
|
| - _StreamListener<T> current = cursor;
|
| - if (current._needsEvent(_currentEventIdBit)) {
|
| - action(current);
|
| - // Marks as having received the event.
|
| - current._toggleEventReceived();
|
| - }
|
| - cursor = current._nextLink;
|
| - if (current._isPendingUnsubscribe) {
|
| - _removeListener(current);
|
| - }
|
| + void _sendDone() {
|
| + assert(!_isCanceled);
|
| + assert(!_isPaused);
|
| + assert(!_inCallback);
|
| + _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
|
| + try {
|
| + _onDone();
|
| + } catch (e, s) {
|
| + _throwDelayed(e, s);
|
| }
|
| - _endFiring(wasInputPaused);
|
| + _onCancel(); // No checkState after cancel, it is always the last event.
|
| + _state &= ~_STATE_IN_CALLBACK;
|
| }
|
|
|
| - void _addListener(_StreamListener listener) {
|
| - listener._setSubscribed(_currentEventIdBit);
|
| - bool hadListener = _hasListener;
|
| - _InternalLinkList.add(this, listener);
|
| - if (!hadListener && _isInactive) {
|
| - _checkCallbacks(false, false);
|
| - if (!_isPaused && _hasPendingEvent) {
|
| - _schedulePendingEvents();
|
| - }
|
| - }
|
| + /**
|
| + * Call a hook function.
|
| + *
|
| + * The call is properly wrapped in code to avoid other callbacks
|
| + * during the call, and it checks for state changes after the call
|
| + * that should cause further callbacks.
|
| + */
|
| + void _guardCallback(callback) {
|
| + assert(!_inCallback);
|
| + bool wasInputPaused = _isInputPaused;
|
| + _state |= _STATE_IN_CALLBACK;
|
| + callback();
|
| + _state &= ~_STATE_IN_CALLBACK;
|
| + _checkState(wasInputPaused);
|
| }
|
|
|
| /**
|
| - * Handle a cancel requested from a [_StreamListener].
|
| + * Check if the input needs to be informed of state changes.
|
| + *
|
| + * State changes are pausing, resuming and canceling.
|
| *
|
| - * This method is called from [_StreamListener.cancel].
|
| + * After canceling, no further callbacks will happen.
|
| *
|
| - * If an event is currently firing, the cancel is delayed
|
| - * until after the subscribers have received the event.
|
| + * The cancel callback is called after a user cancel, or after
|
| + * the final done event is sent.
|
| */
|
| - void _cancel(_StreamListener listener) {
|
| - assert(identical(listener._source, this));
|
| - if (_InternalLink.isUnlinked(listener)) {
|
| - // You may unsubscribe more than once, only the first one counts.
|
| - return;
|
| + void _checkState(bool wasInputPaused) {
|
| + assert(!_inCallback);
|
| + if (_hasPending && _pending.isEmpty) {
|
| + _state &= ~_STATE_HAS_PENDING;
|
| + if (_isInputPaused && _mayResumeInput) {
|
| + _state &= ~_STATE_INPUT_PAUSED;
|
| + }
|
| }
|
| - if (_isFiring) {
|
| - if (listener._needsEvent(_currentEventIdBit)) {
|
| - assert(listener._isSubscribed);
|
| - listener._setPendingUnsubscribe(_currentEventIdBit);
|
| - } else {
|
| - // The listener has been notified of the event (or don't need to,
|
| - // if it's still pending subscription) so it's safe to remove it.
|
| - _removeListener(listener);
|
| + // If the state changes during a callback, we immediately
|
| + // make a new state-change callback. Loop until the state didn't change.
|
| + while (true) {
|
| + if (_isCanceled) {
|
| + _onCancel();
|
| + _pending = null;
|
| + return;
|
| }
|
| - // Pause and subscription state changes are reported when we end
|
| - // firing.
|
| - } else {
|
| - bool wasInputPaused = _isInputPaused;
|
| - _removeListener(listener);
|
| - if (_isInactive) {
|
| - _checkCallbacks(true, wasInputPaused);
|
| - if (!_isPaused && _hasPendingEvent) {
|
| - _schedulePendingEvents();
|
| - }
|
| + bool isInputPaused = _isInputPaused;
|
| + if (wasInputPaused == isInputPaused) break;
|
| + _state ^= _STATE_IN_CALLBACK;
|
| + if (isInputPaused) {
|
| + _onPause();
|
| + } else {
|
| + _onResume();
|
| }
|
| + _state &= ~_STATE_IN_CALLBACK;
|
| + wasInputPaused = isInputPaused;
|
| }
|
| - }
|
| -
|
| - /**
|
| - * Removes a listener from this stream and cancels its pauses.
|
| - *
|
| - * This is a low-level action that doesn't call [_onSubscriptionStateChange].
|
| - * or [_callOnPauseStateChange].
|
| - */
|
| - void _removeListener(_StreamListener listener) {
|
| - int pauseCount = listener._setUnsubscribed();
|
| - _InternalLinkList.remove(listener);
|
| - if (pauseCount > 0) {
|
| - _updatePauseCount(-pauseCount);
|
| - if (!_isPaused && _hasPendingEvent) {
|
| - _state |= _STREAM_PENDING_RESUME;
|
| - }
|
| + if (_hasPending && !_isPaused) {
|
| + _pending.schedule(this);
|
| }
|
| }
|
| }
|
|
|
| +// -------------------------------------------------------------------
|
| +// Common base class for single and multi-subscription streams.
|
| +// -------------------------------------------------------------------
|
| +abstract class _StreamImpl<T> extends Stream<T> {
|
| + // ------------------------------------------------------------------
|
| + // Stream interface.
|
|
|
| -/** Stream that generates its own events. */
|
| -class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> {
|
| - /**
|
| - * Initializes the stream to have only the events provided by [events].
|
| - *
|
| - * A [_PendingEvents] implementation provides events that are handled
|
| - * by calling [_PendingEvents.handleNext] with the [_StreamImpl].
|
| - */
|
| - _GeneratedSingleStreamImpl(_PendingEvents events) {
|
| - _pendingEvents = events;
|
| - _setClosed(); // Closed for input since all events are already pending.
|
| + StreamSubscription<T> listen(void onData(T data),
|
| + { void onError(error),
|
| + void onDone(),
|
| + bool cancelOnError }) {
|
| + if (onData == null) onData = _nullDataHandler;
|
| + if (onError == null) onError = _nullErrorHandler;
|
| + if (onDone == null) onDone = _nullDoneHandler;
|
| + cancelOnError = identical(true, cancelOnError);
|
| + StreamSubscription subscription =
|
| + _createSubscription(onData, onError, onDone, cancelOnError);
|
| + _onListen(subscription);
|
| + return subscription;
|
| }
|
|
|
| - void _add(T value) {
|
| - throw new UnsupportedError("Cannot inject events into generated stream");
|
| + // -------------------------------------------------------------------
|
| + /** Create a subscription object. Called by [subcribe]. */
|
| + _BufferingStreamSubscription<T> _createSubscription(
|
| + void onData(T data),
|
| + void onError(error),
|
| + void onDone(),
|
| + bool cancelOnError) {
|
| + return new _BufferingStreamSubscription<T>(
|
| + onData, onError, onDone, cancelOnError);
|
| }
|
|
|
| - void _addError(value) {
|
| - throw new UnsupportedError("Cannot inject events into generated stream");
|
| - }
|
| + /** Hook called when the subscription has been created. */
|
| + void _onListen(StreamSubscription subscription) {}
|
| +}
|
|
|
| - void _close() {
|
| - throw new UnsupportedError("Cannot inject events into generated stream");
|
| +typedef _PendingEvents _EventGenerator();
|
| +
|
| +/** Stream that generates its own events. */
|
| +class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
|
| + final _EventGenerator _pending;
|
| + /**
|
| + * Initializes the stream to have only the events provided by a
|
| + * [_PendingEvents].
|
| + *
|
| + * A new [_PendingEvents] must be generated for each listen.
|
| + */
|
| + _GeneratedStreamImpl(this._pending);
|
| +
|
| + StreamSubscription _createSubscription(void onData(T data),
|
| + void onError(Object error),
|
| + void onDone(),
|
| + bool cancelOnError) {
|
| + _BufferingStreamSubscription<T> subscription =
|
| + new _BufferingStreamSubscription(
|
| + onData, onError, onDone, cancelOnError);
|
| + subscription._setPendingEvents(_pending());
|
| + return subscription;
|
| }
|
| }
|
|
|
|
|
| /** Pending events object that gets its events from an [Iterable]. */
|
| class _IterablePendingEvents<T> extends _PendingEvents {
|
| - final Iterator<T> _iterator;
|
| - /**
|
| - * Whether there are no more events to be sent.
|
| - *
|
| - * This starts out as [:false:] since there is always at least
|
| - * a 'done' event to be sent.
|
| - */
|
| - bool _isDone = false;
|
| + // The iterator providing data for data events.
|
| + // Set to null when iteration has completed.
|
| + Iterator<T> _iterator;
|
|
|
| _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;
|
|
|
| - bool get isEmpty => _isDone;
|
| + bool get isEmpty => _iterator == null;
|
|
|
| - void handleNext(_StreamImpl<T> stream) {
|
| - if (_isDone) throw new StateError("No events pending.");
|
| + void handleNext(_EventDispatch dispatch) {
|
| + if (_iterator == null) {
|
| + throw new StateError("No events pending.");
|
| + }
|
| + // Send one event per call to moveNext.
|
| + // If moveNext returns true, send the current element as data.
|
| + // If moveNext returns false, send a done event and clear the _iterator.
|
| + // If moveNext throws an error, send an error and clear the _iterator.
|
| + // After an error, no further events will be sent.
|
| + bool isDone;
|
| try {
|
| - _isDone = !_iterator.moveNext();
|
| - if (!_isDone) {
|
| - stream._sendData(_iterator.current);
|
| - } else {
|
| - stream._sendDone();
|
| - }
|
| + isDone = !_iterator.moveNext();
|
| } catch (e, s) {
|
| - stream._sendError(_asyncError(e, s));
|
| - stream._sendDone();
|
| - _isDone = true;
|
| + _iterator = null;
|
| + dispatch._sendError(_asyncError(e, s));
|
| + return;
|
| + }
|
| + if (!isDone) {
|
| + dispatch._sendData(_iterator.current);
|
| + } else {
|
| + _iterator = null;
|
| + dispatch._sendDone();
|
| }
|
| }
|
| -}
|
| -
|
| -
|
| -/**
|
| - * The subscription class that the [StreamController] uses.
|
| - *
|
| - * The [_StreamImpl.createSubscription] method should
|
| - * create an object of this type, or another subclass of [_StreamListener].
|
| - * A subclass of [_StreamImpl] can specify which subclass
|
| - * of [_StreamSubscriptionImpl] it uses by overriding
|
| - * [_StreamImpl.createSubscription].
|
| - *
|
| - * The subscription is in one of three states:
|
| - * * Subscribed.
|
| - * * Paused-and-subscribed.
|
| - * * Unsubscribed.
|
| - * Unsubscribing also resumes any pauses started by the subscription.
|
| - */
|
| -class _StreamSubscriptionImpl<T> extends _StreamListener<T>
|
| - implements StreamSubscription<T> {
|
| - final bool _cancelOnError;
|
| - // TODO(ahe): Restore type when feature is implemented in dart2js
|
| - // checked mode. http://dartbug.com/7733
|
| - var /* _DataHandler<T> */ _onData;
|
| - _ErrorHandler _onError;
|
| - _DoneHandler _onDone;
|
| - _StreamSubscriptionImpl(_StreamImpl source,
|
| - this._onData,
|
| - this._onError,
|
| - this._onDone,
|
| - this._cancelOnError) : super(source);
|
| -
|
| - void onData(void handleData(T event)) {
|
| - if (handleData == null) handleData = _nullDataHandler;
|
| - _onData = handleData;
|
| - }
|
| -
|
| - void onError(void handleError(error)) {
|
| - if (handleError == null) handleError = _nullErrorHandler;
|
| - _onError = handleError;
|
| - }
|
| -
|
| - void onDone(void handleDone()) {
|
| - if (handleDone == null) handleDone = _nullDoneHandler;
|
| - _onDone = handleDone;
|
| - }
|
| -
|
| - void _sendData(T data) {
|
| - _onData(data);
|
| - }
|
| -
|
| - void _sendError(error) {
|
| - _onError(error);
|
| - if (_cancelOnError) _source._cancel(this);
|
| - }
|
| -
|
| - void _sendDone() {
|
| - _onDone();
|
| - }
|
| -
|
| - void cancel() {
|
| - if (!_isSubscribed) return;
|
| - _source._cancel(this);
|
| - }
|
| -
|
| - void pause([Future resumeSignal]) {
|
| - if (!_isSubscribed) return;
|
| - _source._pause(this, resumeSignal);
|
| - }
|
| -
|
| - void resume() {
|
| - if (!_isSubscribed || !isPaused) return;
|
| - _source._resume(this, false);
|
| - }
|
| -
|
| - Future asFuture([var futureValue]) {
|
| - _FutureImpl<T> result = new _FutureImpl<T>();
|
| -
|
| - // Overwrite the onDone and onError handlers.
|
| - onDone(() { result._setValue(futureValue); });
|
| - onError((error) {
|
| - cancel();
|
| - result._setError(error);
|
| - });
|
|
|
| - return result;
|
| + void clear() {
|
| + if (isScheduled) cancelSchedule();
|
| + _iterator = null;
|
| }
|
| }
|
|
|
| +
|
| // Internal helpers.
|
|
|
| // Types of the different handlers on a stream. Types used to type fields.
|
| @@ -998,20 +558,20 @@ void _nullErrorHandler(error) {
|
| void _nullDoneHandler() {}
|
|
|
|
|
| -/** A delayed event on a stream implementation. */
|
| +/** A delayed event on a buffering stream subscription. */
|
| abstract class _DelayedEvent {
|
| /** Added as a linked list on the [StreamController]. */
|
| _DelayedEvent next;
|
| /** Execute the delayed event on the [StreamController]. */
|
| - void perform(_StreamImpl stream);
|
| + void perform(_EventDispatch dispatch);
|
| }
|
|
|
| /** A delayed data event. */
|
| class _DelayedData<T> extends _DelayedEvent{
|
| final T value;
|
| _DelayedData(this.value);
|
| - void perform(_StreamImpl<T> stream) {
|
| - stream._sendData(value);
|
| + void perform(_EventDispatch<T> dispatch) {
|
| + dispatch._sendData(value);
|
| }
|
| }
|
|
|
| @@ -1019,16 +579,16 @@ class _DelayedData<T> extends _DelayedEvent{
|
| class _DelayedError extends _DelayedEvent {
|
| final error;
|
| _DelayedError(this.error);
|
| - void perform(_StreamImpl stream) {
|
| - stream._sendError(error);
|
| + void perform(_EventDispatch dispatch) {
|
| + dispatch._sendError(error);
|
| }
|
| }
|
|
|
| /** A delayed done event. */
|
| class _DelayedDone implements _DelayedEvent {
|
| const _DelayedDone();
|
| - void perform(_StreamImpl stream) {
|
| - stream._sendDone();
|
| + void perform(_EventDispatch dispatch) {
|
| + dispatch._sendDone();
|
| }
|
|
|
| _DelayedEvent get next => null;
|
| @@ -1118,118 +678,66 @@ abstract class _InternalLinkList extends _InternalLink {
|
| }
|
| }
|
|
|
| -/** Abstract type for an internal interface for sending events. */
|
| -abstract class _EventOutputSink<T> {
|
| - _sendData(T data);
|
| - _sendError(error);
|
| - _sendDone();
|
| -}
|
| -
|
| -abstract class _StreamListener<T> extends _InternalLink
|
| - implements _EventOutputSink<T> {
|
| - final _StreamImpl _source;
|
| - int _state = _LISTENER_UNSUBSCRIBED;
|
| -
|
| - _StreamListener(this._source);
|
| -
|
| - bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT);
|
| -
|
| - bool get _isPendingUnsubscribe =>
|
| - (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0;
|
| -
|
| - bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0;
|
| +/** Superclass for provider of pending events. */
|
| +abstract class _PendingEvents {
|
| + // No async event has been scheduled.
|
| + static const int _STATE_UNSCHEDULED = 0;
|
| + // An async event has been scheduled to run a function.
|
| + static const int _STATE_SCHEDULED = 1;
|
| + // An async event has been scheduled, but it will do nothing when it runs.
|
| + // Async events can't be preempted.
|
| + static const int _STATE_CANCELED = 3;
|
|
|
| /**
|
| - * Whether the listener still needs to receive the currently firing event.
|
| + * State of being scheduled.
|
| *
|
| - * The currently firing event is identified by a single bit, which alternates
|
| - * between events. The [_state] contains the previously sent event's bit in
|
| - * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener
|
| - * still need the current event.
|
| - */
|
| - bool _needsEvent(int currentEventIdBit) {
|
| - int lastEventIdBit =
|
| - (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT;
|
| - return lastEventIdBit != currentEventIdBit;
|
| - }
|
| -
|
| - /// If a subscriber's "firing bit" doesn't match the stream's firing bit,
|
| - /// we are currently firing an event and the subscriber still need to receive
|
| - /// the event.
|
| - void _toggleEventReceived() {
|
| - _state ^= _LISTENER_EVENT_ID;
|
| - }
|
| -
|
| - void _setSubscribed(int eventIdBit) {
|
| - assert(eventIdBit == 0 || eventIdBit == 1);
|
| - _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT);
|
| - }
|
| -
|
| - void _setPendingUnsubscribe(int currentEventIdBit) {
|
| - assert(_isSubscribed);
|
| - // Sets the pending unsubscribe, and ensures that the listener
|
| - // won't get the current event.
|
| - _state |= _LISTENER_PENDING_UNSUBSCRIBE | _LISTENER_EVENT_ID;
|
| - _state ^= (1 ^ currentEventIdBit) << _LISTENER_EVENT_ID_SHIFT;
|
| - assert(!_needsEvent(currentEventIdBit));
|
| - }
|
| -
|
| - /**
|
| - * Marks the listener as unsubscibed.
|
| + * Set to [_STATE_SCHEDULED] when pending events are scheduled for
|
| + * async dispatch. Since we can't cancel a [runAsync] call, if schduling
|
| + * is "canceled", the _state is simply set to [_STATE_CANCELED] which will
|
| + * make the async code do nothing except resetting [_state].
|
| *
|
| - * Returns the number of unresumed pauses for the listener.
|
| + * If events are scheduled while the state is [_STATE_CANCELED], it is
|
| + * merely switched back to [_STATE_SCHEDULED], but no new call to [runAsync]
|
| + * is performed.
|
| */
|
| - int _setUnsubscribed() {
|
| - assert(_isSubscribed);
|
| - int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT;
|
| - _state = _LISTENER_UNSUBSCRIBED;
|
| - return timesPaused;
|
| - }
|
| + int _state = _STATE_UNSCHEDULED;
|
|
|
| - void _incrementPauseCount() {
|
| - _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT;
|
| - }
|
| -
|
| - void _decrementPauseCount() {
|
| - assert(isPaused);
|
| - _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT;
|
| - }
|
| + bool get isEmpty;
|
|
|
| - _sendData(T data);
|
| - _sendError(error);
|
| - _sendDone();
|
| -}
|
| + bool get isScheduled => _state == _STATE_SCHEDULED;
|
| + bool get _eventScheduled => _state >= _STATE_SCHEDULED;
|
|
|
| -/** Superclass for provider of pending events. */
|
| -abstract class _PendingEvents {
|
| /**
|
| - * Timer set when pending events are scheduled for execution.
|
| + * Schedule an event to run later.
|
| *
|
| - * When scheduling pending events for execution in a later cycle, the timer
|
| - * is stored here. If pending events are executed earlier than that, e.g.,
|
| - * due to a second event in the current cycle, the timer is canceled again.
|
| + * If called more than once, it should be called with the same dispatch as
|
| + * argument each time. It may reuse an earlier argument in some cases.
|
| */
|
| - Timer scheduleTimer = null;
|
| -
|
| - bool get isEmpty;
|
| -
|
| - bool get isScheduled => scheduleTimer != null;
|
| -
|
| - void schedule(_StreamImpl stream) {
|
| + void schedule(_EventDispatch dispatch) {
|
| if (isScheduled) return;
|
| - scheduleTimer = new Timer(Duration.ZERO, () {
|
| - scheduleTimer = null;
|
| - stream._handlePendingEvents();
|
| + assert(!isEmpty);
|
| + if (_eventScheduled) {
|
| + assert(_state == _STATE_CANCELED);
|
| + _state = _STATE_SCHEDULED;
|
| + return;
|
| + }
|
| + runAsync(() {
|
| + int oldState = _state;
|
| + _state = _STATE_UNSCHEDULED;
|
| + if (oldState == _STATE_CANCELED) return;
|
| + handleNext(dispatch);
|
| });
|
| + _state = _STATE_SCHEDULED;
|
| }
|
|
|
| void cancelSchedule() {
|
| - assert(isScheduled);
|
| - scheduleTimer.cancel();
|
| - scheduleTimer = null;
|
| + if (isScheduled) _state = _STATE_CANCELED;
|
| }
|
|
|
| - void handleNext(_StreamImpl stream);
|
| + void handleNext(_EventDispatch dispatch);
|
| +
|
| + /** Throw away any pending events and cancel scheduled events. */
|
| + void clear();
|
| }
|
|
|
|
|
| @@ -1242,8 +750,6 @@ class _StreamImplEvents extends _PendingEvents {
|
|
|
| bool get isEmpty => lastPendingEvent == null;
|
|
|
| - bool get isScheduled => scheduleTimer != null;
|
| -
|
| void add(_DelayedEvent event) {
|
| if (lastPendingEvent == null) {
|
| firstPendingEvent = lastPendingEvent = event;
|
| @@ -1252,122 +758,465 @@ class _StreamImplEvents extends _PendingEvents {
|
| }
|
| }
|
|
|
| - void handleNext(_StreamImpl stream) {
|
| + void handleNext(_EventDispatch dispatch) {
|
| assert(!isScheduled);
|
| _DelayedEvent event = firstPendingEvent;
|
| firstPendingEvent = event.next;
|
| if (firstPendingEvent == null) {
|
| lastPendingEvent = null;
|
| }
|
| - event.perform(stream);
|
| + event.perform(dispatch);
|
| + }
|
| +
|
| + void clear() {
|
| + if (isScheduled) cancelSchedule();
|
| + firstPendingEvent = lastPendingEvent = null;
|
| }
|
| }
|
|
|
| +class _MultiplexerLinkedList {
|
| + _MultiplexerLinkedList _next;
|
| + _MultiplexerLinkedList _previous;
|
|
|
| -class _DoneSubscription<T> implements StreamSubscription<T> {
|
| - _DoneHandler _handler;
|
| - Timer _timer;
|
| - int _pauseCount = 0;
|
| + void _unlink() {
|
| + _previous._next = _next;
|
| + _next._previous = _previous;
|
| + _next = _previous = this;
|
| + }
|
|
|
| - _DoneSubscription(this._handler) {
|
| - _delayDone();
|
| + void _insertBefore(_MultiplexerLinkedList newNext) {
|
| + _MultiplexerLinkedList newPrevious = newNext._previous;
|
| + newPrevious._next = this;
|
| + newNext._previous = _previous;
|
| + _previous._next = newNext;
|
| + _previous = newPrevious;
|
| }
|
| +}
|
|
|
| - void _delayDone() {
|
| - assert(_timer == null && _pauseCount == 0);
|
| - _timer = new Timer(Duration.ZERO, () {
|
| - if (_handler != null) _handler();
|
| - });
|
| +/**
|
| + * A subscription used by [_SingleStreamMultiplexer].
|
| + *
|
| + * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple
|
| + * listeners at a time. It is used to implement [Stream.asBroadcastStream].
|
| + *
|
| + * It is itself listening to another stream for events, and it forwards all
|
| + * events to all of its simultanous listeners.
|
| + *
|
| + * The listeners are [_MultiplexerSubscription]s and are kept as a linked list.
|
| + */
|
| +// TODO(lrn): Change "implements" to "with" when automatic mixin constructors
|
| +// are implemented.
|
| +class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T>
|
| + implements _MultiplexerLinkedList {
|
| + static const int _STATE_NOT_LISTENING = 0;
|
| + // Bit that alternates between event firings. If bit matches the one currently
|
| + // firing, the subscription will not be notified.
|
| + static const int _STATE_EVENT_ID_BIT = 1;
|
| + // Whether the subscription is listening at all. This should be set while
|
| + // it is part of the linked list of listeners of a multiplexer stream.
|
| + static const int _STATE_LISTENING = 2;
|
| + // State bit set while firing an event.
|
| + static const int _STATE_IS_FIRING = 4;
|
| + // Bit set if a subscription is canceled while it's firing (the
|
| + // [_STATE_IS_FIRING] bit is set).
|
| + // If the subscription is canceled while firing, it is not removed from the
|
| + // linked list immediately (to avoid breaking iteration), but is instead
|
| + // removed after it is done firing.
|
| + static const int _STATE_REMOVE_AFTER_FIRING = 8;
|
| +
|
| + // Firing state.
|
| + int _multiplexState;
|
| +
|
| + _SingleStreamMultiplexer _source;
|
| +
|
| + _MultiplexerSubscription(this._source,
|
| + void onData(T data),
|
| + void onError(Object error),
|
| + void onDone(),
|
| + bool cancelOnError,
|
| + int nextEventId)
|
| + : _multiplexState = _STATE_LISTENING | nextEventId,
|
| + super(onData, onError, onDone, cancelOnError) {
|
| + _next = _previous = this;
|
| + }
|
| +
|
| + // Mixin workaround.
|
| + _MultiplexerLinkedList _next;
|
| + _MultiplexerLinkedList _previous;
|
| +
|
| + void _unlink() {
|
| + _previous._next = _next;
|
| + _next._previous = _previous;
|
| + _next = _previous = this;
|
| + }
|
| +
|
| + void _insertBefore(_MultiplexerLinkedList newNext) {
|
| + _MultiplexerLinkedList newPrevious = newNext._previous;
|
| + newPrevious._next = this;
|
| + newNext._previous = _previous;
|
| + _previous._next = newNext;
|
| + _previous = newPrevious;
|
| + }
|
| + // End mixin.
|
| +
|
| + bool get _isListening => _multiplexState >= _STATE_LISTENING;
|
| + bool get _isFiring => _multiplexState >= _STATE_IS_FIRING;
|
| + bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING;
|
| + int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT;
|
| +
|
| + void _setRemoveAfterFiring() {
|
| + assert(_isFiring);
|
| + _multiplexState |= _STATE_REMOVE_AFTER_FIRING;
|
| + }
|
| +
|
| + void _startFiring() {
|
| + assert(!_isFiring);
|
| + _multiplexState |= _STATE_IS_FIRING;
|
| }
|
|
|
| - bool get _isComplete => _timer == null && _pauseCount == 0;
|
| + /// Marks listener as no longer firing, and toggles its event id.
|
| + void _endFiring() {
|
| + assert(_isFiring);
|
| + _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT);
|
| + }
|
|
|
| - void onData(void handleAction(T value)) {}
|
| + void _setNotListening() {
|
| + assert(_isListening);
|
| + _multiplexState = _STATE_NOT_LISTENING;
|
| + }
|
|
|
| - void onError(void handleError(error)) {}
|
| + void _onCancel() {
|
| + assert(_isListening);
|
| + _source._removeListener(this);
|
| + }
|
| +}
|
|
|
| - void onDone(void handleDone()) {
|
| - _handler = handleDone;
|
| +/**
|
| + * A stream that sends events from another stream to multiple listeners.
|
| + *
|
| + * This is used to implement [Stream.asBroadcastStream].
|
| + *
|
| + * This stream allows listening more than once.
|
| + * When the first listener is added, it starts listening on its source
|
| + * stream for events. All events from the source stream are sent to all
|
| + * active listeners. The listeners handle their own buffering.
|
| + * When the last listener cancels, the source stream subscription is also
|
| + * canceled, and no further listening is possible.
|
| + */
|
| +// TODO(lrn): change "implements" to "with" when the VM supports it.
|
| +class _SingleStreamMultiplexer<T> extends Stream<T>
|
| + implements _MultiplexerLinkedList,
|
| + _EventDispatch<T> {
|
| + final Stream<T> _source;
|
| + StreamSubscription<T> _subscription;
|
| + // Alternates between zero and one for each event fired.
|
| + // Listeners are initialized with the next event's id, and will
|
| + // only be notified if they match the event being fired.
|
| + // That way listeners added during event firing will not receive
|
| + // the current event.
|
| + int _eventId = 0;
|
| +
|
| + bool _isFiring = false;
|
| +
|
| + // Remember events added while firing.
|
| + _StreamImplEvents _pending;
|
| +
|
| + _SingleStreamMultiplexer(this._source) {
|
| + _next = _previous = this;
|
| + }
|
| +
|
| + bool get _hasPending => _pending != null && !_pending.isEmpty;
|
| +
|
| + // Should be mixin.
|
| + _MultiplexerLinkedList _next;
|
| + _MultiplexerLinkedList _previous;
|
| +
|
| + void _unlink() {
|
| + _previous._next = _next;
|
| + _next._previous = _previous;
|
| + _next = _previous = this;
|
| + }
|
| +
|
| + void _insertBefore(_MultiplexerLinkedList newNext) {
|
| + _MultiplexerLinkedList newPrevious = newNext._previous;
|
| + newPrevious._next = this;
|
| + newNext._previous = _previous;
|
| + _previous._next = newNext;
|
| + _previous = newPrevious;
|
| + }
|
| + // End of mixin.
|
| +
|
| + StreamSubscription<T> listen(void onData(T data),
|
| + { void onError(Object error),
|
| + void onDone(),
|
| + bool cancelOnError }) {
|
| + if (onData == null) onData = _nullDataHandler;
|
| + if (onError == null) onError = _nullErrorHandler;
|
| + if (onDone == null) onDone = _nullDoneHandler;
|
| + cancelOnError = identical(true, cancelOnError);
|
| + _MultiplexerSubscription subscription =
|
| + new _MultiplexerSubscription(this, onData, onError, onDone,
|
| + cancelOnError, _eventId);
|
| + if (_subscription == null) {
|
| + _subscription = _source.listen(_add, onError: _addError, onDone: _close);
|
| + }
|
| + subscription._insertBefore(this);
|
| + return subscription;
|
| }
|
|
|
| - void pause([Future signal]) {
|
| - if (_isComplete) return;
|
| - if (_timer != null) {
|
| - _timer.cancel();
|
| - _timer = null;
|
| + /** Called by [_MultiplexerSubscription.remove]. */
|
| + void _removeListener(_MultiplexerSubscription listener) {
|
| + if (listener._isFiring) {
|
| + listener._setRemoveAfterFiring();
|
| + } else {
|
| + _unlinkListener(listener);
|
| }
|
| - _pauseCount++;
|
| - if (signal != null) signal.whenComplete(resume);
|
| }
|
|
|
| - void resume() {
|
| - if (_isComplete) return;
|
| - if (_pauseCount == 0) return;
|
| - _pauseCount--;
|
| - if (_pauseCount == 0) {
|
| - _delayDone();
|
| + /** Remove a listener and close the subscription after the last one. */
|
| + void _unlinkListener(_MultiplexerSubscription listener) {
|
| + listener._setNotListening();
|
| + listener._unlink();
|
| + if (identical(_next, this)) {
|
| + // Last listener removed.
|
| + _cancel();
|
| }
|
| }
|
|
|
| - bool get isPaused => _pauseCount > 0;
|
| + void _cancel() {
|
| + StreamSubscription subscription = _subscription;
|
| + _subscription = null;
|
| + subscription.cancel();
|
| + if (_pending != null) _pending.cancelSchedule();
|
| + }
|
| +
|
| + void _forEachListener(void action(_MultiplexerSubscription listener)) {
|
| + int eventId = _eventId;
|
| + _eventId ^= 1;
|
| + _isFiring = true;
|
| + _MultiplexerLinkedList entry = _next;
|
| + // Call each listener in order. A listener can be removed during any
|
| + // other listener's event. During its own event it will only be marked
|
| + // as "to be removed", and it will be handled after the event is done.
|
| + while (!identical(entry, this)) {
|
| + _MultiplexerSubscription listener = entry;
|
| + if (listener._eventId == eventId) {
|
| + listener._startFiring();
|
| + action(listener);
|
| + listener._endFiring(); // Also toggles the event id.
|
| + }
|
| + entry = listener._next;
|
| + if (listener._removeAfterFiring) {
|
| + _unlinkListener(listener);
|
| + }
|
| + }
|
| + _isFiring = false;
|
| + }
|
|
|
| - void cancel() {
|
| - if (_isComplete) return;
|
| - if (_timer != null) {
|
| - _timer.cancel();
|
| - _timer = null;
|
| + void _add(T data) {
|
| + if (_isFiring || _hasPending) {
|
| + _StreamImplEvents pending = _pending;
|
| + if (pending == null) pending = _pending = new _StreamImplEvents();
|
| + pending.add(new _DelayedData(data));
|
| + } else {
|
| + _sendData(data);
|
| }
|
| - _pauseCount = 0;
|
| }
|
|
|
| - Future asFuture([var futureValue]) {
|
| - // TODO(floitsch): share more code.
|
| - _FutureImpl<T> result = new _FutureImpl<T>();
|
| + void _addError(Object error) {
|
| + if (_isFiring || _hasPending) {
|
| + _StreamImplEvents pending = _pending;
|
| + if (pending == null) pending = _pending = new _StreamImplEvents();
|
| + pending.add(new _DelayedError(error));
|
| + } else {
|
| + _sendError(error);
|
| + }
|
| + }
|
|
|
| - // Overwrite the onDone and onError handlers.
|
| - onDone(() { result._setValue(futureValue); });
|
| - onError((error) {
|
| - cancel();
|
| - result._setError(error);
|
| + void _close() {
|
| + if (_isFiring || _hasPending) {
|
| + _StreamImplEvents pending = _pending;
|
| + if (pending == null) pending = _pending = new _StreamImplEvents();
|
| + pending.add(const _DelayedDone());
|
| + } else {
|
| + _sendDone();
|
| + }
|
| + }
|
| +
|
| + void _sendData(T data) {
|
| + _forEachListener((_MultiplexerSubscription listener) {
|
| + listener._add(data);
|
| });
|
| + if (_hasPending) {
|
| + _pending.schedule(this);
|
| + }
|
| + }
|
|
|
| - return result;
|
| + void _sendError(Object error) {
|
| + _forEachListener((_MultiplexerSubscription listener) {
|
| + listener._addError(error);
|
| + });
|
| + if (_hasPending) {
|
| + _pending.schedule(this);
|
| + }
|
| }
|
| -}
|
|
|
| -class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> {
|
| - final Stream<T> _source;
|
| - StreamSubscription<T> _subscription;
|
| + void _sendDone() {
|
| + _forEachListener((_MultiplexerSubscription listener) {
|
| + listener._setRemoveAfterFiring();
|
| + listener._close();
|
| + });
|
| + }
|
| +}
|
|
|
| - _SingleStreamMultiplexer(this._source);
|
|
|
| - void _callOnPauseStateChange() {
|
| - if (_isPaused) {
|
| - if (_subscription != null) {
|
| - _subscription.pause();
|
| - }
|
| +/**
|
| + * Simple implementation of [StreamIterator].
|
| + */
|
| +class _StreamIteratorImpl<T> implements StreamIterator<T> {
|
| + // Internal state of the stream iterator.
|
| + // At any time, it is in one of these states.
|
| + // The interpretation of the [_futureOrPrefecth] field depends on the state.
|
| + // In _STATE_MOVING, the _data field holds the most recently returned
|
| + // future.
|
| + // When in one of the _STATE_EXTRA_* states, the it may hold the
|
| + // next data/error object, and the subscription is paused.
|
| +
|
| + /// The simple state where [_data] holds the data to return, and [moveNext]
|
| + /// is allowed. The subscription is actively listening.
|
| + static const int _STATE_FOUND = 0;
|
| + /// State set after [moveNext] has returned false or an error,
|
| + /// or after calling [cancel]. The subscription is always canceled.
|
| + static const int _STATE_DONE = 1;
|
| + /// State set after calling [moveNext], but before its returned future has
|
| + /// completed. Calling [moveNext] again is not allowed in this state.
|
| + /// The subscription is actively listening.
|
| + static const int _STATE_MOVING = 2;
|
| + /// States set when another event occurs while in _STATE_FOUND.
|
| + /// This extra overflow event is cached until the next call to [moveNext],
|
| + /// which will complete as if it received the event normally.
|
| + /// The subscription is paused in these states, so we only ever get one
|
| + /// event too many.
|
| + static const int _STATE_EXTRA_DATA = 3;
|
| + static const int _STATE_EXTRA_ERROR = 4;
|
| + static const int _STATE_EXTRA_DONE = 5;
|
| +
|
| + /// Subscription being listened to.
|
| + StreamSubscription _subscription;
|
| +
|
| + /// The current element represented by the most recent call to moveNext.
|
| + ///
|
| + /// Is null between the time moveNext is called and its future completes.
|
| + T _current = null;
|
| +
|
| + /// The future returned by the most recent call to [moveNext].
|
| + ///
|
| + /// Also used to store the next value/error in case the stream provides an
|
| + /// event before [moveNext] is called again. In that case, the stream will
|
| + /// be paused to prevent further events.
|
| + var _futureOrPrefetch = null;
|
| +
|
| + /// The current state.
|
| + int _state = _STATE_FOUND;
|
| +
|
| + _StreamIteratorImpl(final Stream<T> stream) {
|
| + _subscription = stream.listen(_onData,
|
| + onError: _onError,
|
| + onDone: _onDone,
|
| + cancelOnError: true);
|
| + }
|
| +
|
| + T get current => _current;
|
| +
|
| + Future<bool> moveNext() {
|
| + if (_state == _STATE_DONE) {
|
| + return new _FutureImpl<bool>.immediate(false);
|
| + }
|
| + if (_state == _STATE_MOVING) {
|
| + throw new StateError("Already waiting for next.");
|
| + }
|
| + if (_state == _STATE_FOUND) {
|
| + _state = _STATE_MOVING;
|
| + _futureOrPrefetch = new _FutureImpl<bool>();
|
| + return _futureOrPrefetch;
|
| } else {
|
| - if (_subscription != null) {
|
| - _subscription.resume();
|
| + assert(_state >= _STATE_EXTRA_DATA);
|
| + switch (_state) {
|
| + case _STATE_EXTRA_DATA:
|
| + _state = _STATE_FOUND;
|
| + _current = _futureOrPrefetch;
|
| + _futureOrPrefetch = null;
|
| + _subscription.resume();
|
| + return new FutureImpl<bool>.immediate(true);
|
| + case _STATE_EXTRA_ERROR:
|
| + Object prefetch = _futureOrPrefetch;
|
| + _cancel();
|
| + return new FutureImpl<bool>.error(prefetch);
|
| + case _STATE_EXTRA_DONE:
|
| + _cancel();
|
| + return new FutureImpl<bool>.immediate(false);
|
| }
|
| }
|
| }
|
|
|
| - /**
|
| - * Subscribe or unsubscribe on [_source] depending on whether
|
| - * [_stream] has subscribers.
|
| - */
|
| - void _onSubscriptionStateChange() {
|
| - if (_hasListener) {
|
| - assert(_subscription == null);
|
| - _subscription = _source.listen(this._add,
|
| - onError: this._addError,
|
| - onDone: this._close);
|
| + /** Clears up the internal state when the iterator ends. */
|
| + void _clear() {
|
| + _subscription = null;
|
| + _futureOrPrefetch = null;
|
| + _current = null;
|
| + _state = _STATE_DONE;
|
| + }
|
| +
|
| + void cancel() {
|
| + StreamSubscription subscription = _subscription;
|
| + if (_state == _STATE_MOVING) {
|
| + _FutureImpl<bool> hasNext = _futureOrPrefetch;
|
| + _clear();
|
| + hasNext._setValue(false);
|
| } else {
|
| - // TODO(lrn): Check why this can happen.
|
| - if (_subscription == null) return;
|
| - _subscription.cancel();
|
| - _subscription = null;
|
| + _clear();
|
| + }
|
| + subscription.cancel();
|
| + }
|
| +
|
| + void _onData(T data) {
|
| + if (_state == _STATE_MOVING) {
|
| + _current = data;
|
| + _FutureImpl<bool> hasNext = _futureOrPrefetch;
|
| + _futureOrPrefetch = null;
|
| + _state = _STATE_FOUND;
|
| + hasNext._setValue(true);
|
| + return;
|
| + }
|
| + _subscription.pause();
|
| + assert(_futureOrPrefetch == null);
|
| + _futureOrPrefetch = data;
|
| + _state = _STATE_EXTRA_DATA;
|
| + }
|
| +
|
| + void _onError(Object error) {
|
| + if (_state == _STATE_MOVING) {
|
| + _FutureImpl<bool> hasNext = _futureOrPrefetch;
|
| + // We have cancelOnError: true, so the subscription is canceled.
|
| + _clear();
|
| + hasNext._setError(error);
|
| + return;
|
| + }
|
| + _subscription.pause();
|
| + assert(_futureOrPrefetch == null);
|
| + _futureOrPrefetch = error;
|
| + _state = _STATE_EXTRA_ERROR;
|
| + }
|
| +
|
| + void _onDone() {
|
| + if (_state == _STATE_MOVING) {
|
| + _FutureImpl<bool> hasNext = _futureOrPrefetch;
|
| + _clear();
|
| + hasNext._setValue(false);
|
| + return;
|
| }
|
| + _subscription.pause();
|
| + _futureOrPrefetch = null;
|
| + _state = _STATE_EXTRA_DONE;
|
| }
|
| }
|
|
|