Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(361)

Unified Diff: sdk/lib/async/stream_impl.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_impl.dart
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index 8fa1848d14ac0261b2943dfc41c4ba314d457162..3d704856e73355adadbc6cfe424f7140eef2450c 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -4,57 +4,6 @@
part of dart.async;
-// States shared by single/multi stream implementations.
-
-// Completion state of the stream.
-/// Initial and default state where the stream can receive and send events.
-const int _STREAM_OPEN = 0;
-/// The stream has received a request to complete, but hasn't done so yet.
-/// No further events can be added to the stream.
-const int _STREAM_CLOSED = 1;
-/// The stream has completed and will no longer receive or send events.
-/// Also counts as closed. The stream must not be paused when it's completed.
-/// Always used in conjunction with [_STREAM_CLOSED].
-const int _STREAM_COMPLETE = 2;
-
-/// Bit that alternates between events, and listeners are updated to the
-/// current value when they are notified of the event.
-const int _STREAM_EVENT_ID = 4;
-const int _STREAM_EVENT_ID_SHIFT = 2;
-
-// The activity state of the stream: What is it currently doing.
-/// Bit set while firing and clear while not.
-const int _STREAM_FIRING = 8;
-/// Bit set while calling a pause-state or subscription-state change callback.
-const int _STREAM_CALLBACK = 16;
-
-// The pause state of the stream.
-/// Bit set when resuming with pending events. Cleared after all pending events
-/// have been transmitted. Means that the controller still considers the
-/// stream paused, even if the listener doesn't.
-const int _STREAM_PENDING_RESUME = 32;
-/// The count of times a stream has paused is stored in the
-/// state, shifted by this amount.
-const int _STREAM_PAUSE_COUNT_SHIFT = 6;
-
-// States for listeners.
-
-/// The listener is currently not subscribed to its source stream.
-const int _LISTENER_UNSUBSCRIBED = 0;
-/// The listener is actively subscribed to its source stream.
-const int _LISTENER_SUBSCRIBED = 1;
-/// The listener is subscribed until it has been notified of the current event.
-/// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED].
-const int _LISTENER_PENDING_UNSUBSCRIBE = 2;
-
-/// Bit that contains the last sent event's "id bit".
-const int _LISTENER_EVENT_ID = 4;
-const int _LISTENER_EVENT_ID_SHIFT = 2;
-
-/// The count of times a listener has paused is stored in the
-/// state, shifted by this amount.
-const int _LISTENER_PAUSE_COUNT_SHIFT = 3;
-
/** Throws the given error in the next cycle. */
_throwDelayed(var error, [Object stackTrace]) {
// We are going to reach the top-level here, but there might be a global
@@ -69,915 +18,526 @@ _throwDelayed(var error, [Object stackTrace]) {
});
}
+/** Abstract and private interface for a place to put events. */
+abstract class _EventSink<T> {
+ void _add(T data);
+ void _addError(Object error);
+ void _close();
+}
-// -------------------------------------------------------------------
-// Common base class for single and multi-subscription streams.
-// -------------------------------------------------------------------
-abstract class _StreamImpl<T> extends Stream<T> {
- /** Current state of the stream. */
- int _state = _STREAM_OPEN;
-
- /**
- * List of pending events.
- *
- * If events are added to the stream (using [_add], [_addError] or [_done])
- * while the stream is paused, or while another event is firing, events will
- * stored here.
- * Also supports scheduling the events for later execution.
- */
- _PendingEvents _pendingEvents;
-
- // ------------------------------------------------------------------
- // Stream interface.
-
- StreamSubscription<T> listen(void onData(T data),
- { void onError(error),
- void onDone(),
- bool cancelOnError }) {
- if (_isComplete) {
- return new _DoneSubscription(onDone);
- }
- if (onData == null) onData = _nullDataHandler;
- if (onError == null) onError = _nullErrorHandler;
- if (onDone == null) onDone = _nullDoneHandler;
- cancelOnError = identical(true, cancelOnError);
- _StreamSubscriptionImpl subscription =
- _createSubscription(onData, onError, onDone, cancelOnError);
- _addListener(subscription);
- return subscription;
- }
-
- // ------------------------------------------------------------------
- // EventSink interface-like methods for sending events into the stream.
- // It's the responsibility of the caller to ensure that the stream is not
- // paused when adding events. If the stream is paused, the events will be
- // queued, but it's better to not send events at all.
+/**
+ * Abstract and private interface for a place to send events.
+ *
+ * Used by event buffering to finally dispatch the pending event, where
+ * [_EventSink] is where the event first enters the stream subscription,
+ * and may yet be buffered.
+ */
+abstract class _EventDispatch<T> {
+ void _sendData(T data);
+ void _sendError(Object error);
+ void _sendDone();
+}
+/**
+ * Default implementation of stream subscription of buffering events.
+ *
+ * The only public methods are those of [StreamSubscription], so instances of
+ * [_BufferingStreamSubscription] can be returned directly as a
+ * [StreamSubscription] without exposing internal functionality.
+ *
+ * The [StreamController] is a public facing version of [Stream] and this class,
+ * with some methods made public.
+ *
+ * The user interface of [_BufferingStreamSubscription] are the following
+ * methods:
+ * * [_add]: Add a data event to the stream.
+ * * [_addError]: Add an error event to the stream.
+ * * [_close]: Request to close the stream.
+ * * [_onCancel]: Called when the subscription will provide no more events,
+ * either due to being actively canceled, or after sending a done event.
+ * * [_onPause]: Called when the subscription wants the event source to pause.
+ * * [_onResume]: Called when allowing new events after a pause.
+ * The user should not add new events when the subscription requests a paused,
+ * but if it happens anyway, the subscription will enqueue the events just as
+ * when new events arrive while still firing an old event.
+ */
+class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
+ _EventSink<T>,
+ _EventDispatch<T> {
+ /** The `cancelOnError` flag from the `listen` call. */
+ static const int _STATE_CANCEL_ON_ERROR = 1;
/**
- * Send or queue a data event.
+ * Whether the "done" event has been received.
+ * No further events are accepted after this.
*/
- void _add(T value) {
- if (_isClosed) throw new StateError("Sending on closed stream");
- if (!_mayFireState) {
- // Not the time to send events.
- _addPendingEvent(new _DelayedData<T>(value));
- return;
- }
- if (_hasPendingEvent) {
- _addPendingEvent(new _DelayedData<T>(value));
- } else {
- _sendData(value);
- }
- _handlePendingEvents();
- }
-
+ static const int _STATE_CLOSED = 2;
/**
- * Send or enqueue an error event.
+ * Set if the input has been asked not to send events.
*
- * If a subscription has requested to be unsubscribed on errors,
- * it will be unsubscribed after receiving this event.
+ * This is not the same as being paused, since the input will remain paused
+ * after a call to [resume] if there are pending events.
*/
- void _addError(error) {
- if (_isClosed) throw new StateError("Sending on closed stream");
- if (!_mayFireState) {
- // Not the time to send events.
- _addPendingEvent(new _DelayedError(error));
- return;
- }
- if (_hasPendingEvent) {
- _addPendingEvent(new _DelayedError(error));
- } else {
- _sendError(error);
- }
- _handlePendingEvents();
- }
-
+ static const int _STATE_INPUT_PAUSED = 4;
/**
- * Send or enqueue a "done" message.
+ * Whether the subscription has been canceled.
*
- * The "done" message should be sent at most once by a stream, and it
- * should be the last message sent.
+ * Set by calling [cancel], or by handling a "done" event, or an "error" event
+ * when `cancelOnError` is true.
*/
- void _close() {
- if (_isClosed) return;
- _state |= _STREAM_CLOSED;
- if (!_mayFireState) {
- // Not the time to send events.
- _addPendingEvent(const _DelayedDone());
- return;
- }
- if (_hasPendingEvent) {
- _addPendingEvent(new _DelayedDone());
- _handlePendingEvents();
- } else {
- _sendDone();
- assert(_isComplete);
- assert(!_hasPendingEvent);
- }
- }
-
- // -------------------------------------------------------------------
- // Internal implementation.
-
- // State predicates.
-
- // Lifecycle state.
- /** Whether the stream is in the default, open, state for events. */
- bool get _isOpen => (_state & (_STREAM_CLOSED | _STREAM_COMPLETE)) == 0;
-
- /** Whether the stream has been closed (a done event requested). */
- bool get _isClosed => (_state & _STREAM_CLOSED) != 0;
-
- /** Whether the stream is completed. */
- bool get _isComplete => (_state & _STREAM_COMPLETE) != 0;
-
- // Pause state.
-
- /** Whether one or more active subscribers have requested a pause. */
- bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT);
+ static const int _STATE_CANCELED = 8;
+ static const int _STATE_IN_CALLBACK = 16;
+ static const int _STATE_HAS_PENDING = 32;
+ static const int _STATE_PAUSE_COUNT = 64;
+ static const int _STATE_PAUSE_COUNT_SHIFT = 6;
+
+ /* Event handlers provided in constructor. */
+ /* TODO(7733): Fix Function->_DataHandler<T> when dart2js understands
+ * parameterized function types. */
+ Function _onData;
+ _ErrorHandler _onError;
+ _DoneHandler _onDone;
- /** How many times the stream has been paused. */
- int get _pauseCount => _state >> _STREAM_PAUSE_COUNT_SHIFT;
+ /** Bit vector based on state-constants above. */
+ int _state;
/**
- * Whether a controller thinks the stream is paused.
+ * Queue of pending events.
*
- * When this changes, a pause-state change callback is performed.
- *
- * It may differ from [_isPaused] if there are pending events
- * in the queue when the listeners resume. The controller won't
- * be informed until all queued events have been fired.
+ * Is created when necessary, or set in constructor for preconfigured events.
*/
- bool get _isInputPaused => _state >= (_STREAM_PENDING_RESUME);
-
- /** Whether we have a pending resume scheduled. */
- bool get _hasPendingResume => (_state & _STREAM_PENDING_RESUME) != 0;
-
-
- // Action state. If the stream makes a call-out to external code,
- // this state tracks it and avoids reentrancy problems.
-
- /** Whether the stream is not currently firing or calling a callback. */
- bool get _isInactive => (_state & (_STREAM_CALLBACK | _STREAM_FIRING)) == 0;
+ _PendingEvents _pending;
- /** Whether we are currently executing a state-chance callback. */
- bool get _isInCallback => (_state & _STREAM_CALLBACK) != 0;
-
- /** Whether we are currently firing an event. */
- bool get _isFiring => (_state & _STREAM_FIRING) != 0;
-
- /** Check whether the pending event queue is non-empty */
- bool get _hasPendingEvent =>
- _pendingEvents != null && !_pendingEvents.isEmpty;
+ _BufferingStreamSubscription(this._onData,
+ this._onError,
+ this._onDone,
+ bool cancelOnError)
+ : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
+ assert(_onData != null);
+ assert(_onError != null);
+ assert(_onDone != null);
+ }
/**
- * The bit representing the current or last event fired.
+ * Sets the subscription's pending events object.
*
- * This bit matches a bit on listeners that have received the corresponding
- * event. It is toggled for each new event being fired.
+ * This can only be done once. The pending events object is used for the
+ * rest of the subscription's life cycle.
*/
- int get _currentEventIdBit =>
- (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT;
-
- /** Whether there is currently a subscriber on this [Stream]. */
- bool get _hasListener;
-
-
- /** Whether the state bits allow firing. */
- bool get _mayFireState {
- // The state allows firing unless:
- // - it's currently firing
- // - it's currently in a callback
- // - it's paused
- const int mask =
- _STREAM_FIRING |
- _STREAM_CALLBACK |
- ~((1 << _STREAM_PAUSE_COUNT_SHIFT) - 1);
- return (_state & mask) == 0;
- }
-
- // State modification.
-
- /** Record an increases in the number of times the listener has paused. */
- void _incrementPauseCount(_StreamListener<T> listener) {
- listener._incrementPauseCount();
- _state &= ~_STREAM_PENDING_RESUME;
- _updatePauseCount(1);
- }
-
- /** Record a decrease in the number of times the listener has paused. */
- void _decrementPauseCount(_StreamListener<T> listener) {
- assert(_isPaused);
- listener._decrementPauseCount();
- _updatePauseCount(-1);
- }
-
- /** Update the stream's own pause count only. */
- void _updatePauseCount(int by) {
- int oldState = _state;
- // We can't just _state += by << _STREAM_PAUSE_COUNT_SHIFT, since dart2js
- // converts the result of the left-shift to a positive number.
- if (by >= 0) {
- _state = oldState + (by << _STREAM_PAUSE_COUNT_SHIFT);
- } else {
- _state = oldState - ((-by) << _STREAM_PAUSE_COUNT_SHIFT);
+ void _setPendingEvents(_PendingEvents pendingEvents) {
+ assert(_pending == null);
+ if (pendingEvents == null) return;
+ _pending = pendingEvents;
+ if (!pendingEvents.isEmpty) {
+ _state |= _STATE_HAS_PENDING;
+ _pending.schedule(this);
}
- assert(_state >= 0);
- assert((_state >> _STREAM_PAUSE_COUNT_SHIFT) ==
- (oldState >> _STREAM_PAUSE_COUNT_SHIFT) + by);
}
- void _setClosed() {
- assert(!_isClosed);
- _state |= _STREAM_CLOSED;
+ /**
+ * Extracts the pending events from a canceled stream.
+ *
+ * This can only be done during the [_onCancel] method call. After that,
+ * any remaining pending events will be cleared.
+ */
+ _PendingEvents _extractPending() {
+ assert(_isCanceled);
+ _PendingEvents events = _pending;
+ _pending = null;
+ return events;
}
- void _setComplete() {
- assert(_isClosed);
- _state = _state |_STREAM_COMPLETE;
+ // StreamSubscription interface.
+
+ void onData(void handleData(T event)) {
+ if (handleData == null) handleData = _nullDataHandler;
+ _onData = handleData;
}
- void _startFiring() {
- assert(!_isFiring);
- assert(!_isInCallback);
- assert(_hasListener);
- assert(!_isPaused);
- // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID
- // bit. All current subscribers will now have a _LISTENER_EVENT_ID
- // that doesn't match _STREAM_EVENT_ID, and they will receive the
- // event being fired.
- _state ^= _STREAM_FIRING | _STREAM_EVENT_ID;
+ void onError(void handleError(error)) {
+ if (handleError == null) handleError = _nullErrorHandler;
+ _onError = handleError;
}
- void _endFiring(bool wasInputPaused) {
- assert(_isFiring);
- _state ^= _STREAM_FIRING;
- // Had listeners, or we wouldn't have fired.
- _checkCallbacks(true, wasInputPaused);
+ void onDone(void handleDone()) {
+ if (handleDone == null) handleDone = _nullDoneHandler;
+ _onDone = handleDone;
}
- /**
- * Record that a listener wants a pause from events.
- *
- * This methods is called from [_StreamListener.pause()].
- * Subclasses can override this method, along with [isPaused] and
- * [createSubscription], if they want to do a different handling of paused
- * subscriptions, e.g., a filtering stream pausing its own source if all its
- * subscribers are paused.
- */
- void _pause(_StreamListener<T> listener, Future resumeSignal) {
- assert(identical(listener._source, this));
- if (!listener._isSubscribed) {
- throw new StateError("Subscription has been canceled.");
- }
- assert(!_isComplete); // There can be no subscribers when complete.
- bool wasInputPaused = _isInputPaused;
+ void pause([Future resumeSignal]) {
+ if (_isCanceled) return;
bool wasPaused = _isPaused;
- _incrementPauseCount(listener);
- if (resumeSignal != null) {
- resumeSignal.whenComplete(() { this._resume(listener, true); });
- }
- if (!wasPaused && _hasPendingEvent && _pendingEvents.isScheduled) {
- _pendingEvents.cancelSchedule();
- }
- if (_isInactive && !wasInputPaused) {
- _checkCallbacks(true, false);
- if (!_isPaused && _hasPendingEvent) {
- _schedulePendingEvents();
- }
- }
+ bool wasInputPaused = _isInputPaused;
+ // Increment pause count and mark input paused (if it isn't already).
+ _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
+ if (resumeSignal != null) resumeSignal.whenComplete(resume);
+ if (!wasPaused && _pending != null) _pending.cancelSchedule();
+ if (!wasInputPaused && !_inCallback) _guardCallback(_onPause);
}
- /** Stops pausing due to one request from the given listener. */
- void _resume(_StreamListener<T> listener, bool fromEvent) {
- if (!listener.isPaused) return;
- assert(listener._isSubscribed);
- assert(_isPaused);
- _decrementPauseCount(listener);
- if (!_isPaused) {
- if (_hasPendingEvent) {
- _state |= _STREAM_PENDING_RESUME;
- // Controller's pause state hasn't changed.
- // If we can fire events now, fire any pending events right away.
- if (_isInactive) {
- if (fromEvent) {
- _handlePendingEvents();
- } else {
- _schedulePendingEvents();
- }
- }
- } else if (_isInactive) {
- _checkCallbacks(true, true);
- if (!_isPaused && _hasPendingEvent) {
- if (fromEvent) {
- _handlePendingEvents();
- } else {
- _schedulePendingEvents();
- }
+ void resume() {
+ if (_isCanceled) return;
+ if (_isPaused) {
+ _decrementPauseCount();
+ if (!_isPaused) {
+ if (_hasPending && !_pending.isEmpty) {
+ // Input is still paused.
+ _pending.schedule(this);
+ } else {
+ assert(_mayResumeInput);
+ _state &= ~_STATE_INPUT_PAUSED;
+ if (!_inCallback) _guardCallback(_onResume);
}
}
}
}
- /** Schedule pending events to be executed. */
- void _schedulePendingEvents() {
- assert(_hasPendingEvent);
- _pendingEvents.schedule(this);
+ void cancel() {
+ if (_isCanceled) return;
+ _cancel();
+ if (!_inCallback) {
+ // otherwise checkState will be called after firing or callback completes.
+ _state |= _STATE_IN_CALLBACK;
+ _onCancel();
+ _pending = null;
+ _state &= ~_STATE_IN_CALLBACK;
+ }
}
- /** Create a subscription object. Called by [subcribe]. */
- _StreamSubscriptionImpl<T> _createSubscription(
- void onData(T data),
- void onError(error),
- void onDone(),
- bool cancelOnError);
+ Future asFuture([var futureValue]) {
+ _FutureImpl<T> result = new _FutureImpl<T>();
- /**
- * Adds a listener to this stream.
- */
- void _addListener(_StreamSubscriptionImpl subscription);
+ // Overwrite the onDone and onError handlers.
+ _onDone = () { result._setValue(futureValue); };
+ _onError = (error) {
+ cancel();
+ result._setError(error);
+ };
- /**
- * Handle a cancel requested from a [_StreamSubscriptionImpl].
- *
- * This method is called from [_StreamSubscriptionImpl.cancel].
- *
- * If an event is currently firing, the cancel is delayed
- * until after the subscribers have received the event.
- */
- void _cancel(_StreamSubscriptionImpl subscriber);
+ return result;
+ }
- /**
- * Iterate over all current subscribers and perform an action on each.
- *
- * Subscribers added during the iteration will not be visited.
- * Subscribers unsubscribed during the iteration will only be removed
- * after they have been acted on.
- *
- * Any change in the pause state is only reported after all subscribers have
- * received the event.
- *
- * The [action] must not throw, or the controller will be left in an
- * invalid state.
- *
- * This method must not be called while [isFiring] is true.
- */
- void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription));
+ // State management.
- /**
- * Checks whether the subscription/pause state has changed.
- *
- * Calls the appropriate callback if the state has changed from the
- * provided one. Repeats calling callbacks as long as the call changes
- * the state.
- */
- void _checkCallbacks(bool hadListener, bool wasPaused) {
- assert(!_isFiring);
- // Will be handled after the current callback.
- if (_isInCallback) return;
- if (_hasPendingResume && !_hasPendingEvent) {
- _state ^= _STREAM_PENDING_RESUME;
- }
- _state |= _STREAM_CALLBACK;
- while (true) {
- bool hasListener = _hasListener;
- bool isPaused = _isInputPaused;
- if (hadListener != hasListener) {
- _onSubscriptionStateChange();
- } else if (isPaused != wasPaused) {
- _onPauseStateChange();
- } else {
- _state ^= _STREAM_CALLBACK;
- return;
- }
- wasPaused = isPaused;
- hadListener = hasListener;
+ bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
+ bool get _isClosed => (_state & _STATE_CLOSED) != 0;
+ bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
+ bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
+ bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
+ bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
+ bool get _canFire => _state < _STATE_IN_CALLBACK;
+ bool get _mayResumeInput =>
+ !_isPaused && (_pending == null || _pending.isEmpty);
+ bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0;
+
+ bool get isPaused => _isPaused;
+
+ void _cancel() {
+ _state |= _STATE_CANCELED;
+ if (_hasPending) {
+ _pending.cancelSchedule();
}
}
/**
- * Called when the first subscriber requests a pause or the last a resume.
+ * Increment the pause count.
*
- * Read [isPaused] to see the new state.
+ * Also marks input as paused.
*/
- void _onPauseStateChange() {}
-
- /**
- * Called when the first listener subscribes or the last unsubscribes.
- *
- * Read [hasListener] to see what the new state is.
- */
- void _onSubscriptionStateChange() {}
+ void _incrementPauseCount() {
+ _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
+ }
/**
- * Add a pending event at the end of the pending event queue.
+ * Decrements the pause count.
*
- * Schedules events if currently not paused and inside a callback.
+ * Does not automatically unpause the input (call [_onResume]) when
+ * the pause count reaches zero. This is handled elsewhere, and only
+ * if there are no pending events buffered.
*/
- void _addPendingEvent(_DelayedEvent event) {
- if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents();
- _StreamImplEvents events = _pendingEvents;
- events.add(event);
- if (_isPaused || _isFiring) return;
- if (_isInCallback) {
- _schedulePendingEvents();
- return;
- }
+ void _decrementPauseCount() {
+ assert(_isPaused);
+ _state -= _STATE_PAUSE_COUNT;
}
- /** Fire any pending events until the pending event queue is empty. */
- void _handlePendingEvents() {
- assert(_isInactive);
- if (!_hasPendingEvent) return;
- _PendingEvents events = _pendingEvents;
- do {
- if (_isPaused) return;
- if (events.isScheduled) events.cancelSchedule();
- events.handleNext(this);
- } while (!events.isEmpty);
- }
+ // _EventSink interface.
- /**
- * Send a data event directly to each subscriber.
- */
- _sendData(T value) {
- assert(!_isPaused);
- assert(!_isComplete);
- if (!_hasListener) return;
- _forEachSubscriber((subscriber) {
- try {
- subscriber._sendData(value);
- } catch (e, s) {
- _throwDelayed(e, s);
- }
- });
+ void _add(T data) {
+ assert(!_isClosed);
+ if (_isCanceled) return;
+ if (_canFire) {
+ _sendData(data);
+ } else {
+ _addPending(new _DelayedData(data));
+ }
}
- /**
- * Sends an error event directly to each subscriber.
- */
- void _sendError(error) {
- assert(!_isPaused);
- assert(!_isComplete);
- if (!_hasListener) return;
- _forEachSubscriber((subscriber) {
- try {
- subscriber._sendError(error);
- } catch (e, s) {
- _throwDelayed(e, s);
- }
- });
+ void _addError(Object error) {
+ if (_isCanceled) return;
+ if (_canFire) {
+ _sendError(error); // Reports cancel after sending.
+ } else {
+ _addPending(new _DelayedError(error));
+ }
}
- /**
- * Sends the "done" message directly to each subscriber.
- * This automatically stops further subscription and
- * unsubscribes all subscribers.
- */
- void _sendDone() {
- assert(!_isPaused);
- assert(_isClosed);
- _setComplete();
- if (!_hasListener) return;
- _forEachSubscriber((subscriber) {
- _cancel(subscriber);
- try {
- subscriber._sendDone();
- } catch (e, s) {
- _throwDelayed(e, s);
- }
- });
- assert(!_hasListener);
+ void _close() {
+ assert(!_isClosed);
+ if (_isCanceled) return;
+ _state |= _STATE_CLOSED;
+ if (_canFire) {
+ _sendDone();
+ } else {
+ _addPending(const _DelayedDone());
+ }
}
-}
-
-// -------------------------------------------------------------------
-// Default implementation of a stream with a single subscriber.
-// -------------------------------------------------------------------
-/**
- * Default implementation of stream capable of sending events to one subscriber.
- *
- * Any class needing to implement [Stream] can either directly extend this
- * class, or extend [Stream] and delegate the subscribe method to an instance
- * of this class.
- *
- * The only public methods are those of [Stream], so instances of
- * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing
- * internal functionality.
- *
- * The [StreamController] is a public facing version of this class, with
- * some methods made public.
- *
- * The user interface of [_SingleStreamImpl] are the following methods:
- * * [_add]: Add a data event to the stream.
- * * [_addError]: Add an error event to the stream.
- * * [_close]: Request to close the stream.
- * * [_onSubscriberStateChange]: Called when receiving the first subscriber or
- * when losing the last subscriber.
- * * [_onPauseStateChange]: Called when entering or leaving paused mode.
- * * [_hasListener]: Test whether there are currently any subscribers.
- * * [_isInputPaused]: Test whether the stream is currently paused.
- * The user should not add new events while the stream is paused, but if it
- * happens anyway, the stream will enqueue the events just as when new events
- * arrive while still firing an old event.
- */
-class _SingleStreamImpl<T> extends _StreamImpl<T> {
- _StreamListener _subscriber = null;
- /** Whether there is currently a subscriber on this [Stream]. */
- bool get _hasListener => _subscriber != null;
-
- // -------------------------------------------------------------------
- // Internal implementation.
-
- _SingleStreamImpl() {
- // Start out paused.
- _updatePauseCount(1);
+ // Hooks called when the input is paused, unpaused or canceled.
+ // These must not throw. If overwritten to call user code, include suitable
+ // try/catch wrapping and send any errors to [_throwDelayed].
+ void _onPause() {
+ assert(_isInputPaused);
}
- /**
- * Create the new subscription object.
- */
- _StreamSubscriptionImpl<T> _createSubscription(
- void onData(T data),
- void onError(error),
- void onDone(),
- bool cancelOnError) {
- return new _StreamSubscriptionImpl<T>(
- this, onData, onError, onDone, cancelOnError);
+ void _onResume() {
+ assert(!_isInputPaused);
}
- void _addListener(_StreamListener subscription) {
- assert(!_isComplete);
- if (_hasListener) {
- throw new StateError("Stream already has subscriber.");
- }
- assert(_pauseCount == 1);
- _updatePauseCount(-1);
- _subscriber = subscription;
- subscription._setSubscribed(0);
- if (_isInactive) {
- _checkCallbacks(false, true);
- if (!_isPaused && _hasPendingEvent) {
- _schedulePendingEvents();
- }
- }
+ void _onCancel() {
+ assert(_isCanceled);
}
+ // Handle pending events.
+
/**
- * Handle a cancel requested from a [_StreamSubscriptionImpl].
+ * Add a pending event.
*
- * This method is called from [_StreamSubscriptionImpl.cancel].
+ * If the subscription is not paused, this also schedules a firing
+ * of pending events later (if necessary).
*/
- void _cancel(_StreamListener subscriber) {
- assert(identical(subscriber._source, this));
- // We allow unsubscribing the currently firing subscription during
- // the event firing, because it is indistinguishable from delaying it since
- // that event has already received the event.
- if (!identical(_subscriber, subscriber)) {
- // You may unsubscribe more than once, only the first one counts.
- return;
- }
- _subscriber = null;
- // Unsubscribing a paused subscription also cancels its pauses.
- int resumeCount = subscriber._setUnsubscribed();
- // Keep being paused while there is no subscriber and the stream is not
- // complete.
- _updatePauseCount(_isComplete ? -resumeCount : -resumeCount + 1);
- if (_isInactive) {
- _checkCallbacks(true, resumeCount > 0);
- if (!_isPaused && _hasPendingEvent) {
- _schedulePendingEvents();
+ void _addPending(_DelayedEvent event) {
+ _StreamImplEvents pending = _pending;
+ if (_pending == null) pending = _pending = new _StreamImplEvents();
+ pending.add(event);
+ if (!_hasPending) {
+ _state |= _STATE_HAS_PENDING;
+ if (!_isPaused) {
+ _pending.schedule(this);
}
}
}
- void _forEachSubscriber(
- void action(_StreamListener<T> subscription)) {
+ /* _EventDispatch interface. */
+
+ void _sendData(T data) {
+ assert(!_isCanceled);
assert(!_isPaused);
+ assert(!_inCallback);
bool wasInputPaused = _isInputPaused;
- _StreamListener subscription = _subscriber;
- assert(subscription != null);
- _startFiring();
- action(subscription);
- _endFiring(wasInputPaused);
- }
-}
-
-// -------------------------------------------------------------------
-// Default implementation of a stream with subscribers.
-// -------------------------------------------------------------------
-
-/**
- * Default implementation of stream capable of sending events to subscribers.
- *
- * Any class needing to implement [Stream] can either directly extend this
- * class, or extend [Stream] and delegate the subscribe method to an instance
- * of this class.
- *
- * The only public methods are those of [Stream], so instances of
- * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing
- * internal functionality.
- *
- * The [StreamController] is a public facing version of this class, with
- * some methods made public.
- *
- * The user interface of [_MultiStreamImpl] are the following methods:
- * * [_add]: Add a data event to the stream.
- * * [_addError]: Add an error event to the stream.
- * * [_close]: Request to close the stream.
- * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or
- * when losing the last subscriber.
- * * [_onPauseStateChange]: Called when entering or leaving paused mode.
- * * [_hasListener]: Test whether there are currently any subscribers.
- * * [_isPaused]: Test whether the stream is currently paused.
- * The user should not add new events while the stream is paused, but if it
- * happens anyway, the stream will enqueue the events just as when new events
- * arrive while still firing an old event.
- */
-class _MultiStreamImpl<T> extends _StreamImpl<T>
- implements _InternalLinkList {
- // Link list implementation (mixin when possible).
- _InternalLink _nextLink;
- _InternalLink _previousLink;
-
- _MultiStreamImpl() {
- _nextLink = _previousLink = this;
+ _state |= _STATE_IN_CALLBACK;
+ try {
+ _onData(data);
+ } catch (e, s) {
+ _throwDelayed(e, s);
+ }
+ _state &= ~_STATE_IN_CALLBACK;
+ _checkState(wasInputPaused);
}
- bool get isBroadcast => true;
-
- Stream<T> asBroadcastStream() => this;
-
- // ------------------------------------------------------------------
- // Helper functions that can be overridden in subclasses.
-
- /** Whether there are currently any subscribers on this [Stream]. */
- bool get _hasListener => !_InternalLinkList.isEmpty(this);
-
- /**
- * Create the new subscription object.
- */
- _StreamListener<T> _createSubscription(
- void onData(T data),
- void onError(error),
- void onDone(),
- bool cancelOnError) {
- return new _StreamSubscriptionImpl<T>(
- this, onData, onError, onDone, cancelOnError);
+ void _sendError(var error) {
+ assert(!_isCanceled);
+ assert(!_isPaused);
+ assert(!_inCallback);
+ bool wasInputPaused = _isInputPaused;
+ _state |= _STATE_IN_CALLBACK;
+ try {
+ _onError(error);
+ } catch (e, s) {
+ _throwDelayed(e, s);
+ }
+ _state &= ~_STATE_IN_CALLBACK;
+ if (_cancelOnError) {
+ _cancel();
+ }
+ _checkState(wasInputPaused);
}
- // -------------------------------------------------------------------
- // Internal implementation.
-
- /**
- * Iterate over all current subscribers and perform an action on each.
- *
- * The set of subscribers cannot be modified during this iteration.
- * All attempts to add or unsubscribe subscribers will be delayed until
- * after the iteration is complete.
- *
- * The [action] must not throw, or the controller will be left in an
- * invalid state.
- *
- * This method must not be called while [isFiring] is true.
- */
- void _forEachSubscriber(
- void action(_StreamListener<T> subscription)) {
- assert(!_isFiring);
- if (!_hasListener) return;
- bool wasInputPaused = _isInputPaused;
- _startFiring();
- _InternalLink cursor = this._nextLink;
- while (!identical(cursor, this)) {
- _StreamListener<T> current = cursor;
- if (current._needsEvent(_currentEventIdBit)) {
- action(current);
- // Marks as having received the event.
- current._toggleEventReceived();
- }
- cursor = current._nextLink;
- if (current._isPendingUnsubscribe) {
- _removeListener(current);
- }
+ void _sendDone() {
+ assert(!_isCanceled);
+ assert(!_isPaused);
+ assert(!_inCallback);
+ _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
+ try {
+ _onDone();
+ } catch (e, s) {
+ _throwDelayed(e, s);
}
- _endFiring(wasInputPaused);
+ _onCancel(); // No checkState after cancel, it is always the last event.
+ _state &= ~_STATE_IN_CALLBACK;
}
- void _addListener(_StreamListener listener) {
- listener._setSubscribed(_currentEventIdBit);
- bool hadListener = _hasListener;
- _InternalLinkList.add(this, listener);
- if (!hadListener && _isInactive) {
- _checkCallbacks(false, false);
- if (!_isPaused && _hasPendingEvent) {
- _schedulePendingEvents();
- }
- }
+ /**
+ * Call a hook function.
+ *
+ * The call is properly wrapped in code to avoid other callbacks
+ * during the call, and it checks for state changes after the call
+ * that should cause further callbacks.
+ */
+ void _guardCallback(callback) {
+ assert(!_inCallback);
+ bool wasInputPaused = _isInputPaused;
+ _state |= _STATE_IN_CALLBACK;
+ callback();
+ _state &= ~_STATE_IN_CALLBACK;
+ _checkState(wasInputPaused);
}
/**
- * Handle a cancel requested from a [_StreamListener].
+ * Check if the input needs to be informed of state changes.
+ *
+ * State changes are pausing, resuming and canceling.
*
- * This method is called from [_StreamListener.cancel].
+ * After canceling, no further callbacks will happen.
*
- * If an event is currently firing, the cancel is delayed
- * until after the subscribers have received the event.
+ * The cancel callback is called after a user cancel, or after
+ * the final done event is sent.
*/
- void _cancel(_StreamListener listener) {
- assert(identical(listener._source, this));
- if (_InternalLink.isUnlinked(listener)) {
- // You may unsubscribe more than once, only the first one counts.
- return;
+ void _checkState(bool wasInputPaused) {
+ assert(!_inCallback);
+ if (_hasPending && _pending.isEmpty) {
+ _state &= ~_STATE_HAS_PENDING;
+ if (_isInputPaused && _mayResumeInput) {
+ _state &= ~_STATE_INPUT_PAUSED;
+ }
}
- if (_isFiring) {
- if (listener._needsEvent(_currentEventIdBit)) {
- assert(listener._isSubscribed);
- listener._setPendingUnsubscribe(_currentEventIdBit);
- } else {
- // The listener has been notified of the event (or don't need to,
- // if it's still pending subscription) so it's safe to remove it.
- _removeListener(listener);
+ // If the state changes during a callback, we immediately
+ // make a new state-change callback. Loop until the state didn't change.
+ while (true) {
+ if (_isCanceled) {
+ _onCancel();
+ _pending = null;
+ return;
}
- // Pause and subscription state changes are reported when we end
- // firing.
- } else {
- bool wasInputPaused = _isInputPaused;
- _removeListener(listener);
- if (_isInactive) {
- _checkCallbacks(true, wasInputPaused);
- if (!_isPaused && _hasPendingEvent) {
- _schedulePendingEvents();
- }
+ bool isInputPaused = _isInputPaused;
+ if (wasInputPaused == isInputPaused) break;
+ _state ^= _STATE_IN_CALLBACK;
+ if (isInputPaused) {
+ _onPause();
+ } else {
+ _onResume();
}
+ _state &= ~_STATE_IN_CALLBACK;
+ wasInputPaused = isInputPaused;
}
- }
-
- /**
- * Removes a listener from this stream and cancels its pauses.
- *
- * This is a low-level action that doesn't call [_onSubscriptionStateChange].
- * or [_callOnPauseStateChange].
- */
- void _removeListener(_StreamListener listener) {
- int pauseCount = listener._setUnsubscribed();
- _InternalLinkList.remove(listener);
- if (pauseCount > 0) {
- _updatePauseCount(-pauseCount);
- if (!_isPaused && _hasPendingEvent) {
- _state |= _STREAM_PENDING_RESUME;
- }
+ if (_hasPending && !_isPaused) {
+ _pending.schedule(this);
}
}
}
+// -------------------------------------------------------------------
+// Common base class for single and multi-subscription streams.
+// -------------------------------------------------------------------
+abstract class _StreamImpl<T> extends Stream<T> {
+ // ------------------------------------------------------------------
+ // Stream interface.
-/** Stream that generates its own events. */
-class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> {
- /**
- * Initializes the stream to have only the events provided by [events].
- *
- * A [_PendingEvents] implementation provides events that are handled
- * by calling [_PendingEvents.handleNext] with the [_StreamImpl].
- */
- _GeneratedSingleStreamImpl(_PendingEvents events) {
- _pendingEvents = events;
- _setClosed(); // Closed for input since all events are already pending.
+ StreamSubscription<T> listen(void onData(T data),
+ { void onError(error),
+ void onDone(),
+ bool cancelOnError }) {
+ if (onData == null) onData = _nullDataHandler;
+ if (onError == null) onError = _nullErrorHandler;
+ if (onDone == null) onDone = _nullDoneHandler;
+ cancelOnError = identical(true, cancelOnError);
+ StreamSubscription subscription =
+ _createSubscription(onData, onError, onDone, cancelOnError);
+ _onListen(subscription);
+ return subscription;
}
- void _add(T value) {
- throw new UnsupportedError("Cannot inject events into generated stream");
+ // -------------------------------------------------------------------
+ /** Create a subscription object. Called by [subcribe]. */
+ _BufferingStreamSubscription<T> _createSubscription(
+ void onData(T data),
+ void onError(error),
+ void onDone(),
+ bool cancelOnError) {
+ return new _BufferingStreamSubscription<T>(
+ onData, onError, onDone, cancelOnError);
}
- void _addError(value) {
- throw new UnsupportedError("Cannot inject events into generated stream");
- }
+ /** Hook called when the subscription has been created. */
+ void _onListen(StreamSubscription subscription) {}
+}
- void _close() {
- throw new UnsupportedError("Cannot inject events into generated stream");
+typedef _PendingEvents _EventGenerator();
+
+/** Stream that generates its own events. */
+class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
+ final _EventGenerator _pending;
+ /**
+ * Initializes the stream to have only the events provided by a
+ * [_PendingEvents].
+ *
+ * A new [_PendingEvents] must be generated for each listen.
+ */
+ _GeneratedStreamImpl(this._pending);
+
+ StreamSubscription _createSubscription(void onData(T data),
+ void onError(Object error),
+ void onDone(),
+ bool cancelOnError) {
+ _BufferingStreamSubscription<T> subscription =
+ new _BufferingStreamSubscription(
+ onData, onError, onDone, cancelOnError);
+ subscription._setPendingEvents(_pending());
+ return subscription;
}
}
/** Pending events object that gets its events from an [Iterable]. */
class _IterablePendingEvents<T> extends _PendingEvents {
- final Iterator<T> _iterator;
- /**
- * Whether there are no more events to be sent.
- *
- * This starts out as [:false:] since there is always at least
- * a 'done' event to be sent.
- */
- bool _isDone = false;
+ // The iterator providing data for data events.
+ // Set to null when iteration has completed.
+ Iterator<T> _iterator;
_IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;
- bool get isEmpty => _isDone;
+ bool get isEmpty => _iterator == null;
- void handleNext(_StreamImpl<T> stream) {
- if (_isDone) throw new StateError("No events pending.");
+ void handleNext(_EventDispatch dispatch) {
+ if (_iterator == null) {
+ throw new StateError("No events pending.");
+ }
+ // Send one event per call to moveNext.
+ // If moveNext returns true, send the current element as data.
+ // If moveNext returns false, send a done event and clear the _iterator.
+ // If moveNext throws an error, send an error and clear the _iterator.
+ // After an error, no further events will be sent.
+ bool isDone;
try {
- _isDone = !_iterator.moveNext();
- if (!_isDone) {
- stream._sendData(_iterator.current);
- } else {
- stream._sendDone();
- }
+ isDone = !_iterator.moveNext();
} catch (e, s) {
- stream._sendError(_asyncError(e, s));
- stream._sendDone();
- _isDone = true;
+ _iterator = null;
+ dispatch._sendError(_asyncError(e, s));
+ return;
+ }
+ if (!isDone) {
+ dispatch._sendData(_iterator.current);
+ } else {
+ _iterator = null;
+ dispatch._sendDone();
}
}
-}
-
-
-/**
- * The subscription class that the [StreamController] uses.
- *
- * The [_StreamImpl.createSubscription] method should
- * create an object of this type, or another subclass of [_StreamListener].
- * A subclass of [_StreamImpl] can specify which subclass
- * of [_StreamSubscriptionImpl] it uses by overriding
- * [_StreamImpl.createSubscription].
- *
- * The subscription is in one of three states:
- * * Subscribed.
- * * Paused-and-subscribed.
- * * Unsubscribed.
- * Unsubscribing also resumes any pauses started by the subscription.
- */
-class _StreamSubscriptionImpl<T> extends _StreamListener<T>
- implements StreamSubscription<T> {
- final bool _cancelOnError;
- // TODO(ahe): Restore type when feature is implemented in dart2js
- // checked mode. http://dartbug.com/7733
- var /* _DataHandler<T> */ _onData;
- _ErrorHandler _onError;
- _DoneHandler _onDone;
- _StreamSubscriptionImpl(_StreamImpl source,
- this._onData,
- this._onError,
- this._onDone,
- this._cancelOnError) : super(source);
-
- void onData(void handleData(T event)) {
- if (handleData == null) handleData = _nullDataHandler;
- _onData = handleData;
- }
-
- void onError(void handleError(error)) {
- if (handleError == null) handleError = _nullErrorHandler;
- _onError = handleError;
- }
-
- void onDone(void handleDone()) {
- if (handleDone == null) handleDone = _nullDoneHandler;
- _onDone = handleDone;
- }
-
- void _sendData(T data) {
- _onData(data);
- }
-
- void _sendError(error) {
- _onError(error);
- if (_cancelOnError) _source._cancel(this);
- }
-
- void _sendDone() {
- _onDone();
- }
-
- void cancel() {
- if (!_isSubscribed) return;
- _source._cancel(this);
- }
-
- void pause([Future resumeSignal]) {
- if (!_isSubscribed) return;
- _source._pause(this, resumeSignal);
- }
-
- void resume() {
- if (!_isSubscribed || !isPaused) return;
- _source._resume(this, false);
- }
-
- Future asFuture([var futureValue]) {
- _FutureImpl<T> result = new _FutureImpl<T>();
-
- // Overwrite the onDone and onError handlers.
- onDone(() { result._setValue(futureValue); });
- onError((error) {
- cancel();
- result._setError(error);
- });
- return result;
+ void clear() {
+ if (isScheduled) cancelSchedule();
+ _iterator = null;
}
}
+
// Internal helpers.
// Types of the different handlers on a stream. Types used to type fields.
@@ -998,20 +558,20 @@ void _nullErrorHandler(error) {
void _nullDoneHandler() {}
-/** A delayed event on a stream implementation. */
+/** A delayed event on a buffering stream subscription. */
abstract class _DelayedEvent {
/** Added as a linked list on the [StreamController]. */
_DelayedEvent next;
/** Execute the delayed event on the [StreamController]. */
- void perform(_StreamImpl stream);
+ void perform(_EventDispatch dispatch);
}
/** A delayed data event. */
class _DelayedData<T> extends _DelayedEvent{
final T value;
_DelayedData(this.value);
- void perform(_StreamImpl<T> stream) {
- stream._sendData(value);
+ void perform(_EventDispatch<T> dispatch) {
+ dispatch._sendData(value);
}
}
@@ -1019,16 +579,16 @@ class _DelayedData<T> extends _DelayedEvent{
class _DelayedError extends _DelayedEvent {
final error;
_DelayedError(this.error);
- void perform(_StreamImpl stream) {
- stream._sendError(error);
+ void perform(_EventDispatch dispatch) {
+ dispatch._sendError(error);
}
}
/** A delayed done event. */
class _DelayedDone implements _DelayedEvent {
const _DelayedDone();
- void perform(_StreamImpl stream) {
- stream._sendDone();
+ void perform(_EventDispatch dispatch) {
+ dispatch._sendDone();
}
_DelayedEvent get next => null;
@@ -1118,118 +678,66 @@ abstract class _InternalLinkList extends _InternalLink {
}
}
-/** Abstract type for an internal interface for sending events. */
-abstract class _EventOutputSink<T> {
- _sendData(T data);
- _sendError(error);
- _sendDone();
-}
-
-abstract class _StreamListener<T> extends _InternalLink
- implements _EventOutputSink<T> {
- final _StreamImpl _source;
- int _state = _LISTENER_UNSUBSCRIBED;
-
- _StreamListener(this._source);
-
- bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT);
-
- bool get _isPendingUnsubscribe =>
- (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0;
-
- bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0;
+/** Superclass for provider of pending events. */
+abstract class _PendingEvents {
+ // No async event has been scheduled.
+ static const int _STATE_UNSCHEDULED = 0;
+ // An async event has been scheduled to run a function.
+ static const int _STATE_SCHEDULED = 1;
+ // An async event has been scheduled, but it will do nothing when it runs.
+ // Async events can't be preempted.
+ static const int _STATE_CANCELED = 3;
/**
- * Whether the listener still needs to receive the currently firing event.
+ * State of being scheduled.
*
- * The currently firing event is identified by a single bit, which alternates
- * between events. The [_state] contains the previously sent event's bit in
- * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener
- * still need the current event.
- */
- bool _needsEvent(int currentEventIdBit) {
- int lastEventIdBit =
- (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT;
- return lastEventIdBit != currentEventIdBit;
- }
-
- /// If a subscriber's "firing bit" doesn't match the stream's firing bit,
- /// we are currently firing an event and the subscriber still need to receive
- /// the event.
- void _toggleEventReceived() {
- _state ^= _LISTENER_EVENT_ID;
- }
-
- void _setSubscribed(int eventIdBit) {
- assert(eventIdBit == 0 || eventIdBit == 1);
- _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT);
- }
-
- void _setPendingUnsubscribe(int currentEventIdBit) {
- assert(_isSubscribed);
- // Sets the pending unsubscribe, and ensures that the listener
- // won't get the current event.
- _state |= _LISTENER_PENDING_UNSUBSCRIBE | _LISTENER_EVENT_ID;
- _state ^= (1 ^ currentEventIdBit) << _LISTENER_EVENT_ID_SHIFT;
- assert(!_needsEvent(currentEventIdBit));
- }
-
- /**
- * Marks the listener as unsubscibed.
+ * Set to [_STATE_SCHEDULED] when pending events are scheduled for
+ * async dispatch. Since we can't cancel a [runAsync] call, if schduling
+ * is "canceled", the _state is simply set to [_STATE_CANCELED] which will
+ * make the async code do nothing except resetting [_state].
*
- * Returns the number of unresumed pauses for the listener.
+ * If events are scheduled while the state is [_STATE_CANCELED], it is
+ * merely switched back to [_STATE_SCHEDULED], but no new call to [runAsync]
+ * is performed.
*/
- int _setUnsubscribed() {
- assert(_isSubscribed);
- int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT;
- _state = _LISTENER_UNSUBSCRIBED;
- return timesPaused;
- }
+ int _state = _STATE_UNSCHEDULED;
- void _incrementPauseCount() {
- _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT;
- }
-
- void _decrementPauseCount() {
- assert(isPaused);
- _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT;
- }
+ bool get isEmpty;
- _sendData(T data);
- _sendError(error);
- _sendDone();
-}
+ bool get isScheduled => _state == _STATE_SCHEDULED;
+ bool get _eventScheduled => _state >= _STATE_SCHEDULED;
-/** Superclass for provider of pending events. */
-abstract class _PendingEvents {
/**
- * Timer set when pending events are scheduled for execution.
+ * Schedule an event to run later.
*
- * When scheduling pending events for execution in a later cycle, the timer
- * is stored here. If pending events are executed earlier than that, e.g.,
- * due to a second event in the current cycle, the timer is canceled again.
+ * If called more than once, it should be called with the same dispatch as
+ * argument each time. It may reuse an earlier argument in some cases.
*/
- Timer scheduleTimer = null;
-
- bool get isEmpty;
-
- bool get isScheduled => scheduleTimer != null;
-
- void schedule(_StreamImpl stream) {
+ void schedule(_EventDispatch dispatch) {
if (isScheduled) return;
- scheduleTimer = new Timer(Duration.ZERO, () {
- scheduleTimer = null;
- stream._handlePendingEvents();
+ assert(!isEmpty);
+ if (_eventScheduled) {
+ assert(_state == _STATE_CANCELED);
+ _state = _STATE_SCHEDULED;
+ return;
+ }
+ runAsync(() {
+ int oldState = _state;
+ _state = _STATE_UNSCHEDULED;
+ if (oldState == _STATE_CANCELED) return;
+ handleNext(dispatch);
});
+ _state = _STATE_SCHEDULED;
}
void cancelSchedule() {
- assert(isScheduled);
- scheduleTimer.cancel();
- scheduleTimer = null;
+ if (isScheduled) _state = _STATE_CANCELED;
}
- void handleNext(_StreamImpl stream);
+ void handleNext(_EventDispatch dispatch);
+
+ /** Throw away any pending events and cancel scheduled events. */
+ void clear();
}
@@ -1242,8 +750,6 @@ class _StreamImplEvents extends _PendingEvents {
bool get isEmpty => lastPendingEvent == null;
- bool get isScheduled => scheduleTimer != null;
-
void add(_DelayedEvent event) {
if (lastPendingEvent == null) {
firstPendingEvent = lastPendingEvent = event;
@@ -1252,122 +758,465 @@ class _StreamImplEvents extends _PendingEvents {
}
}
- void handleNext(_StreamImpl stream) {
+ void handleNext(_EventDispatch dispatch) {
assert(!isScheduled);
_DelayedEvent event = firstPendingEvent;
firstPendingEvent = event.next;
if (firstPendingEvent == null) {
lastPendingEvent = null;
}
- event.perform(stream);
+ event.perform(dispatch);
+ }
+
+ void clear() {
+ if (isScheduled) cancelSchedule();
+ firstPendingEvent = lastPendingEvent = null;
}
}
+class _MultiplexerLinkedList {
+ _MultiplexerLinkedList _next;
+ _MultiplexerLinkedList _previous;
-class _DoneSubscription<T> implements StreamSubscription<T> {
- _DoneHandler _handler;
- Timer _timer;
- int _pauseCount = 0;
+ void _unlink() {
+ _previous._next = _next;
+ _next._previous = _previous;
+ _next = _previous = this;
+ }
- _DoneSubscription(this._handler) {
- _delayDone();
+ void _insertBefore(_MultiplexerLinkedList newNext) {
+ _MultiplexerLinkedList newPrevious = newNext._previous;
+ newPrevious._next = this;
+ newNext._previous = _previous;
+ _previous._next = newNext;
+ _previous = newPrevious;
}
+}
- void _delayDone() {
- assert(_timer == null && _pauseCount == 0);
- _timer = new Timer(Duration.ZERO, () {
- if (_handler != null) _handler();
- });
+/**
+ * A subscription used by [_SingleStreamMultiplexer].
+ *
+ * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple
+ * listeners at a time. It is used to implement [Stream.asBroadcastStream].
+ *
+ * It is itself listening to another stream for events, and it forwards all
+ * events to all of its simultanous listeners.
+ *
+ * The listeners are [_MultiplexerSubscription]s and are kept as a linked list.
+ */
+// TODO(lrn): Change "implements" to "with" when automatic mixin constructors
+// are implemented.
+class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T>
+ implements _MultiplexerLinkedList {
+ static const int _STATE_NOT_LISTENING = 0;
+ // Bit that alternates between event firings. If bit matches the one currently
+ // firing, the subscription will not be notified.
+ static const int _STATE_EVENT_ID_BIT = 1;
+ // Whether the subscription is listening at all. This should be set while
+ // it is part of the linked list of listeners of a multiplexer stream.
+ static const int _STATE_LISTENING = 2;
+ // State bit set while firing an event.
+ static const int _STATE_IS_FIRING = 4;
+ // Bit set if a subscription is canceled while it's firing (the
+ // [_STATE_IS_FIRING] bit is set).
+ // If the subscription is canceled while firing, it is not removed from the
+ // linked list immediately (to avoid breaking iteration), but is instead
+ // removed after it is done firing.
+ static const int _STATE_REMOVE_AFTER_FIRING = 8;
+
+ // Firing state.
+ int _multiplexState;
+
+ _SingleStreamMultiplexer _source;
+
+ _MultiplexerSubscription(this._source,
+ void onData(T data),
+ void onError(Object error),
+ void onDone(),
+ bool cancelOnError,
+ int nextEventId)
+ : _multiplexState = _STATE_LISTENING | nextEventId,
+ super(onData, onError, onDone, cancelOnError) {
+ _next = _previous = this;
+ }
+
+ // Mixin workaround.
+ _MultiplexerLinkedList _next;
+ _MultiplexerLinkedList _previous;
+
+ void _unlink() {
+ _previous._next = _next;
+ _next._previous = _previous;
+ _next = _previous = this;
+ }
+
+ void _insertBefore(_MultiplexerLinkedList newNext) {
+ _MultiplexerLinkedList newPrevious = newNext._previous;
+ newPrevious._next = this;
+ newNext._previous = _previous;
+ _previous._next = newNext;
+ _previous = newPrevious;
+ }
+ // End mixin.
+
+ bool get _isListening => _multiplexState >= _STATE_LISTENING;
+ bool get _isFiring => _multiplexState >= _STATE_IS_FIRING;
+ bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING;
+ int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT;
+
+ void _setRemoveAfterFiring() {
+ assert(_isFiring);
+ _multiplexState |= _STATE_REMOVE_AFTER_FIRING;
+ }
+
+ void _startFiring() {
+ assert(!_isFiring);
+ _multiplexState |= _STATE_IS_FIRING;
}
- bool get _isComplete => _timer == null && _pauseCount == 0;
+ /// Marks listener as no longer firing, and toggles its event id.
+ void _endFiring() {
+ assert(_isFiring);
+ _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT);
+ }
- void onData(void handleAction(T value)) {}
+ void _setNotListening() {
+ assert(_isListening);
+ _multiplexState = _STATE_NOT_LISTENING;
+ }
- void onError(void handleError(error)) {}
+ void _onCancel() {
+ assert(_isListening);
+ _source._removeListener(this);
+ }
+}
- void onDone(void handleDone()) {
- _handler = handleDone;
+/**
+ * A stream that sends events from another stream to multiple listeners.
+ *
+ * This is used to implement [Stream.asBroadcastStream].
+ *
+ * This stream allows listening more than once.
+ * When the first listener is added, it starts listening on its source
+ * stream for events. All events from the source stream are sent to all
+ * active listeners. The listeners handle their own buffering.
+ * When the last listener cancels, the source stream subscription is also
+ * canceled, and no further listening is possible.
+ */
+// TODO(lrn): change "implements" to "with" when the VM supports it.
+class _SingleStreamMultiplexer<T> extends Stream<T>
+ implements _MultiplexerLinkedList,
+ _EventDispatch<T> {
+ final Stream<T> _source;
+ StreamSubscription<T> _subscription;
+ // Alternates between zero and one for each event fired.
+ // Listeners are initialized with the next event's id, and will
+ // only be notified if they match the event being fired.
+ // That way listeners added during event firing will not receive
+ // the current event.
+ int _eventId = 0;
+
+ bool _isFiring = false;
+
+ // Remember events added while firing.
+ _StreamImplEvents _pending;
+
+ _SingleStreamMultiplexer(this._source) {
+ _next = _previous = this;
+ }
+
+ bool get _hasPending => _pending != null && !_pending.isEmpty;
+
+ // Should be mixin.
+ _MultiplexerLinkedList _next;
+ _MultiplexerLinkedList _previous;
+
+ void _unlink() {
+ _previous._next = _next;
+ _next._previous = _previous;
+ _next = _previous = this;
+ }
+
+ void _insertBefore(_MultiplexerLinkedList newNext) {
+ _MultiplexerLinkedList newPrevious = newNext._previous;
+ newPrevious._next = this;
+ newNext._previous = _previous;
+ _previous._next = newNext;
+ _previous = newPrevious;
+ }
+ // End of mixin.
+
+ StreamSubscription<T> listen(void onData(T data),
+ { void onError(Object error),
+ void onDone(),
+ bool cancelOnError }) {
+ if (onData == null) onData = _nullDataHandler;
+ if (onError == null) onError = _nullErrorHandler;
+ if (onDone == null) onDone = _nullDoneHandler;
+ cancelOnError = identical(true, cancelOnError);
+ _MultiplexerSubscription subscription =
+ new _MultiplexerSubscription(this, onData, onError, onDone,
+ cancelOnError, _eventId);
+ if (_subscription == null) {
+ _subscription = _source.listen(_add, onError: _addError, onDone: _close);
+ }
+ subscription._insertBefore(this);
+ return subscription;
}
- void pause([Future signal]) {
- if (_isComplete) return;
- if (_timer != null) {
- _timer.cancel();
- _timer = null;
+ /** Called by [_MultiplexerSubscription.remove]. */
+ void _removeListener(_MultiplexerSubscription listener) {
+ if (listener._isFiring) {
+ listener._setRemoveAfterFiring();
+ } else {
+ _unlinkListener(listener);
}
- _pauseCount++;
- if (signal != null) signal.whenComplete(resume);
}
- void resume() {
- if (_isComplete) return;
- if (_pauseCount == 0) return;
- _pauseCount--;
- if (_pauseCount == 0) {
- _delayDone();
+ /** Remove a listener and close the subscription after the last one. */
+ void _unlinkListener(_MultiplexerSubscription listener) {
+ listener._setNotListening();
+ listener._unlink();
+ if (identical(_next, this)) {
+ // Last listener removed.
+ _cancel();
}
}
- bool get isPaused => _pauseCount > 0;
+ void _cancel() {
+ StreamSubscription subscription = _subscription;
+ _subscription = null;
+ subscription.cancel();
+ if (_pending != null) _pending.cancelSchedule();
+ }
+
+ void _forEachListener(void action(_MultiplexerSubscription listener)) {
+ int eventId = _eventId;
+ _eventId ^= 1;
+ _isFiring = true;
+ _MultiplexerLinkedList entry = _next;
+ // Call each listener in order. A listener can be removed during any
+ // other listener's event. During its own event it will only be marked
+ // as "to be removed", and it will be handled after the event is done.
+ while (!identical(entry, this)) {
+ _MultiplexerSubscription listener = entry;
+ if (listener._eventId == eventId) {
+ listener._startFiring();
+ action(listener);
+ listener._endFiring(); // Also toggles the event id.
+ }
+ entry = listener._next;
+ if (listener._removeAfterFiring) {
+ _unlinkListener(listener);
+ }
+ }
+ _isFiring = false;
+ }
- void cancel() {
- if (_isComplete) return;
- if (_timer != null) {
- _timer.cancel();
- _timer = null;
+ void _add(T data) {
+ if (_isFiring || _hasPending) {
+ _StreamImplEvents pending = _pending;
+ if (pending == null) pending = _pending = new _StreamImplEvents();
+ pending.add(new _DelayedData(data));
+ } else {
+ _sendData(data);
}
- _pauseCount = 0;
}
- Future asFuture([var futureValue]) {
- // TODO(floitsch): share more code.
- _FutureImpl<T> result = new _FutureImpl<T>();
+ void _addError(Object error) {
+ if (_isFiring || _hasPending) {
+ _StreamImplEvents pending = _pending;
+ if (pending == null) pending = _pending = new _StreamImplEvents();
+ pending.add(new _DelayedError(error));
+ } else {
+ _sendError(error);
+ }
+ }
- // Overwrite the onDone and onError handlers.
- onDone(() { result._setValue(futureValue); });
- onError((error) {
- cancel();
- result._setError(error);
+ void _close() {
+ if (_isFiring || _hasPending) {
+ _StreamImplEvents pending = _pending;
+ if (pending == null) pending = _pending = new _StreamImplEvents();
+ pending.add(const _DelayedDone());
+ } else {
+ _sendDone();
+ }
+ }
+
+ void _sendData(T data) {
+ _forEachListener((_MultiplexerSubscription listener) {
+ listener._add(data);
});
+ if (_hasPending) {
+ _pending.schedule(this);
+ }
+ }
- return result;
+ void _sendError(Object error) {
+ _forEachListener((_MultiplexerSubscription listener) {
+ listener._addError(error);
+ });
+ if (_hasPending) {
+ _pending.schedule(this);
+ }
}
-}
-class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> {
- final Stream<T> _source;
- StreamSubscription<T> _subscription;
+ void _sendDone() {
+ _forEachListener((_MultiplexerSubscription listener) {
+ listener._setRemoveAfterFiring();
+ listener._close();
+ });
+ }
+}
- _SingleStreamMultiplexer(this._source);
- void _callOnPauseStateChange() {
- if (_isPaused) {
- if (_subscription != null) {
- _subscription.pause();
- }
+/**
+ * Simple implementation of [StreamIterator].
+ */
+class _StreamIteratorImpl<T> implements StreamIterator<T> {
+ // Internal state of the stream iterator.
+ // At any time, it is in one of these states.
+ // The interpretation of the [_futureOrPrefecth] field depends on the state.
+ // In _STATE_MOVING, the _data field holds the most recently returned
+ // future.
+ // When in one of the _STATE_EXTRA_* states, the it may hold the
+ // next data/error object, and the subscription is paused.
+
+ /// The simple state where [_data] holds the data to return, and [moveNext]
+ /// is allowed. The subscription is actively listening.
+ static const int _STATE_FOUND = 0;
+ /// State set after [moveNext] has returned false or an error,
+ /// or after calling [cancel]. The subscription is always canceled.
+ static const int _STATE_DONE = 1;
+ /// State set after calling [moveNext], but before its returned future has
+ /// completed. Calling [moveNext] again is not allowed in this state.
+ /// The subscription is actively listening.
+ static const int _STATE_MOVING = 2;
+ /// States set when another event occurs while in _STATE_FOUND.
+ /// This extra overflow event is cached until the next call to [moveNext],
+ /// which will complete as if it received the event normally.
+ /// The subscription is paused in these states, so we only ever get one
+ /// event too many.
+ static const int _STATE_EXTRA_DATA = 3;
+ static const int _STATE_EXTRA_ERROR = 4;
+ static const int _STATE_EXTRA_DONE = 5;
+
+ /// Subscription being listened to.
+ StreamSubscription _subscription;
+
+ /// The current element represented by the most recent call to moveNext.
+ ///
+ /// Is null between the time moveNext is called and its future completes.
+ T _current = null;
+
+ /// The future returned by the most recent call to [moveNext].
+ ///
+ /// Also used to store the next value/error in case the stream provides an
+ /// event before [moveNext] is called again. In that case, the stream will
+ /// be paused to prevent further events.
+ var _futureOrPrefetch = null;
+
+ /// The current state.
+ int _state = _STATE_FOUND;
+
+ _StreamIteratorImpl(final Stream<T> stream) {
+ _subscription = stream.listen(_onData,
+ onError: _onError,
+ onDone: _onDone,
+ cancelOnError: true);
+ }
+
+ T get current => _current;
+
+ Future<bool> moveNext() {
+ if (_state == _STATE_DONE) {
+ return new _FutureImpl<bool>.immediate(false);
+ }
+ if (_state == _STATE_MOVING) {
+ throw new StateError("Already waiting for next.");
+ }
+ if (_state == _STATE_FOUND) {
+ _state = _STATE_MOVING;
+ _futureOrPrefetch = new _FutureImpl<bool>();
+ return _futureOrPrefetch;
} else {
- if (_subscription != null) {
- _subscription.resume();
+ assert(_state >= _STATE_EXTRA_DATA);
+ switch (_state) {
+ case _STATE_EXTRA_DATA:
+ _state = _STATE_FOUND;
+ _current = _futureOrPrefetch;
+ _futureOrPrefetch = null;
+ _subscription.resume();
+ return new FutureImpl<bool>.immediate(true);
+ case _STATE_EXTRA_ERROR:
+ Object prefetch = _futureOrPrefetch;
+ _cancel();
+ return new FutureImpl<bool>.error(prefetch);
+ case _STATE_EXTRA_DONE:
+ _cancel();
+ return new FutureImpl<bool>.immediate(false);
}
}
}
- /**
- * Subscribe or unsubscribe on [_source] depending on whether
- * [_stream] has subscribers.
- */
- void _onSubscriptionStateChange() {
- if (_hasListener) {
- assert(_subscription == null);
- _subscription = _source.listen(this._add,
- onError: this._addError,
- onDone: this._close);
+ /** Clears up the internal state when the iterator ends. */
+ void _clear() {
+ _subscription = null;
+ _futureOrPrefetch = null;
+ _current = null;
+ _state = _STATE_DONE;
+ }
+
+ void cancel() {
+ StreamSubscription subscription = _subscription;
+ if (_state == _STATE_MOVING) {
+ _FutureImpl<bool> hasNext = _futureOrPrefetch;
+ _clear();
+ hasNext._setValue(false);
} else {
- // TODO(lrn): Check why this can happen.
- if (_subscription == null) return;
- _subscription.cancel();
- _subscription = null;
+ _clear();
+ }
+ subscription.cancel();
+ }
+
+ void _onData(T data) {
+ if (_state == _STATE_MOVING) {
+ _current = data;
+ _FutureImpl<bool> hasNext = _futureOrPrefetch;
+ _futureOrPrefetch = null;
+ _state = _STATE_FOUND;
+ hasNext._setValue(true);
+ return;
+ }
+ _subscription.pause();
+ assert(_futureOrPrefetch == null);
+ _futureOrPrefetch = data;
+ _state = _STATE_EXTRA_DATA;
+ }
+
+ void _onError(Object error) {
+ if (_state == _STATE_MOVING) {
+ _FutureImpl<bool> hasNext = _futureOrPrefetch;
+ // We have cancelOnError: true, so the subscription is canceled.
+ _clear();
+ hasNext._setError(error);
+ return;
+ }
+ _subscription.pause();
+ assert(_futureOrPrefetch == null);
+ _futureOrPrefetch = error;
+ _state = _STATE_EXTRA_ERROR;
+ }
+
+ void _onDone() {
+ if (_state == _STATE_MOVING) {
+ _FutureImpl<bool> hasNext = _futureOrPrefetch;
+ _clear();
+ hasNext._setValue(false);
+ return;
}
+ _subscription.pause();
+ _futureOrPrefetch = null;
+ _state = _STATE_EXTRA_DONE;
}
}
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698