| Index: sdk/lib/async/stream_impl.dart
|
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..f99dc83ba23218b7b30b32dec07ba1024bbfbfd0
|
| --- /dev/null
|
| +++ b/sdk/lib/async/stream_impl.dart
|
| @@ -0,0 +1,1017 @@
|
| +// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +// part of dart.async;
|
| +
|
| +// States shared by single/multi stream implementations.
|
| +
|
| +/// Initial and default state where the stream can receive and send events.
|
| +const int _STREAM_OPEN = 0;
|
| +/// The stream has received a request to complete, but hasn't done so yet.
|
| +/// No further events can be aded to the stream.
|
| +const int _STREAM_CLOSED = 1;
|
| +/// The stream has completed and will no longer receive or send events.
|
| +/// Also counts as closed. The stream must not be paused when it's completed.
|
| +/// Always used in conjunction with [_STREAM_CLOSED].
|
| +const int _STREAM_COMPLETE = 2;
|
| +/// Bit that alternates between events, and listeners are updated to the
|
| +/// current value when they are notified of the event.
|
| +const int _STREAM_EVENT_ID = 4;
|
| +const int _STREAM_EVENT_ID_SHIFT = 2;
|
| +/// Bit set while firing and clear while not.
|
| +const int _STREAM_FIRING = 8;
|
| +/// The count of times a stream has paused is stored in the
|
| +/// state, shifted by this amount.
|
| +const int _STREAM_PAUSE_COUNT_SHIFT = 4;
|
| +
|
| +// States for listeners.
|
| +
|
| +/// The listener is currently not subscribed to its source stream.
|
| +const int _LISTENER_UNSUBSCRIBED = 0;
|
| +/// The listener is actively subscribed to its source stream.
|
| +const int _LISTENER_SUBSCRIBED = 1;
|
| +/// The listener is subscribed until it has been notified of the current event.
|
| +/// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED].
|
| +const int _LISTENER_PENDING_UNSUBSCRIBE = 2;
|
| +/// Bit that contains the last sent event's "id bit".
|
| +const int _LISTENER_EVENT_ID = 4;
|
| +const int _LISTENER_EVENT_ID_SHIFT = 2;
|
| +/// The count of times a listener has paused is stored in the
|
| +/// state, shifted by this amount.
|
| +const int _LISTENER_PAUSE_COUNT_SHIFT = 3;
|
| +
|
| +
|
| +// -------------------------------------------------------------------
|
| +// Common base class for single and multi-subscription streams.
|
| +// -------------------------------------------------------------------
|
| +abstract class _StreamImpl<T> extends Stream<T> {
|
| + /** Current state of the stream. */
|
| + int _state = _STREAM_OPEN;
|
| +
|
| + /**
|
| + * List of pending events.
|
| + *
|
| + * If events are added to the stream (using [_add], [_signalError] or [_done])
|
| + * while the stream is paused, or while another event is firing, events will
|
| + * stored here.
|
| + * Also supports scheduling the events for later execution.
|
| + */
|
| + _StreamImplEvents _pendingEvents;
|
| +
|
| + // ------------------------------------------------------------------
|
| + // Stream interface.
|
| +
|
| + StreamSubscription listen(void onData(T data),
|
| + { void onError(AsyncError error),
|
| + void onDone(),
|
| + bool unsubscribeOnError }) {
|
| + if (_isComplete) {
|
| + return new _DoneSubscription(onDone);
|
| + }
|
| + if (onData == null) onData = _nullDataHandler;
|
| + if (onError == null) onError = _nullErrorHandler;
|
| + if (onDone == null) onDone = _nullDoneHandler;
|
| + unsubscribeOnError = identical(true, unsubscribeOnError);
|
| + _StreamListener subscription =
|
| + _createSubscription(onData, onError, onDone, unsubscribeOnError);
|
| + _addListener(subscription);
|
| + return subscription;
|
| + }
|
| +
|
| + // ------------------------------------------------------------------
|
| + // StreamSink interface-like methods for sending events into the stream.
|
| + // It's the responsibility of the caller to ensure that the stream is not
|
| + // paused when adding events. If the stream is paused, the events will be
|
| + // queued, but it's better to not send events at all.
|
| +
|
| + /**
|
| + * Send or queue a data event.
|
| + */
|
| + void _add(T value) {
|
| + if (_isClosed) throw new StateError("Sending on closed stream");
|
| + if (!_canFireEvent) {
|
| + _addPendingEvent(new _DelayedData<T>(value));
|
| + return;
|
| + }
|
| + _sendData(value);
|
| + _handlePendingEvents();
|
| + }
|
| +
|
| + /**
|
| + * Send or enqueue an error event.
|
| + *
|
| + * If a subscription has requested to be unsubscribed on errors,
|
| + * it will be unsubscribed after receiving this event.
|
| + */
|
| + void _signalError(AsyncError error) {
|
| + if (_isClosed) throw new StateError("Sending on closed stream");
|
| + if (!_canFireEvent) {
|
| + _addPendingEvent(new _DelayedError(error));
|
| + return;
|
| + }
|
| + _sendError(error);
|
| + _handlePendingEvents();
|
| + }
|
| +
|
| + /**
|
| + * Send or enqueue a "done" message.
|
| + *
|
| + * The "done" message should be sent at most once by a stream, and it
|
| + * should be the last message sent.
|
| + */
|
| + void _close() {
|
| + if (_isClosed) throw new StateError("Sending on closed stream");
|
| + _state |= _STREAM_CLOSED;
|
| + if (!_canFireEvent) {
|
| + // You can't enqueue an event after the Done, so make it const.
|
| + _addPendingEvent(const _DelayedDone());
|
| + return;
|
| + }
|
| + _sendDone();
|
| + assert(!_hasPendingEvent);
|
| + }
|
| +
|
| + // -------------------------------------------------------------------
|
| + // Internal implementation.
|
| +
|
| + // State prediates.
|
| +
|
| + /** Whether the stream has been closed (a done event requested). */
|
| + bool get _isClosed => (_state & _STREAM_CLOSED) != 0;
|
| +
|
| + /** Whether the stream is completed. */
|
| + bool get _isComplete => (_state & _STREAM_COMPLETE) != 0;
|
| +
|
| + /** Whether one or more active subscribers have requested a pause. */
|
| + bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT);
|
| +
|
| + /** Check whether the pending event queue is non-empty */
|
| + bool get _hasPendingEvent =>
|
| + _pendingEvents != null && !_pendingEvents.isEmpty;
|
| +
|
| + /** Whether we are currently firing an event. */
|
| + bool get _isFiring => (_state & _STREAM_FIRING) != 0;
|
| +
|
| + int get _currentEventIdBit =>
|
| + (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT;
|
| +
|
| + /** Whether there is currently a subscriber on this [Stream]. */
|
| + bool get _hasSubscribers;
|
| +
|
| + /** Whether the stream can fire a new event. */
|
| + bool get _canFireEvent => !_isFiring && !_isPaused && !_hasPendingEvent;
|
| +
|
| + // State modification.
|
| +
|
| + /** Record an increases in the number of times the listener has paused. */
|
| + void _incrementPauseCount(_StreamListener<T> listener) {
|
| + listener._incrementPauseCount();
|
| + _updatePauseCount(1);
|
| + }
|
| +
|
| + /** Record a decrease in the number of times the listener has paused. */
|
| + void _decrementPauseCount(_StreamListener<T> listener) {
|
| + assert(_isPaused);
|
| + listener._decrementPauseCount();
|
| + _updatePauseCount(-1);
|
| + }
|
| +
|
| + /** Update the stream's own pause count only. */
|
| + void _updatePauseCount(int by) {
|
| + _state += by << _STREAM_PAUSE_COUNT_SHIFT;
|
| + assert(_state >= 0);
|
| + }
|
| +
|
| + void _setClosed() {
|
| + assert(!_isClosed);
|
| + _state |= _STREAM_CLOSED;
|
| + }
|
| +
|
| + void _setComplete() {
|
| + assert(_isClosed);
|
| + _state = _state |_STREAM_COMPLETE;
|
| + }
|
| +
|
| + void _startFiring() {
|
| + assert(!_isFiring);
|
| + // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID
|
| + // bit. All current subscribers will now have a _LISTENER_EVENT_ID
|
| + // that doesn't match _STREAM_EVENT_ID, and they will receive the
|
| + // event being fired.
|
| + _state ^= _STREAM_FIRING | _STREAM_EVENT_ID;
|
| + }
|
| +
|
| + void _endFiring() {
|
| + assert(_isFiring);
|
| + _state ^= _STREAM_FIRING;
|
| + }
|
| +
|
| + /**
|
| + * Record that a listener wants a pause from events.
|
| + *
|
| + * This methods is called from [_StreamListener.pause()].
|
| + * Subclasses can override this method, along with [isPaused] and
|
| + * [createSubscription], if they want to do a different handling of paused
|
| + * subscriptions, e.g., a filtering stream pausing its own source if all its
|
| + * subscribers are paused.
|
| + */
|
| + void _pause(_StreamListener<T> listener, Signal resumeSignal) {
|
| + assert(identical(listener._source, this));
|
| + if (!listener._isSubscribed) {
|
| + throw new StateError("Subscription has been canceled.");
|
| + }
|
| + assert(!_isComplete); // There can be no subscribers when complete.
|
| + bool wasPaused = _isPaused;
|
| + _incrementPauseCount(listener);
|
| + if (resumeSignal != null) {
|
| + resumeSignal.then(() { this._resume(listener, true); });
|
| + }
|
| + if (!wasPaused) {
|
| + _onPauseStateChange();
|
| + }
|
| + }
|
| +
|
| + /** Stops pausing due to one request from the given listener. */
|
| + void _resume(_StreamListener<T> listener, bool fromEvent) {
|
| + if (!listener.isPaused) return;
|
| + assert(listener._isSubscribed);
|
| + assert(_isPaused);
|
| + _decrementPauseCount(listener);
|
| + if (!_isPaused) {
|
| + _onPauseStateChange();
|
| + if (_hasPendingEvent) {
|
| + // If we can fire events now, fire any pending events right away.
|
| + if (fromEvent && !_isFiring) {
|
| + _handlePendingEvents();
|
| + } else {
|
| + _pendingEvents.schedule(this);
|
| + }
|
| + }
|
| + }
|
| + }
|
| +
|
| + /** Create a subscription object. Called by [subcribe]. */
|
| + _StreamSubscriptionImpl<T> _createSubscription(
|
| + void onData(T data),
|
| + void onError(AsyncError error),
|
| + void onDone(),
|
| + bool unsubscribeOnError);
|
| +
|
| + /**
|
| + * Adds a listener to this stream.
|
| + */
|
| + void _addListener(_StreamSubscriptionImpl subscription);
|
| +
|
| + /**
|
| + * Handle a cancel requested from a [_StreamSubscriptionImpl].
|
| + *
|
| + * This method is called from [_StreamSubscriptionImpl.cancel].
|
| + *
|
| + * If an event is currently firing, the cancel is delayed
|
| + * until after the subscribers have received the event.
|
| + */
|
| + void _cancel(_StreamSubscriptionImpl subscriber);
|
| +
|
| + /**
|
| + * Iterate over all current subscribers and perform an action on each.
|
| + *
|
| + * Subscribers added during the iteration will not be visited.
|
| + * Subscribers unsubscribed during the iteration will only be removed
|
| + * after they have been acted on.
|
| + *
|
| + * Any change in the pause state is only reported after all subscribers have
|
| + * received the event.
|
| + *
|
| + * The [action] must not throw, or the controller will be left in an
|
| + * invalid state.
|
| + *
|
| + * This method must not be called while [isFiring] is true.
|
| + */
|
| + void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription));
|
| +
|
| + /**
|
| + * Called when the first subscriber requests a pause or the last a resume.
|
| + *
|
| + * Read [isPaused] to see the new state.
|
| + */
|
| + void _onPauseStateChange() {}
|
| +
|
| + /**
|
| + * Called when the first listener subscribes or the last unsubscribes.
|
| + *
|
| + * Read [hasSubscribers] to see what the new state is.
|
| + */
|
| + void _onSubscriptionStateChange() {}
|
| +
|
| + /** Add a pending event at the end of the pending event queue. */
|
| + void _addPendingEvent(_DelayedEvent event) {
|
| + if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents();
|
| + _pendingEvents.add(event);
|
| + }
|
| +
|
| + /** Fire any pending events until the pending event queue. */
|
| + void _handlePendingEvents() {
|
| + _StreamImplEvents events = _pendingEvents;
|
| + if (events == null) return;
|
| + while (!events.isEmpty && !_isPaused) {
|
| + events.removeFirst().perform(this);
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * Send a data event directly to each subscriber.
|
| + */
|
| + _sendData(T value) {
|
| + assert(!_isPaused);
|
| + assert(!_isComplete);
|
| + _forEachSubscriber((subscriber) {
|
| + try {
|
| + subscriber._sendData(value);
|
| + } catch (e, s) {
|
| + new AsyncError(e, s).throwDelayed();
|
| + }
|
| + });
|
| + }
|
| +
|
| + /**
|
| + * Sends an error event directly to each subscriber.
|
| + */
|
| + void _sendError(AsyncError error) {
|
| + assert(!_isPaused);
|
| + assert(!_isComplete);
|
| + _forEachSubscriber((subscriber) {
|
| + try {
|
| + subscriber._sendError(error);
|
| + } catch (e, s) {
|
| + new AsyncError.withCause(e, s, error).throwDelayed();
|
| + }
|
| + });
|
| + }
|
| +
|
| + /**
|
| + * Sends the "done" message directly to each subscriber.
|
| + * This automatically stops further subscription and
|
| + * unsubscribes all subscribers.
|
| + */
|
| + void _sendDone() {
|
| + assert(!_isPaused);
|
| + assert(_isClosed);
|
| + _setComplete();
|
| + if (!_hasSubscribers) return;
|
| + _forEachSubscriber((subscriber) {
|
| + _cancel(subscriber);
|
| + try {
|
| + subscriber._sendDone();
|
| + } catch (e, s) {
|
| + new AsyncError(e, s).throwDelayed();
|
| + }
|
| + });
|
| + assert(!_hasSubscribers);
|
| + _onSubscriptionStateChange();
|
| + }
|
| +}
|
| +
|
| +// -------------------------------------------------------------------
|
| +// Default implementation of a stream with a single subscriber.
|
| +// -------------------------------------------------------------------
|
| +/**
|
| + * Default implementation of stream capable of sending events to one subscriber.
|
| + *
|
| + * Any class needing to implement [Stream] can either directly extend this
|
| + * class, or extend [Stream] and delegate the subscribe method to an instance
|
| + * of this class.
|
| + *
|
| + * The only public methods are those of [Stream], so instances of
|
| + * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing
|
| + * internal functionality.
|
| + *
|
| + * The [StreamController] is a public facing version of this class, with
|
| + * some methods made public.
|
| + *
|
| + * The user interface of [_SingleStreamImpl] are the following methods:
|
| + * * [_add]: Add a data event to the stream.
|
| + * * [_signalError]: Add an error event to the stream.
|
| + * * [_close]: Request to close the stream.
|
| + * * [_onSubscriberStateChange]: Called when receiving the first subscriber or
|
| + * when losing the last subscriber.
|
| + * * [_onPauseStateChange]: Called when entering or leaving paused mode.
|
| + * * [_hasSubscribers]: Test whether there are currently any subscribers.
|
| + * * [_isPaused]: Test whether the stream is currently paused.
|
| + * The user should not add new events while the stream is paused, but if it
|
| + * happens anyway, the stream will enqueue the events just as when new events
|
| + * arrive while still firing an old event.
|
| + */
|
| +class _SingleStreamImpl<T> extends _StreamImpl<T> {
|
| + _StreamSubscriptionImpl _subscriber = null;
|
| +
|
| + /** Whether one or more active subscribers have requested a pause. */
|
| + bool get _isPaused => !_hasSubscribers || super._isPaused;
|
| +
|
| + /** Whether there is currently a subscriber on this [Stream]. */
|
| + bool get _hasSubscribers => _subscriber != null;
|
| +
|
| + // -------------------------------------------------------------------
|
| + // Internal implementation.
|
| +
|
| + /**
|
| + * Create the new subscription object.
|
| + */
|
| + _StreamSubscriptionImpl<T> _createSubscription(
|
| + void onData(T data),
|
| + void onError(AsyncError error),
|
| + void onDone(),
|
| + bool unsubscribeOnError) {
|
| + return new _StreamSubscriptionImpl<T>(
|
| + this, onData, onError, onDone, unsubscribeOnError);
|
| + }
|
| +
|
| + void _addListener(_StreamSubscriptionImpl subscription) {
|
| + if (_hasSubscribers) {
|
| + throw new StateError("Stream has already subscriber.");
|
| + }
|
| + _subscriber = subscription;
|
| + subscription._setSubscribed(0);
|
| + _onSubscriptionStateChange();
|
| + // TODO(floitsch): Should this be delayed?
|
| + _handlePendingEvents();
|
| + }
|
| +
|
| + /**
|
| + * Handle a cancel requested from a [_StreamSubscriptionImpl].
|
| + *
|
| + * This method is called from [_StreamSubscriptionImpl.cancel].
|
| + *
|
| + * If an event is currently firing, the cancel is delayed
|
| + * until after the subscriber has received the event.
|
| + */
|
| + void _cancel(_StreamSubscriptionImpl subscriber) {
|
| + assert(identical(subscriber._source, this));
|
| + // We allow unsubscribing the currently firing subscription during
|
| + // the event firing, because it is indistinguishable from delaying it since
|
| + // that event has already received the event.
|
| + if (!identical(_subscriber, subscriber)) {
|
| + // You may unsubscribe more than once, only the first one counts.
|
| + return;
|
| + }
|
| + _subscriber = null;
|
| + int timesPaused = subscriber._setUnsubscribed();
|
| + _updatePauseCount(-timesPaused);
|
| + if (timesPaused > 0) {
|
| + _onPauseStateChange();
|
| + }
|
| + _onSubscriptionStateChange();
|
| + }
|
| +
|
| + void _forEachSubscriber(
|
| + void action(_StreamSubscriptionImpl<T> subscription)) {
|
| + _StreamSubscriptionImpl subscription = _subscriber;
|
| + assert(subscription != null);
|
| + _startFiring();
|
| + action(subscription);
|
| + _endFiring();
|
| + }
|
| +}
|
| +
|
| +// -------------------------------------------------------------------
|
| +// Default implementation of a stream with subscribers.
|
| +// -------------------------------------------------------------------
|
| +
|
| +/**
|
| + * Default implementation of stream capable of sending events to subscribers.
|
| + *
|
| + * Any class needing to implement [Stream] can either directly extend this
|
| + * class, or extend [Stream] and delegate the subscribe method to an instance
|
| + * of this class.
|
| + *
|
| + * The only public methods are those of [Stream], so instances of
|
| + * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing
|
| + * internal functionality.
|
| + *
|
| + * The [StreamController] is a public facing version of this class, with
|
| + * some methods made public.
|
| + *
|
| + * The user interface of [_MultiStreamImpl] are the following methods:
|
| + * * [_add]: Add a data event to the stream.
|
| + * * [_signalError]: Add an error event to the stream.
|
| + * * [_close]: Request to close the stream.
|
| + * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or
|
| + * when losing the last subscriber.
|
| + * * [_onPauseStateChange]: Called when entering or leaving paused mode.
|
| + * * [_hasSubscribers]: Test whether there are currently any subscribers.
|
| + * * [_isPaused]: Test whether the stream is currently paused.
|
| + * The user should not add new events while the stream is paused, but if it
|
| + * happens anyway, the stream will enqueue the events just as when new events
|
| + * arrive while still firing an old event.
|
| + */
|
| +class _MultiStreamImpl<T> extends _StreamImpl<T>
|
| + implements _InternalLinkList {
|
| + // Link list implementation (mixin when possible).
|
| + _InternalLink _nextLink;
|
| + _InternalLink _previousLink;
|
| +
|
| + _MultiStreamImpl() {
|
| + _nextLink = _previousLink = this;
|
| + }
|
| +
|
| + // ------------------------------------------------------------------
|
| + // Helper functions that can be overridden in subclasses.
|
| +
|
| + /** Whether there are currently any subscribers on this [Stream]. */
|
| + bool get _hasSubscribers => !_InternalLinkList.isEmpty(this);
|
| +
|
| + /**
|
| + * Create the new subscription object.
|
| + */
|
| + _StreamListener<T> _createSubscription(
|
| + void onData(T data),
|
| + void onError(AsyncError error),
|
| + void onDone(),
|
| + bool unsubscribeOnError) {
|
| + return new _StreamSubscriptionImpl<T>(
|
| + this, onData, onError, onDone, unsubscribeOnError);
|
| + }
|
| +
|
| + // -------------------------------------------------------------------
|
| + // Internal implementation.
|
| +
|
| + /**
|
| + * Iterate over all current subscribers and perform an action on each.
|
| + *
|
| + * The set of subscribers cannot be modified during this iteration.
|
| + * All attempts to add or unsubscribe subscribers will be delayed until
|
| + * after the iteration is complete.
|
| + *
|
| + * The [action] must not throw, or the controller will be left in an
|
| + * invalid state.
|
| + *
|
| + * This method must not be called while [isFiring] is true.
|
| + */
|
| + void _forEachSubscriber(
|
| + void action(_StreamListener<T> subscription)) {
|
| + assert(!_isFiring);
|
| + if (!_hasSubscribers) return;
|
| + _startFiring();
|
| + _InternalLink cursor = this._nextLink;
|
| + while (!identical(cursor, this)) {
|
| + _StreamListener<T> current = cursor;
|
| + if (current._needsEvent(_currentEventIdBit)) {
|
| + action(current);
|
| + // Marks as having received the event.
|
| + current._toggleEventReceived();
|
| + }
|
| + cursor = current._nextLink;
|
| + if (current._isPendingUnsubscribe) {
|
| + _removeListener(current);
|
| + }
|
| + }
|
| + _endFiring();
|
| + if (_isPaused) _onPauseStateChange();
|
| + if (!_hasSubscribers) _onSubscriptionStateChange();
|
| + }
|
| +
|
| + void _addListener(_StreamListener listener) {
|
| + listener._setSubscribed(_currentEventIdBit);
|
| + bool firstSubscriber = !_hasSubscribers;
|
| + _InternalLinkList.add(this, listener);
|
| + if (firstSubscriber) {
|
| + _onSubscriptionStateChange();
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * Handle a cancel requested from a [_StreamListener].
|
| + *
|
| + * This method is called from [_StreamListener.cancel].
|
| + *
|
| + * If an event is currently firing, the cancel is delayed
|
| + * until after the subscribers have received the event.
|
| + */
|
| + void _cancel(_StreamListener listener) {
|
| + assert(identical(listener._source, this));
|
| + if (_InternalLink.isUnlinked(listener)) {
|
| + // You may unsubscribe more than once, only the first one counts.
|
| + return;
|
| + }
|
| + if (_isFiring) {
|
| + if (listener._needsEvent(_currentEventIdBit)) {
|
| + assert(listener._isSubscribed);
|
| + listener._setPendingUnsubscribe();
|
| + } else {
|
| + // The listener has been notified of the event (or don't need to,
|
| + // if it's still pending subscription) so it's safe to remove it.
|
| + _removeListener(listener);
|
| + }
|
| + // Pause and subscription state changes are reported when we end
|
| + // firing.
|
| + } else {
|
| + bool wasPaused = _isPaused;
|
| + _removeListener(listener);
|
| + if (wasPaused != _isPaused) _onPauseStateChange();
|
| + if (!_hasSubscribers) _onSubscriptionStateChange();
|
| + }
|
| + }
|
| +
|
| + /**
|
| + * Removes a listener from this stream and cancels its pauses.
|
| + *
|
| + * This is a low-level action that doesn't call [_onSubscriptionStateChange].
|
| + * or [_onPauseStateChange].
|
| + */
|
| + void _removeListener(_StreamListener listener) {
|
| + int pauseCount = listener._setUnsubscribed();
|
| + _updatePauseCount(-pauseCount);
|
| + _InternalLinkList.remove(listener);
|
| + }
|
| +}
|
| +
|
| +/**
|
| + * The subscription class that the [StreamController] uses.
|
| + *
|
| + * The [StreamController.createSubscription] method should
|
| + * create an object of this type, or another subclass of [_StreamListener].
|
| + * A subclass of [StreamController] can specify which subclass
|
| + * of [_StreamSubscriptionImpl] it uses by overriding
|
| + * [StreamController.createSubscription].
|
| + *
|
| + * The subscription is in one of three states:
|
| + * * Subscribed.
|
| + * * Paused-and-subscribed.
|
| + * * Unsubscribed.
|
| + * Unsubscribing also unpauses.
|
| + */
|
| +class _StreamSubscriptionImpl<T> extends _StreamListener<T>
|
| + implements StreamSubscription<T> {
|
| + final bool _unsubscribeOnError;
|
| + _DataHandler _onData;
|
| + _ErrorHandler _onError;
|
| + _DoneHandler _onDone;
|
| + _StreamSubscriptionImpl(_StreamImpl source,
|
| + this._onData,
|
| + this._onError,
|
| + this._onDone,
|
| + this._unsubscribeOnError) : super(source);
|
| +
|
| + void onData(void handleData(T event)) {
|
| + if (handleData == null) handleData = _nullDataHandler;
|
| + _onData = handleData;
|
| + }
|
| +
|
| + void onError(void handleError(AsyncError error)) {
|
| + if (handleError == null) handleError = _nullErrorHandler;
|
| + _onError = handleError;
|
| + }
|
| +
|
| + void onDone(void handleDone()) {
|
| + if (handleDone == null) handleDone = _nullDoneHandler;
|
| + _onDone = handleDone;
|
| + }
|
| +
|
| + void _sendData(T data) {
|
| + _onData(data);
|
| + }
|
| +
|
| + void _sendError(AsyncError error) {
|
| + _onError(error);
|
| + if (_unsubscribeOnError) _source._cancel(this);
|
| + }
|
| +
|
| + void _sendDone() {
|
| + _onDone();
|
| + }
|
| +
|
| + void cancel() {
|
| + _source._cancel(this);
|
| + }
|
| +
|
| + void pause([Signal resumeSignal]) {
|
| + _source._pause(this, resumeSignal);
|
| + }
|
| +
|
| + void resume() {
|
| + if (!isPaused) {
|
| + throw new StateError("Resuming unpaused subscription");
|
| + }
|
| + _source._resume(this, false);
|
| + }
|
| +}
|
| +
|
| +// Internal helpers.
|
| +
|
| +// Types of the different handlers on a stream. Types used to type fields.
|
| +typedef void _DataHandler<T>(T value);
|
| +typedef void _ErrorHandler(AsyncError error);
|
| +typedef void _DoneHandler();
|
| +
|
| +
|
| +/** Default data handler, does nothing. */
|
| +void _nullDataHandler(var value) {}
|
| +
|
| +/** Default error handler, reports the error to the global handler. */
|
| +void _nullErrorHandler(AsyncError error) {
|
| + error.throwDelayed();
|
| +}
|
| +
|
| +/** Default done handler, does nothing. */
|
| +void _nullDoneHandler() {}
|
| +
|
| +
|
| +/** A delayed event on a stream implementation. */
|
| +abstract class _DelayedEvent {
|
| + /** Added as a linked list on the [StreamController]. */
|
| + _DelayedEvent next;
|
| + /** Execute the delayed event on the [StreamController]. */
|
| + void perform(_StreamImpl stream);
|
| +}
|
| +
|
| +/** A delayed data event. */
|
| +class _DelayedData<T> extends _DelayedEvent{
|
| + T value;
|
| + _DelayedData(this.value);
|
| + void perform(_StreamImpl<T> stream) {
|
| + stream._sendData(value);
|
| + }
|
| +}
|
| +
|
| +/** A delayed error event. */
|
| +class _DelayedError extends _DelayedEvent {
|
| + AsyncError error;
|
| + _DelayedError(this.error);
|
| + void perform(_StreamImpl stream) {
|
| + stream._sendError(error);
|
| + }
|
| +}
|
| +
|
| +/** A delayed done event. */
|
| +class _DelayedDone implements _DelayedEvent {
|
| + const _DelayedDone();
|
| + void perform(_StreamImpl stream) {
|
| + stream._sendDone();
|
| + }
|
| +
|
| + _DelayedEvent get next => null;
|
| +
|
| + void set next(_DelayedEvent _) {
|
| + throw new StateError("No events after a done.");
|
| + }
|
| +}
|
| +
|
| +/**
|
| + * Simple internal doubly-linked list implementation.
|
| + *
|
| + * In an internal linked list, the links are in the data objects themselves,
|
| + * instead of in a separate object. That means each element can be in at most
|
| + * one list at a time.
|
| + *
|
| + * All links are always members of an element cycle. At creation it's a
|
| + * singleton cycle.
|
| + */
|
| +abstract class _InternalLink {
|
| + _InternalLink _nextLink;
|
| + _InternalLink _previousLink;
|
| +
|
| + _InternalLink() {
|
| + this._previousLink = this._nextLink = this;
|
| + }
|
| +
|
| + /* Removes a link from any list it may be part of, and links it to itself. */
|
| + static void unlink(_InternalLink element) {
|
| + _InternalLink next = element._nextLink;
|
| + _InternalLink previous = element._previousLink;
|
| + next._previousLink = previous;
|
| + previous._nextLink = next;
|
| + element._nextLink = element._previousLink = element;
|
| + }
|
| +
|
| + /** Check whether an element is unattached to other elements. */
|
| + static bool isUnlinked(_InternalLink element) {
|
| + return identical(element, element._nextLink);
|
| + }
|
| +}
|
| +
|
| +/**
|
| + * Marker interface for "list" links.
|
| + *
|
| + * An "InternalLinkList" is an abstraction on top of a link cycle, where the
|
| + * "list" object itself is not considered an element (it's just a header link
|
| + * created to avoid edge cases).
|
| + * An element is considered part of a list if it is in the list's cycle.
|
| + * There should never be more than one "list" object in a cycle.
|
| + */
|
| +abstract class _InternalLinkList extends _InternalLink {
|
| + /**
|
| + * Adds an element to a list, just before the header link.
|
| + *
|
| + * This effectively adds it at the end of the list.
|
| + */
|
| + static void add(_InternalLinkList list, _InternalLink element) {
|
| + if (!_InternalLink.isUnlinked(element)) _InternalLink.unlink(element);
|
| + _InternalLink listEnd = list._previousLink;
|
| + listEnd._nextLink = element;
|
| + list._previousLink = element;
|
| + element._previousLink = listEnd;
|
| + element._nextLink = list;
|
| + }
|
| +
|
| + /** Removes an element from its list. */
|
| + static void remove(_InternalLink element) {
|
| + _InternalLink.unlink(element);
|
| + }
|
| +
|
| + /** Check whether a list contains no elements, only the header link. */
|
| + static bool isEmpty(_InternalLinkList list) => _InternalLink.isUnlinked(list);
|
| +
|
| + /** Moves all elements from the list [other] to [list]. */
|
| + static void addAll(_InternalLinkList list, _InternalLinkList other) {
|
| + if (isEmpty(other)) return;
|
| + _InternalLink listLast = list._previousLink;
|
| + _InternalLink otherNext = other._nextLink;
|
| + listLast._nextLink = otherNext;
|
| + otherNext._previousLink = listLast;
|
| + _InternalLink otherLast = other._previousLink;
|
| + list._previousLink = otherLast;
|
| + otherLast._nextLink = list;
|
| + // Clean up [other].
|
| + other._nextLink = other._previousLink = other;
|
| + }
|
| +}
|
| +
|
| +abstract class _StreamListener<T> extends _InternalLink {
|
| + final _StreamImpl _source;
|
| + int _state = _LISTENER_UNSUBSCRIBED;
|
| +
|
| + _StreamListener(this._source);
|
| +
|
| + bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT);
|
| +
|
| + bool get _isPendingUnsubscribe =>
|
| + (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0;
|
| +
|
| + bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0;
|
| +
|
| + /**
|
| + * Whether the listener still needs to receive the currently firing event.
|
| + *
|
| + * The currently firing event is identified by a single bit, which alternates
|
| + * between events. The [_state] contains the previously sent event's bit in
|
| + * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener
|
| + * still need the current event.
|
| + */
|
| + bool _needsEvent(int currentEventIdBit) {
|
| + int lastEventIdBit =
|
| + (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT;
|
| + return lastEventIdBit != currentEventIdBit;
|
| + }
|
| +
|
| + /// If a subscriber's "firing bit" doesn't match the stream's firing bit,
|
| + /// we are currently firing an event and the subscriber still need to receive
|
| + /// the event.
|
| + void _toggleEventReceived() {
|
| + _state ^= _LISTENER_EVENT_ID;
|
| + }
|
| +
|
| + void _setSubscribed(int eventIdBit) {
|
| + assert(eventIdBit == 0 || eventIdBit == 1);
|
| + _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT);
|
| + }
|
| +
|
| + void _setPendingUnsubscribe() {
|
| + assert(_isSubscribed);
|
| + _state |= _LISTENER_PENDING_UNSUBSCRIBE;
|
| + }
|
| +
|
| + /**
|
| + * Marks the listener as unsubscibed.
|
| + *
|
| + * Returns the number of unresumed pauses for the listener.
|
| + */
|
| + int _setUnsubscribed() {
|
| + assert(_isSubscribed);
|
| + int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT;
|
| + _state = _LISTENER_UNSUBSCRIBED;
|
| + return timesPaused;
|
| + }
|
| +
|
| + void _incrementPauseCount() {
|
| + _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT;
|
| + }
|
| +
|
| + void _decrementPauseCount() {
|
| + assert(isPaused);
|
| + _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT;
|
| + }
|
| +
|
| + _sendData(T data);
|
| + _sendError(AsyncError error);
|
| + _sendDone();
|
| +}
|
| +
|
| +/** Class holding pending events for a [_StreamImpl]. */
|
| +class _StreamImplEvents {
|
| + /// Single linked list of [_DelayedEvent] objects.
|
| + _DelayedEvent firstPendingEvent = null;
|
| + /// Last element in the list of pending events. New events are added after it.
|
| + _DelayedEvent lastPendingEvent = null;
|
| + /**
|
| + * Timer set when pending events are scheduled for execution.
|
| + *
|
| + * When scheduling pending events for execution in a later cycle, the timer
|
| + * is stored here. If pending events are executed earlier than that, e.g.,
|
| + * due to a second event in the current cycle, the timer is canceled again.
|
| + */
|
| + Timer scheduleTimer = null;
|
| +
|
| + bool get isEmpty => lastPendingEvent == null;
|
| +
|
| + bool get isScheduled => scheduleTimer != null;
|
| +
|
| + void schedule(_StreamImpl stream) {
|
| + if (isScheduled) return;
|
| + scheduleTimer = new Timer(0, (_) {
|
| + scheduleTimer = null;
|
| + stream._handlePendingEvents();
|
| + });
|
| + }
|
| +
|
| + void cancelSchedule() {
|
| + assert(isScheduled);
|
| + scheduleTimer.cancel();
|
| + scheduleTimer = null;
|
| + }
|
| +
|
| + void add(_DelayedEvent event) {
|
| + if (lastPendingEvent == null) {
|
| + firstPendingEvent = lastPendingEvent = event;
|
| + } else {
|
| + lastPendingEvent = lastPendingEvent.next = event;
|
| + }
|
| + }
|
| +
|
| + _DelayedEvent removeFirst() {
|
| + if (isScheduled) cancelSchedule();
|
| + _DelayedEvent event = firstPendingEvent;
|
| + firstPendingEvent = event.next;
|
| + if (firstPendingEvent == null) {
|
| + lastPendingEvent = null;
|
| + }
|
| + return event;
|
| + }
|
| +}
|
| +
|
| +
|
| +class _DoneSubscription<T> implements StreamSubscription<T> {
|
| + _DoneHandler _handler;
|
| + Timer _timer;
|
| + int _pauseCount = 0;
|
| +
|
| + _DoneSubscription(this._handler) {
|
| + _delayDone();
|
| + }
|
| +
|
| + void _delayDone() {
|
| + assert(_timer == null && _pauseCount == 0);
|
| + _timer = new Timer(0, (_) {
|
| + if (_handler != null) _handler();
|
| + });
|
| + }
|
| +
|
| + bool get _isComplete => _timer == null && _pauseCount == 0;
|
| +
|
| + void onData(void handleAction(T value)) {}
|
| + void onError(void handleError(StateError error)) {}
|
| + void onDone(void handleDone(T value)) {
|
| + _handler = handleDone;
|
| + }
|
| +
|
| + void pause([Signal signal]) {
|
| + if (_isComplete) {
|
| + throw new StateError("Subscription has been canceled.");
|
| + }
|
| + if (_timer != null) _timer.cancel();
|
| + _pauseCount++;
|
| + }
|
| +
|
| + void resume() {
|
| + if (_isComplete) {
|
| + throw new StateError("Subscription has been canceled.");
|
| + }
|
| + if (_pauseCount == 0) return;
|
| + _pauseCount--;
|
| + if (_pauseCount == 0) {
|
| + _delayDone();
|
| + }
|
| + }
|
| +
|
| + bool get isPaused => _pauseCount > 0;
|
| +
|
| + void cancel() {
|
| + if (_isComplete) {
|
| + throw new StateError("Subscription has been canceled.");
|
| + }
|
| + if (_timer != null) {
|
| + _timer.cancel();
|
| + _timer = null;
|
| + }
|
| + _pauseCount = 0;
|
| + }
|
| +}
|
|
|