Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(382)

Unified Diff: sdk/lib/async/stream_impl.dart

Issue 15989006: Revert until Windows crash is debugged. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_impl.dart
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index 5987050a586da99de0d00b8f31a99d06c08531ad..8fa1848d14ac0261b2943dfc41c4ba314d457162 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -4,6 +4,57 @@
part of dart.async;
+// States shared by single/multi stream implementations.
+
+// Completion state of the stream.
+/// Initial and default state where the stream can receive and send events.
+const int _STREAM_OPEN = 0;
+/// The stream has received a request to complete, but hasn't done so yet.
+/// No further events can be added to the stream.
+const int _STREAM_CLOSED = 1;
+/// The stream has completed and will no longer receive or send events.
+/// Also counts as closed. The stream must not be paused when it's completed.
+/// Always used in conjunction with [_STREAM_CLOSED].
+const int _STREAM_COMPLETE = 2;
+
+/// Bit that alternates between events, and listeners are updated to the
+/// current value when they are notified of the event.
+const int _STREAM_EVENT_ID = 4;
+const int _STREAM_EVENT_ID_SHIFT = 2;
+
+// The activity state of the stream: What is it currently doing.
+/// Bit set while firing and clear while not.
+const int _STREAM_FIRING = 8;
+/// Bit set while calling a pause-state or subscription-state change callback.
+const int _STREAM_CALLBACK = 16;
+
+// The pause state of the stream.
+/// Bit set when resuming with pending events. Cleared after all pending events
+/// have been transmitted. Means that the controller still considers the
+/// stream paused, even if the listener doesn't.
+const int _STREAM_PENDING_RESUME = 32;
+/// The count of times a stream has paused is stored in the
+/// state, shifted by this amount.
+const int _STREAM_PAUSE_COUNT_SHIFT = 6;
+
+// States for listeners.
+
+/// The listener is currently not subscribed to its source stream.
+const int _LISTENER_UNSUBSCRIBED = 0;
+/// The listener is actively subscribed to its source stream.
+const int _LISTENER_SUBSCRIBED = 1;
+/// The listener is subscribed until it has been notified of the current event.
+/// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED].
+const int _LISTENER_PENDING_UNSUBSCRIBE = 2;
+
+/// Bit that contains the last sent event's "id bit".
+const int _LISTENER_EVENT_ID = 4;
+const int _LISTENER_EVENT_ID_SHIFT = 2;
+
+/// The count of times a listener has paused is stored in the
+/// state, shifted by this amount.
+const int _LISTENER_PAUSE_COUNT_SHIFT = 3;
+
/** Throws the given error in the next cycle. */
_throwDelayed(var error, [Object stackTrace]) {
// We are going to reach the top-level here, but there might be a global
@@ -18,525 +69,914 @@ _throwDelayed(var error, [Object stackTrace]) {
});
}
-/** Abstract and private interface for a place to put events. */
-abstract class _EventSink<T> {
- void _add(T data);
- void _addError(Object error);
- void _close();
-}
-/**
- * Abstract and private interface for a place to send events.
- *
- * Used by event buffering to finally dispatch the pending event, where
- * [_EventSink] is where the event first enters the stream subscription,
- * and may yet be buffered.
- */
-abstract class _EventDispatch<T> {
- void _sendData(T data);
- void _sendError(Object error);
- void _sendDone();
-}
+// -------------------------------------------------------------------
+// Common base class for single and multi-subscription streams.
+// -------------------------------------------------------------------
+abstract class _StreamImpl<T> extends Stream<T> {
+ /** Current state of the stream. */
+ int _state = _STREAM_OPEN;
-/**
- * Default implementation of stream subscription of buffering events.
- *
- * The only public methods are those of [StreamSubscription], so instances of
- * [_BufferingStreamSubscription] can be returned directly as a
- * [StreamSubscription] without exposing internal functionality.
- *
- * The [StreamController] is a public facing version of [Stream] and this class,
- * with some methods made public.
- *
- * The user interface of [_BufferingStreamSubscription] are the following
- * methods:
- * * [_add]: Add a data event to the stream.
- * * [_addError]: Add an error event to the stream.
- * * [_close]: Request to close the stream.
- * * [_onCancel]: Called when the subscription will provide no more events,
- * either due to being actively canceled, or after sending a done event.
- * * [_onPause]: Called when the subscription wants the event source to pause.
- * * [_onResume]: Called when allowing new events after a pause.
- * The user should not add new events when the subscription requests a paused,
- * but if it happens anyway, the subscription will enqueue the events just as
- * when new events arrive while still firing an old event.
- */
-class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
- _EventSink<T>,
- _EventDispatch<T> {
- /** The `cancelOnError` flag from the `listen` call. */
- static const int _STATE_CANCEL_ON_ERROR = 1;
/**
- * Whether the "done" event has been received.
- * No further events are accepted after this.
- */
- static const int _STATE_CLOSED = 2;
- /**
- * Set if the input has been asked not to send events.
+ * List of pending events.
*
- * This is not the same as being paused, since the input will remain paused
- * after a call to [resume] if there are pending events.
+ * If events are added to the stream (using [_add], [_addError] or [_done])
+ * while the stream is paused, or while another event is firing, events will
+ * stored here.
+ * Also supports scheduling the events for later execution.
*/
- static const int _STATE_INPUT_PAUSED = 4;
+ _PendingEvents _pendingEvents;
+
+ // ------------------------------------------------------------------
+ // Stream interface.
+
+ StreamSubscription<T> listen(void onData(T data),
+ { void onError(error),
+ void onDone(),
+ bool cancelOnError }) {
+ if (_isComplete) {
+ return new _DoneSubscription(onDone);
+ }
+ if (onData == null) onData = _nullDataHandler;
+ if (onError == null) onError = _nullErrorHandler;
+ if (onDone == null) onDone = _nullDoneHandler;
+ cancelOnError = identical(true, cancelOnError);
+ _StreamSubscriptionImpl subscription =
+ _createSubscription(onData, onError, onDone, cancelOnError);
+ _addListener(subscription);
+ return subscription;
+ }
+
+ // ------------------------------------------------------------------
+ // EventSink interface-like methods for sending events into the stream.
+ // It's the responsibility of the caller to ensure that the stream is not
+ // paused when adding events. If the stream is paused, the events will be
+ // queued, but it's better to not send events at all.
+
/**
- * Whether the subscription has been canceled.
- *
- * Set by calling [cancel], or by handling a "done" event, or an "error" event
- * when `cancelOnError` is true.
+ * Send or queue a data event.
*/
- static const int _STATE_CANCELED = 8;
- static const int _STATE_IN_CALLBACK = 16;
- static const int _STATE_HAS_PENDING = 32;
- static const int _STATE_PAUSE_COUNT = 64;
- static const int _STATE_PAUSE_COUNT_SHIFT = 6;
-
- /* Event handlers provided in constructor. */
- /* TODO(7733): Fix Function->_DataHandler<T> when dart2js understands
- * parameterized function types. */
- Function _onData;
- _ErrorHandler _onError;
- _DoneHandler _onDone;
-
- /** Bit vector based on state-constants above. */
- int _state;
+ void _add(T value) {
+ if (_isClosed) throw new StateError("Sending on closed stream");
+ if (!_mayFireState) {
+ // Not the time to send events.
+ _addPendingEvent(new _DelayedData<T>(value));
+ return;
+ }
+ if (_hasPendingEvent) {
+ _addPendingEvent(new _DelayedData<T>(value));
+ } else {
+ _sendData(value);
+ }
+ _handlePendingEvents();
+ }
/**
- * Queue of pending events.
+ * Send or enqueue an error event.
*
- * Is created when necessary, or set in constructor for preconfigured events.
+ * If a subscription has requested to be unsubscribed on errors,
+ * it will be unsubscribed after receiving this event.
*/
- _PendingEvents _pending;
-
- _BufferingStreamSubscription(this._onData,
- this._onError,
- this._onDone,
- bool cancelOnError)
- : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) {
- assert(_onData != null);
- assert(_onError != null);
- assert(_onDone != null);
+ void _addError(error) {
+ if (_isClosed) throw new StateError("Sending on closed stream");
+ if (!_mayFireState) {
+ // Not the time to send events.
+ _addPendingEvent(new _DelayedError(error));
+ return;
+ }
+ if (_hasPendingEvent) {
+ _addPendingEvent(new _DelayedError(error));
+ } else {
+ _sendError(error);
+ }
+ _handlePendingEvents();
}
/**
- * Sets the subscription's pending events object.
+ * Send or enqueue a "done" message.
*
- * This can only be done once. The pending events object is used for the
- * rest of the subscription's life cycle.
+ * The "done" message should be sent at most once by a stream, and it
+ * should be the last message sent.
*/
- void _setPendingEvents(_PendingEvents pendingEvents) {
- assert(_pending == null);
- if (pendingEvents == null) return;
- _pending = pendingEvents;
- if (!pendingEvents.isEmpty) {
- _state |= _STATE_HAS_PENDING;
- _pending.schedule(this);
+ void _close() {
+ if (_isClosed) return;
+ _state |= _STREAM_CLOSED;
+ if (!_mayFireState) {
+ // Not the time to send events.
+ _addPendingEvent(const _DelayedDone());
+ return;
+ }
+ if (_hasPendingEvent) {
+ _addPendingEvent(new _DelayedDone());
+ _handlePendingEvents();
+ } else {
+ _sendDone();
+ assert(_isComplete);
+ assert(!_hasPendingEvent);
}
}
+ // -------------------------------------------------------------------
+ // Internal implementation.
+
+ // State predicates.
+
+ // Lifecycle state.
+ /** Whether the stream is in the default, open, state for events. */
+ bool get _isOpen => (_state & (_STREAM_CLOSED | _STREAM_COMPLETE)) == 0;
+
+ /** Whether the stream has been closed (a done event requested). */
+ bool get _isClosed => (_state & _STREAM_CLOSED) != 0;
+
+ /** Whether the stream is completed. */
+ bool get _isComplete => (_state & _STREAM_COMPLETE) != 0;
+
+ // Pause state.
+
+ /** Whether one or more active subscribers have requested a pause. */
+ bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT);
+
+ /** How many times the stream has been paused. */
+ int get _pauseCount => _state >> _STREAM_PAUSE_COUNT_SHIFT;
+
/**
- * Extracts the pending events from a canceled stream.
+ * Whether a controller thinks the stream is paused.
*
- * This can only be done during the [_onCancel] method call. After that,
- * any remaining pending events will be cleared.
+ * When this changes, a pause-state change callback is performed.
+ *
+ * It may differ from [_isPaused] if there are pending events
+ * in the queue when the listeners resume. The controller won't
+ * be informed until all queued events have been fired.
*/
- _PendingEvents _extractPending() {
- assert(_isCanceled);
- _PendingEvents events = _pending;
- _pending = null;
- return events;
+ bool get _isInputPaused => _state >= (_STREAM_PENDING_RESUME);
+
+ /** Whether we have a pending resume scheduled. */
+ bool get _hasPendingResume => (_state & _STREAM_PENDING_RESUME) != 0;
+
+
+ // Action state. If the stream makes a call-out to external code,
+ // this state tracks it and avoids reentrancy problems.
+
+ /** Whether the stream is not currently firing or calling a callback. */
+ bool get _isInactive => (_state & (_STREAM_CALLBACK | _STREAM_FIRING)) == 0;
+
+ /** Whether we are currently executing a state-chance callback. */
+ bool get _isInCallback => (_state & _STREAM_CALLBACK) != 0;
+
+ /** Whether we are currently firing an event. */
+ bool get _isFiring => (_state & _STREAM_FIRING) != 0;
+
+ /** Check whether the pending event queue is non-empty */
+ bool get _hasPendingEvent =>
+ _pendingEvents != null && !_pendingEvents.isEmpty;
+
+ /**
+ * The bit representing the current or last event fired.
+ *
+ * This bit matches a bit on listeners that have received the corresponding
+ * event. It is toggled for each new event being fired.
+ */
+ int get _currentEventIdBit =>
+ (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT;
+
+ /** Whether there is currently a subscriber on this [Stream]. */
+ bool get _hasListener;
+
+
+ /** Whether the state bits allow firing. */
+ bool get _mayFireState {
+ // The state allows firing unless:
+ // - it's currently firing
+ // - it's currently in a callback
+ // - it's paused
+ const int mask =
+ _STREAM_FIRING |
+ _STREAM_CALLBACK |
+ ~((1 << _STREAM_PAUSE_COUNT_SHIFT) - 1);
+ return (_state & mask) == 0;
}
- // StreamSubscription interface.
+ // State modification.
- void onData(void handleData(T event)) {
- if (handleData == null) handleData = _nullDataHandler;
- _onData = handleData;
+ /** Record an increases in the number of times the listener has paused. */
+ void _incrementPauseCount(_StreamListener<T> listener) {
+ listener._incrementPauseCount();
+ _state &= ~_STREAM_PENDING_RESUME;
+ _updatePauseCount(1);
}
- void onError(void handleError(error)) {
- if (handleError == null) handleError = _nullErrorHandler;
- _onError = handleError;
+ /** Record a decrease in the number of times the listener has paused. */
+ void _decrementPauseCount(_StreamListener<T> listener) {
+ assert(_isPaused);
+ listener._decrementPauseCount();
+ _updatePauseCount(-1);
}
- void onDone(void handleDone()) {
- if (handleDone == null) handleDone = _nullDoneHandler;
- _onDone = handleDone;
+ /** Update the stream's own pause count only. */
+ void _updatePauseCount(int by) {
+ int oldState = _state;
+ // We can't just _state += by << _STREAM_PAUSE_COUNT_SHIFT, since dart2js
+ // converts the result of the left-shift to a positive number.
+ if (by >= 0) {
+ _state = oldState + (by << _STREAM_PAUSE_COUNT_SHIFT);
+ } else {
+ _state = oldState - ((-by) << _STREAM_PAUSE_COUNT_SHIFT);
+ }
+ assert(_state >= 0);
+ assert((_state >> _STREAM_PAUSE_COUNT_SHIFT) ==
+ (oldState >> _STREAM_PAUSE_COUNT_SHIFT) + by);
}
- void pause([Future resumeSignal]) {
- if (_isCanceled) return;
- bool wasPaused = _isPaused;
- bool wasInputPaused = _isInputPaused;
- // Increment pause count and mark input paused (if it isn't already).
- _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
- if (resumeSignal != null) resumeSignal.whenComplete(resume);
- if (!wasPaused && _pending != null) _pending.cancelSchedule();
- if (!wasInputPaused && !_inCallback) _guardCallback(_onPause);
+ void _setClosed() {
+ assert(!_isClosed);
+ _state |= _STREAM_CLOSED;
}
- void resume() {
- if (_isCanceled) return;
- if (_isPaused) {
- _decrementPauseCount();
- if (!_isPaused) {
- if (_hasPending && !_pending.isEmpty) {
- // Input is still paused.
- _pending.schedule(this);
- } else {
- assert(_mayResumeInput);
- _state &= ~_STATE_INPUT_PAUSED;
- if (!_inCallback) _guardCallback(_onResume);
- }
+ void _setComplete() {
+ assert(_isClosed);
+ _state = _state |_STREAM_COMPLETE;
+ }
+
+ void _startFiring() {
+ assert(!_isFiring);
+ assert(!_isInCallback);
+ assert(_hasListener);
+ assert(!_isPaused);
+ // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID
+ // bit. All current subscribers will now have a _LISTENER_EVENT_ID
+ // that doesn't match _STREAM_EVENT_ID, and they will receive the
+ // event being fired.
+ _state ^= _STREAM_FIRING | _STREAM_EVENT_ID;
+ }
+
+ void _endFiring(bool wasInputPaused) {
+ assert(_isFiring);
+ _state ^= _STREAM_FIRING;
+ // Had listeners, or we wouldn't have fired.
+ _checkCallbacks(true, wasInputPaused);
+ }
+
+ /**
+ * Record that a listener wants a pause from events.
+ *
+ * This methods is called from [_StreamListener.pause()].
+ * Subclasses can override this method, along with [isPaused] and
+ * [createSubscription], if they want to do a different handling of paused
+ * subscriptions, e.g., a filtering stream pausing its own source if all its
+ * subscribers are paused.
+ */
+ void _pause(_StreamListener<T> listener, Future resumeSignal) {
+ assert(identical(listener._source, this));
+ if (!listener._isSubscribed) {
+ throw new StateError("Subscription has been canceled.");
+ }
+ assert(!_isComplete); // There can be no subscribers when complete.
+ bool wasInputPaused = _isInputPaused;
+ bool wasPaused = _isPaused;
+ _incrementPauseCount(listener);
+ if (resumeSignal != null) {
+ resumeSignal.whenComplete(() { this._resume(listener, true); });
+ }
+ if (!wasPaused && _hasPendingEvent && _pendingEvents.isScheduled) {
+ _pendingEvents.cancelSchedule();
+ }
+ if (_isInactive && !wasInputPaused) {
+ _checkCallbacks(true, false);
+ if (!_isPaused && _hasPendingEvent) {
+ _schedulePendingEvents();
}
}
}
- void cancel() {
- if (_isCanceled) return;
- _cancel();
- if (!_inCallback) {
- // otherwise checkState will be called after firing or callback completes.
- _state |= _STATE_IN_CALLBACK;
- _onCancel();
- _pending = null;
- _state &= ~_STATE_IN_CALLBACK;
+ /** Stops pausing due to one request from the given listener. */
+ void _resume(_StreamListener<T> listener, bool fromEvent) {
+ if (!listener.isPaused) return;
+ assert(listener._isSubscribed);
+ assert(_isPaused);
+ _decrementPauseCount(listener);
+ if (!_isPaused) {
+ if (_hasPendingEvent) {
+ _state |= _STREAM_PENDING_RESUME;
+ // Controller's pause state hasn't changed.
+ // If we can fire events now, fire any pending events right away.
+ if (_isInactive) {
+ if (fromEvent) {
+ _handlePendingEvents();
+ } else {
+ _schedulePendingEvents();
+ }
+ }
+ } else if (_isInactive) {
+ _checkCallbacks(true, true);
+ if (!_isPaused && _hasPendingEvent) {
+ if (fromEvent) {
+ _handlePendingEvents();
+ } else {
+ _schedulePendingEvents();
+ }
+ }
+ }
}
}
- Future asFuture([var futureValue]) {
- _FutureImpl<T> result = new _FutureImpl<T>();
-
- // Overwrite the onDone and onError handlers.
- _onDone = () { result._setValue(futureValue); };
- _onError = (error) {
- cancel();
- result._setError(error);
- };
-
- return result;
+ /** Schedule pending events to be executed. */
+ void _schedulePendingEvents() {
+ assert(_hasPendingEvent);
+ _pendingEvents.schedule(this);
}
- // State management.
+ /** Create a subscription object. Called by [subcribe]. */
+ _StreamSubscriptionImpl<T> _createSubscription(
+ void onData(T data),
+ void onError(error),
+ void onDone(),
+ bool cancelOnError);
+
+ /**
+ * Adds a listener to this stream.
+ */
+ void _addListener(_StreamSubscriptionImpl subscription);
- bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
- bool get _isClosed => (_state & _STATE_CLOSED) != 0;
- bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
- bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
- bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
- bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
- bool get _canFire => _state < _STATE_IN_CALLBACK;
- bool get _mayResumeInput =>
- !_isPaused && (_pending == null || _pending.isEmpty);
- bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0;
+ /**
+ * Handle a cancel requested from a [_StreamSubscriptionImpl].
+ *
+ * This method is called from [_StreamSubscriptionImpl.cancel].
+ *
+ * If an event is currently firing, the cancel is delayed
+ * until after the subscribers have received the event.
+ */
+ void _cancel(_StreamSubscriptionImpl subscriber);
- bool get isPaused => _isPaused;
+ /**
+ * Iterate over all current subscribers and perform an action on each.
+ *
+ * Subscribers added during the iteration will not be visited.
+ * Subscribers unsubscribed during the iteration will only be removed
+ * after they have been acted on.
+ *
+ * Any change in the pause state is only reported after all subscribers have
+ * received the event.
+ *
+ * The [action] must not throw, or the controller will be left in an
+ * invalid state.
+ *
+ * This method must not be called while [isFiring] is true.
+ */
+ void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription));
- void _cancel() {
- _state |= _STATE_CANCELED;
- if (_hasPending) {
- _pending.cancelSchedule();
+ /**
+ * Checks whether the subscription/pause state has changed.
+ *
+ * Calls the appropriate callback if the state has changed from the
+ * provided one. Repeats calling callbacks as long as the call changes
+ * the state.
+ */
+ void _checkCallbacks(bool hadListener, bool wasPaused) {
+ assert(!_isFiring);
+ // Will be handled after the current callback.
+ if (_isInCallback) return;
+ if (_hasPendingResume && !_hasPendingEvent) {
+ _state ^= _STREAM_PENDING_RESUME;
+ }
+ _state |= _STREAM_CALLBACK;
+ while (true) {
+ bool hasListener = _hasListener;
+ bool isPaused = _isInputPaused;
+ if (hadListener != hasListener) {
+ _onSubscriptionStateChange();
+ } else if (isPaused != wasPaused) {
+ _onPauseStateChange();
+ } else {
+ _state ^= _STREAM_CALLBACK;
+ return;
+ }
+ wasPaused = isPaused;
+ hadListener = hasListener;
}
}
/**
- * Increment the pause count.
+ * Called when the first subscriber requests a pause or the last a resume.
*
- * Also marks input as paused.
+ * Read [isPaused] to see the new state.
*/
- void _incrementPauseCount() {
- _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
- }
+ void _onPauseStateChange() {}
/**
- * Decrements the pause count.
+ * Called when the first listener subscribes or the last unsubscribes.
*
- * Does not automatically unpause the input (call [_onResume]) when
- * the pause count reaches zero. This is handled elsewhere, and only
- * if there are no pending events buffered.
+ * Read [hasListener] to see what the new state is.
*/
- void _decrementPauseCount() {
- assert(_isPaused);
- _state -= _STATE_PAUSE_COUNT;
- }
+ void _onSubscriptionStateChange() {}
- // _EventSink interface.
-
- void _add(T data) {
- assert(!_isClosed);
- if (_isCanceled) return;
- if (_canFire) {
- _sendData(data);
- } else {
- _addPending(new _DelayedData(data));
+ /**
+ * Add a pending event at the end of the pending event queue.
+ *
+ * Schedules events if currently not paused and inside a callback.
+ */
+ void _addPendingEvent(_DelayedEvent event) {
+ if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents();
+ _StreamImplEvents events = _pendingEvents;
+ events.add(event);
+ if (_isPaused || _isFiring) return;
+ if (_isInCallback) {
+ _schedulePendingEvents();
+ return;
}
}
- void _addError(Object error) {
- if (_isCanceled) return;
- if (_canFire) {
- _sendError(error); // Reports cancel after sending.
- } else {
- _addPending(new _DelayedError(error));
- }
+ /** Fire any pending events until the pending event queue is empty. */
+ void _handlePendingEvents() {
+ assert(_isInactive);
+ if (!_hasPendingEvent) return;
+ _PendingEvents events = _pendingEvents;
+ do {
+ if (_isPaused) return;
+ if (events.isScheduled) events.cancelSchedule();
+ events.handleNext(this);
+ } while (!events.isEmpty);
}
- void _close() {
- assert(!_isClosed);
- if (_isCanceled) return;
- _state |= _STATE_CLOSED;
- if (_canFire) {
- _sendDone();
- } else {
- _addPending(const _DelayedDone());
- }
+ /**
+ * Send a data event directly to each subscriber.
+ */
+ _sendData(T value) {
+ assert(!_isPaused);
+ assert(!_isComplete);
+ if (!_hasListener) return;
+ _forEachSubscriber((subscriber) {
+ try {
+ subscriber._sendData(value);
+ } catch (e, s) {
+ _throwDelayed(e, s);
+ }
+ });
}
- // Hooks called when the input is paused, unpaused or canceled.
- // These must not throw. If overwritten to call user code, include suitable
- // try/catch wrapping and send any errors to [_throwDelayed].
- void _onPause() {
- assert(_isInputPaused);
+ /**
+ * Sends an error event directly to each subscriber.
+ */
+ void _sendError(error) {
+ assert(!_isPaused);
+ assert(!_isComplete);
+ if (!_hasListener) return;
+ _forEachSubscriber((subscriber) {
+ try {
+ subscriber._sendError(error);
+ } catch (e, s) {
+ _throwDelayed(e, s);
+ }
+ });
}
- void _onResume() {
- assert(!_isInputPaused);
+ /**
+ * Sends the "done" message directly to each subscriber.
+ * This automatically stops further subscription and
+ * unsubscribes all subscribers.
+ */
+ void _sendDone() {
+ assert(!_isPaused);
+ assert(_isClosed);
+ _setComplete();
+ if (!_hasListener) return;
+ _forEachSubscriber((subscriber) {
+ _cancel(subscriber);
+ try {
+ subscriber._sendDone();
+ } catch (e, s) {
+ _throwDelayed(e, s);
+ }
+ });
+ assert(!_hasListener);
}
+}
- void _onCancel() {
- assert(_isCanceled);
+// -------------------------------------------------------------------
+// Default implementation of a stream with a single subscriber.
+// -------------------------------------------------------------------
+/**
+ * Default implementation of stream capable of sending events to one subscriber.
+ *
+ * Any class needing to implement [Stream] can either directly extend this
+ * class, or extend [Stream] and delegate the subscribe method to an instance
+ * of this class.
+ *
+ * The only public methods are those of [Stream], so instances of
+ * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing
+ * internal functionality.
+ *
+ * The [StreamController] is a public facing version of this class, with
+ * some methods made public.
+ *
+ * The user interface of [_SingleStreamImpl] are the following methods:
+ * * [_add]: Add a data event to the stream.
+ * * [_addError]: Add an error event to the stream.
+ * * [_close]: Request to close the stream.
+ * * [_onSubscriberStateChange]: Called when receiving the first subscriber or
+ * when losing the last subscriber.
+ * * [_onPauseStateChange]: Called when entering or leaving paused mode.
+ * * [_hasListener]: Test whether there are currently any subscribers.
+ * * [_isInputPaused]: Test whether the stream is currently paused.
+ * The user should not add new events while the stream is paused, but if it
+ * happens anyway, the stream will enqueue the events just as when new events
+ * arrive while still firing an old event.
+ */
+class _SingleStreamImpl<T> extends _StreamImpl<T> {
+ _StreamListener _subscriber = null;
+
+ /** Whether there is currently a subscriber on this [Stream]. */
+ bool get _hasListener => _subscriber != null;
+
+ // -------------------------------------------------------------------
+ // Internal implementation.
+
+ _SingleStreamImpl() {
+ // Start out paused.
+ _updatePauseCount(1);
+ }
+
+ /**
+ * Create the new subscription object.
+ */
+ _StreamSubscriptionImpl<T> _createSubscription(
+ void onData(T data),
+ void onError(error),
+ void onDone(),
+ bool cancelOnError) {
+ return new _StreamSubscriptionImpl<T>(
+ this, onData, onError, onDone, cancelOnError);
}
- // Handle pending events.
+ void _addListener(_StreamListener subscription) {
+ assert(!_isComplete);
+ if (_hasListener) {
+ throw new StateError("Stream already has subscriber.");
+ }
+ assert(_pauseCount == 1);
+ _updatePauseCount(-1);
+ _subscriber = subscription;
+ subscription._setSubscribed(0);
+ if (_isInactive) {
+ _checkCallbacks(false, true);
+ if (!_isPaused && _hasPendingEvent) {
+ _schedulePendingEvents();
+ }
+ }
+ }
/**
- * Add a pending event.
+ * Handle a cancel requested from a [_StreamSubscriptionImpl].
*
- * If the subscription is not paused, this also schedules a firing
- * of pending events later (if necessary).
+ * This method is called from [_StreamSubscriptionImpl.cancel].
*/
- void _addPending(_DelayedEvent event) {
- _StreamImplEvents pending = _pending;
- if (_pending == null) pending = _pending = new _StreamImplEvents();
- pending.add(event);
- if (!_hasPending) {
- _state |= _STATE_HAS_PENDING;
- if (!_isPaused) {
- _pending.schedule(this);
+ void _cancel(_StreamListener subscriber) {
+ assert(identical(subscriber._source, this));
+ // We allow unsubscribing the currently firing subscription during
+ // the event firing, because it is indistinguishable from delaying it since
+ // that event has already received the event.
+ if (!identical(_subscriber, subscriber)) {
+ // You may unsubscribe more than once, only the first one counts.
+ return;
+ }
+ _subscriber = null;
+ // Unsubscribing a paused subscription also cancels its pauses.
+ int resumeCount = subscriber._setUnsubscribed();
+ // Keep being paused while there is no subscriber and the stream is not
+ // complete.
+ _updatePauseCount(_isComplete ? -resumeCount : -resumeCount + 1);
+ if (_isInactive) {
+ _checkCallbacks(true, resumeCount > 0);
+ if (!_isPaused && _hasPendingEvent) {
+ _schedulePendingEvents();
}
}
}
- /* _EventDispatch interface. */
-
- void _sendData(T data) {
- assert(!_isCanceled);
+ void _forEachSubscriber(
+ void action(_StreamListener<T> subscription)) {
assert(!_isPaused);
- assert(!_inCallback);
bool wasInputPaused = _isInputPaused;
- _state |= _STATE_IN_CALLBACK;
- try {
- _onData(data);
- } catch (e, s) {
- _throwDelayed(e, s);
- }
- _state &= ~_STATE_IN_CALLBACK;
- _checkState(wasInputPaused);
+ _StreamListener subscription = _subscriber;
+ assert(subscription != null);
+ _startFiring();
+ action(subscription);
+ _endFiring(wasInputPaused);
}
+}
- void _sendError(var error) {
- assert(!_isCanceled);
- assert(!_isPaused);
- assert(!_inCallback);
- bool wasInputPaused = _isInputPaused;
- _state |= _STATE_IN_CALLBACK;
- try {
- _onError(error);
- } catch (e, s) {
- _throwDelayed(e, s);
- }
- _state &= ~_STATE_IN_CALLBACK;
- if (_cancelOnError) {
- _cancel();
- }
- _checkState(wasInputPaused);
+// -------------------------------------------------------------------
+// Default implementation of a stream with subscribers.
+// -------------------------------------------------------------------
+
+/**
+ * Default implementation of stream capable of sending events to subscribers.
+ *
+ * Any class needing to implement [Stream] can either directly extend this
+ * class, or extend [Stream] and delegate the subscribe method to an instance
+ * of this class.
+ *
+ * The only public methods are those of [Stream], so instances of
+ * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing
+ * internal functionality.
+ *
+ * The [StreamController] is a public facing version of this class, with
+ * some methods made public.
+ *
+ * The user interface of [_MultiStreamImpl] are the following methods:
+ * * [_add]: Add a data event to the stream.
+ * * [_addError]: Add an error event to the stream.
+ * * [_close]: Request to close the stream.
+ * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or
+ * when losing the last subscriber.
+ * * [_onPauseStateChange]: Called when entering or leaving paused mode.
+ * * [_hasListener]: Test whether there are currently any subscribers.
+ * * [_isPaused]: Test whether the stream is currently paused.
+ * The user should not add new events while the stream is paused, but if it
+ * happens anyway, the stream will enqueue the events just as when new events
+ * arrive while still firing an old event.
+ */
+class _MultiStreamImpl<T> extends _StreamImpl<T>
+ implements _InternalLinkList {
+ // Link list implementation (mixin when possible).
+ _InternalLink _nextLink;
+ _InternalLink _previousLink;
+
+ _MultiStreamImpl() {
+ _nextLink = _previousLink = this;
}
- void _sendDone() {
- assert(!_isCanceled);
- assert(!_isPaused);
- assert(!_inCallback);
- _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
- try {
- _onDone();
- } catch (e, s) {
- _throwDelayed(e, s);
+ bool get isBroadcast => true;
+
+ Stream<T> asBroadcastStream() => this;
+
+ // ------------------------------------------------------------------
+ // Helper functions that can be overridden in subclasses.
+
+ /** Whether there are currently any subscribers on this [Stream]. */
+ bool get _hasListener => !_InternalLinkList.isEmpty(this);
+
+ /**
+ * Create the new subscription object.
+ */
+ _StreamListener<T> _createSubscription(
+ void onData(T data),
+ void onError(error),
+ void onDone(),
+ bool cancelOnError) {
+ return new _StreamSubscriptionImpl<T>(
+ this, onData, onError, onDone, cancelOnError);
+ }
+
+ // -------------------------------------------------------------------
+ // Internal implementation.
+
+ /**
+ * Iterate over all current subscribers and perform an action on each.
+ *
+ * The set of subscribers cannot be modified during this iteration.
+ * All attempts to add or unsubscribe subscribers will be delayed until
+ * after the iteration is complete.
+ *
+ * The [action] must not throw, or the controller will be left in an
+ * invalid state.
+ *
+ * This method must not be called while [isFiring] is true.
+ */
+ void _forEachSubscriber(
+ void action(_StreamListener<T> subscription)) {
+ assert(!_isFiring);
+ if (!_hasListener) return;
+ bool wasInputPaused = _isInputPaused;
+ _startFiring();
+ _InternalLink cursor = this._nextLink;
+ while (!identical(cursor, this)) {
+ _StreamListener<T> current = cursor;
+ if (current._needsEvent(_currentEventIdBit)) {
+ action(current);
+ // Marks as having received the event.
+ current._toggleEventReceived();
+ }
+ cursor = current._nextLink;
+ if (current._isPendingUnsubscribe) {
+ _removeListener(current);
+ }
}
- _onCancel(); // No checkState after cancel, it is always the last event.
- _state &= ~_STATE_IN_CALLBACK;
+ _endFiring(wasInputPaused);
}
- /**
- * Call a hook function.
- *
- * The call is properly wrapped in code to avoid other callbacks
- * during the call, and it checks for state changes after the call
- * that should cause further callbacks.
- */
- void _guardCallback(callback) {
- assert(!_inCallback);
- bool wasInputPaused = _isInputPaused;
- _state |= _STATE_IN_CALLBACK;
- callback();
- _state &= ~_STATE_IN_CALLBACK;
- _checkState(wasInputPaused);
+ void _addListener(_StreamListener listener) {
+ listener._setSubscribed(_currentEventIdBit);
+ bool hadListener = _hasListener;
+ _InternalLinkList.add(this, listener);
+ if (!hadListener && _isInactive) {
+ _checkCallbacks(false, false);
+ if (!_isPaused && _hasPendingEvent) {
+ _schedulePendingEvents();
+ }
+ }
}
/**
- * Check if the input needs to be informed of state changes.
- *
- * State changes are pausing, resuming and canceling.
+ * Handle a cancel requested from a [_StreamListener].
*
- * After canceling, no further callbacks will happen.
+ * This method is called from [_StreamListener.cancel].
*
- * The cancel callback is called after a user cancel, or after
- * the final done event is sent.
+ * If an event is currently firing, the cancel is delayed
+ * until after the subscribers have received the event.
*/
- void _checkState(bool wasInputPaused) {
- assert(!_inCallback);
- if (_hasPending && _pending.isEmpty) {
- _state &= ~_STATE_HAS_PENDING;
- if (_isInputPaused && _mayResumeInput) {
- _state &= ~_STATE_INPUT_PAUSED;
- }
+ void _cancel(_StreamListener listener) {
+ assert(identical(listener._source, this));
+ if (_InternalLink.isUnlinked(listener)) {
+ // You may unsubscribe more than once, only the first one counts.
+ return;
}
- // If the state changes during a callback, we immediately
- // make a new state-change callback. Loop until the state didn't change.
- while (true) {
- if (_isCanceled) {
- _onCancel();
- _pending = null;
- return;
- }
- bool isInputPaused = _isInputPaused;
- if (wasInputPaused == isInputPaused) break;
- _state ^= _STATE_IN_CALLBACK;
- if (isInputPaused) {
- _onPause();
+ if (_isFiring) {
+ if (listener._needsEvent(_currentEventIdBit)) {
+ assert(listener._isSubscribed);
+ listener._setPendingUnsubscribe(_currentEventIdBit);
} else {
- _onResume();
+ // The listener has been notified of the event (or don't need to,
+ // if it's still pending subscription) so it's safe to remove it.
+ _removeListener(listener);
+ }
+ // Pause and subscription state changes are reported when we end
+ // firing.
+ } else {
+ bool wasInputPaused = _isInputPaused;
+ _removeListener(listener);
+ if (_isInactive) {
+ _checkCallbacks(true, wasInputPaused);
+ if (!_isPaused && _hasPendingEvent) {
+ _schedulePendingEvents();
+ }
}
- _state &= ~_STATE_IN_CALLBACK;
- wasInputPaused = isInputPaused;
- }
- if (_hasPending && !_isPaused) {
- _pending.schedule(this);
}
}
-}
-
-// -------------------------------------------------------------------
-// Common base class for single and multi-subscription streams.
-// -------------------------------------------------------------------
-abstract class _StreamImpl<T> extends Stream<T> {
- // ------------------------------------------------------------------
- // Stream interface.
-
- StreamSubscription<T> listen(void onData(T data),
- { void onError(error),
- void onDone(),
- bool cancelOnError }) {
- if (onData == null) onData = _nullDataHandler;
- if (onError == null) onError = _nullErrorHandler;
- if (onDone == null) onDone = _nullDoneHandler;
- cancelOnError = identical(true, cancelOnError);
- StreamSubscription subscription =
- _createSubscription(onData, onError, onDone, cancelOnError);
- _onListen(subscription);
- return subscription;
- }
- // -------------------------------------------------------------------
- /** Create a subscription object. Called by [subcribe]. */
- _BufferingStreamSubscription<T> _createSubscription(
- void onData(T data),
- void onError(error),
- void onDone(),
- bool cancelOnError) {
- return new _BufferingStreamSubscription<T>(
- onData, onError, onDone, cancelOnError);
+ /**
+ * Removes a listener from this stream and cancels its pauses.
+ *
+ * This is a low-level action that doesn't call [_onSubscriptionStateChange].
+ * or [_callOnPauseStateChange].
+ */
+ void _removeListener(_StreamListener listener) {
+ int pauseCount = listener._setUnsubscribed();
+ _InternalLinkList.remove(listener);
+ if (pauseCount > 0) {
+ _updatePauseCount(-pauseCount);
+ if (!_isPaused && _hasPendingEvent) {
+ _state |= _STREAM_PENDING_RESUME;
+ }
+ }
}
-
- /** Hook called when the subscription has been created. */
- void _onListen(StreamSubscription subscription) {}
}
-typedef _PendingEvents _EventGenerator();
/** Stream that generates its own events. */
-class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
- final _EventGenerator _pending;
+class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> {
/**
- * Initializes the stream to have only the events provided by a
- * [_PendingEvents].
+ * Initializes the stream to have only the events provided by [events].
*
- * A new [_PendingEvents] must be generated for each listen.
+ * A [_PendingEvents] implementation provides events that are handled
+ * by calling [_PendingEvents.handleNext] with the [_StreamImpl].
*/
- _GeneratedStreamImpl(this._pending);
-
- StreamSubscription _createSubscription(void onData(T data),
- void onError(Object error),
- void onDone(),
- bool cancelOnError) {
- _BufferingStreamSubscription<T> subscription =
- new _BufferingStreamSubscription(
- onData, onError, onDone, cancelOnError);
- subscription._setPendingEvents(_pending());
- return subscription;
+ _GeneratedSingleStreamImpl(_PendingEvents events) {
+ _pendingEvents = events;
+ _setClosed(); // Closed for input since all events are already pending.
+ }
+
+ void _add(T value) {
+ throw new UnsupportedError("Cannot inject events into generated stream");
+ }
+
+ void _addError(value) {
+ throw new UnsupportedError("Cannot inject events into generated stream");
+ }
+
+ void _close() {
+ throw new UnsupportedError("Cannot inject events into generated stream");
}
}
/** Pending events object that gets its events from an [Iterable]. */
class _IterablePendingEvents<T> extends _PendingEvents {
- // The iterator providing data for data events.
- // Set to null when iteration has completed.
- Iterator<T> _iterator;
+ final Iterator<T> _iterator;
+ /**
+ * Whether there are no more events to be sent.
+ *
+ * This starts out as [:false:] since there is always at least
+ * a 'done' event to be sent.
+ */
+ bool _isDone = false;
_IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;
- bool get isEmpty => _iterator == null;
+ bool get isEmpty => _isDone;
- void handleNext(_EventDispatch dispatch) {
- if (_iterator == null) {
- throw new StateError("No events pending.");
- }
- // Send one event per call to moveNext.
- // If moveNext returns true, send the current element as data.
- // If moveNext returns false, send a done event and clear the _iterator.
- // If moveNext throws an error, send an error and clear the _iterator.
- // After an error, no further events will be sent.
- bool isDone;
+ void handleNext(_StreamImpl<T> stream) {
+ if (_isDone) throw new StateError("No events pending.");
try {
- isDone = !_iterator.moveNext();
+ _isDone = !_iterator.moveNext();
+ if (!_isDone) {
+ stream._sendData(_iterator.current);
+ } else {
+ stream._sendDone();
+ }
} catch (e, s) {
- _iterator = null;
- dispatch._sendError(_asyncError(e, s));
- return;
- }
- if (!isDone) {
- dispatch._sendData(_iterator.current);
- } else {
- _iterator = null;
- dispatch._sendDone();
+ stream._sendError(_asyncError(e, s));
+ stream._sendDone();
+ _isDone = true;
}
}
+}
+
+
+/**
+ * The subscription class that the [StreamController] uses.
+ *
+ * The [_StreamImpl.createSubscription] method should
+ * create an object of this type, or another subclass of [_StreamListener].
+ * A subclass of [_StreamImpl] can specify which subclass
+ * of [_StreamSubscriptionImpl] it uses by overriding
+ * [_StreamImpl.createSubscription].
+ *
+ * The subscription is in one of three states:
+ * * Subscribed.
+ * * Paused-and-subscribed.
+ * * Unsubscribed.
+ * Unsubscribing also resumes any pauses started by the subscription.
+ */
+class _StreamSubscriptionImpl<T> extends _StreamListener<T>
+ implements StreamSubscription<T> {
+ final bool _cancelOnError;
+ // TODO(ahe): Restore type when feature is implemented in dart2js
+ // checked mode. http://dartbug.com/7733
+ var /* _DataHandler<T> */ _onData;
+ _ErrorHandler _onError;
+ _DoneHandler _onDone;
+ _StreamSubscriptionImpl(_StreamImpl source,
+ this._onData,
+ this._onError,
+ this._onDone,
+ this._cancelOnError) : super(source);
- void clear() {
- if (isScheduled) cancelSchedule();
- _iterator = null;
+ void onData(void handleData(T event)) {
+ if (handleData == null) handleData = _nullDataHandler;
+ _onData = handleData;
}
-}
+ void onError(void handleError(error)) {
+ if (handleError == null) handleError = _nullErrorHandler;
+ _onError = handleError;
+ }
+
+ void onDone(void handleDone()) {
+ if (handleDone == null) handleDone = _nullDoneHandler;
+ _onDone = handleDone;
+ }
+
+ void _sendData(T data) {
+ _onData(data);
+ }
+
+ void _sendError(error) {
+ _onError(error);
+ if (_cancelOnError) _source._cancel(this);
+ }
+
+ void _sendDone() {
+ _onDone();
+ }
+
+ void cancel() {
+ if (!_isSubscribed) return;
+ _source._cancel(this);
+ }
+
+ void pause([Future resumeSignal]) {
+ if (!_isSubscribed) return;
+ _source._pause(this, resumeSignal);
+ }
+
+ void resume() {
+ if (!_isSubscribed || !isPaused) return;
+ _source._resume(this, false);
+ }
+
+ Future asFuture([var futureValue]) {
+ _FutureImpl<T> result = new _FutureImpl<T>();
+
+ // Overwrite the onDone and onError handlers.
+ onDone(() { result._setValue(futureValue); });
+ onError((error) {
+ cancel();
+ result._setError(error);
+ });
+
+ return result;
+ }
+}
// Internal helpers.
@@ -558,20 +998,20 @@ void _nullErrorHandler(error) {
void _nullDoneHandler() {}
-/** A delayed event on a buffering stream subscription. */
+/** A delayed event on a stream implementation. */
abstract class _DelayedEvent {
/** Added as a linked list on the [StreamController]. */
_DelayedEvent next;
/** Execute the delayed event on the [StreamController]. */
- void perform(_EventDispatch dispatch);
+ void perform(_StreamImpl stream);
}
/** A delayed data event. */
class _DelayedData<T> extends _DelayedEvent{
final T value;
_DelayedData(this.value);
- void perform(_EventDispatch<T> dispatch) {
- dispatch._sendData(value);
+ void perform(_StreamImpl<T> stream) {
+ stream._sendData(value);
}
}
@@ -579,16 +1019,16 @@ class _DelayedData<T> extends _DelayedEvent{
class _DelayedError extends _DelayedEvent {
final error;
_DelayedError(this.error);
- void perform(_EventDispatch dispatch) {
- dispatch._sendError(error);
+ void perform(_StreamImpl stream) {
+ stream._sendError(error);
}
}
/** A delayed done event. */
class _DelayedDone implements _DelayedEvent {
const _DelayedDone();
- void perform(_EventDispatch dispatch) {
- dispatch._sendDone();
+ void perform(_StreamImpl stream) {
+ stream._sendDone();
}
_DelayedEvent get next => null;
@@ -678,66 +1118,118 @@ abstract class _InternalLinkList extends _InternalLink {
}
}
-/** Superclass for provider of pending events. */
-abstract class _PendingEvents {
- // No async event has been scheduled.
- static const int _STATE_UNSCHEDULED = 0;
- // An async event has been scheduled to run a function.
- static const int _STATE_SCHEDULED = 1;
- // An async event has been scheduled, but it will do nothing when it runs.
- // Async events can't be preempted.
- static const int _STATE_CANCELED = 3;
+/** Abstract type for an internal interface for sending events. */
+abstract class _EventOutputSink<T> {
+ _sendData(T data);
+ _sendError(error);
+ _sendDone();
+}
+
+abstract class _StreamListener<T> extends _InternalLink
+ implements _EventOutputSink<T> {
+ final _StreamImpl _source;
+ int _state = _LISTENER_UNSUBSCRIBED;
+
+ _StreamListener(this._source);
+
+ bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT);
+
+ bool get _isPendingUnsubscribe =>
+ (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0;
+
+ bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0;
/**
- * State of being scheduled.
+ * Whether the listener still needs to receive the currently firing event.
*
- * Set to [_STATE_SCHEDULED] when pending events are scheduled for
- * async dispatch. Since we can't cancel a [runAsync] call, if schduling
- * is "canceled", the _state is simply set to [_STATE_CANCELED] which will
- * make the async code do nothing except resetting [_state].
+ * The currently firing event is identified by a single bit, which alternates
+ * between events. The [_state] contains the previously sent event's bit in
+ * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener
+ * still need the current event.
+ */
+ bool _needsEvent(int currentEventIdBit) {
+ int lastEventIdBit =
+ (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT;
+ return lastEventIdBit != currentEventIdBit;
+ }
+
+ /// If a subscriber's "firing bit" doesn't match the stream's firing bit,
+ /// we are currently firing an event and the subscriber still need to receive
+ /// the event.
+ void _toggleEventReceived() {
+ _state ^= _LISTENER_EVENT_ID;
+ }
+
+ void _setSubscribed(int eventIdBit) {
+ assert(eventIdBit == 0 || eventIdBit == 1);
+ _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT);
+ }
+
+ void _setPendingUnsubscribe(int currentEventIdBit) {
+ assert(_isSubscribed);
+ // Sets the pending unsubscribe, and ensures that the listener
+ // won't get the current event.
+ _state |= _LISTENER_PENDING_UNSUBSCRIBE | _LISTENER_EVENT_ID;
+ _state ^= (1 ^ currentEventIdBit) << _LISTENER_EVENT_ID_SHIFT;
+ assert(!_needsEvent(currentEventIdBit));
+ }
+
+ /**
+ * Marks the listener as unsubscibed.
*
- * If events are scheduled while the state is [_STATE_CANCELED], it is
- * merely switched back to [_STATE_SCHEDULED], but no new call to [runAsync]
- * is performed.
+ * Returns the number of unresumed pauses for the listener.
*/
- int _state = _STATE_UNSCHEDULED;
+ int _setUnsubscribed() {
+ assert(_isSubscribed);
+ int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT;
+ _state = _LISTENER_UNSUBSCRIBED;
+ return timesPaused;
+ }
- bool get isEmpty;
+ void _incrementPauseCount() {
+ _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT;
+ }
+
+ void _decrementPauseCount() {
+ assert(isPaused);
+ _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT;
+ }
- bool get isScheduled => _state == _STATE_SCHEDULED;
- bool get _eventScheduled => _state >= _STATE_SCHEDULED;
+ _sendData(T data);
+ _sendError(error);
+ _sendDone();
+}
+/** Superclass for provider of pending events. */
+abstract class _PendingEvents {
/**
- * Schedule an event to run later.
+ * Timer set when pending events are scheduled for execution.
*
- * If called more than once, it should be called with the same dispatch as
- * argument each time. It may reuse an earlier argument in some cases.
+ * When scheduling pending events for execution in a later cycle, the timer
+ * is stored here. If pending events are executed earlier than that, e.g.,
+ * due to a second event in the current cycle, the timer is canceled again.
*/
- void schedule(_EventDispatch dispatch) {
+ Timer scheduleTimer = null;
+
+ bool get isEmpty;
+
+ bool get isScheduled => scheduleTimer != null;
+
+ void schedule(_StreamImpl stream) {
if (isScheduled) return;
- assert(!isEmpty);
- if (_eventScheduled) {
- assert(_state == _STATE_CANCELED);
- _state = _STATE_SCHEDULED;
- return;
- }
- runAsync(() {
- int oldState = _state;
- _state = _STATE_UNSCHEDULED;
- if (oldState == _STATE_CANCELED) return;
- handleNext(dispatch);
+ scheduleTimer = new Timer(Duration.ZERO, () {
+ scheduleTimer = null;
+ stream._handlePendingEvents();
});
- _state = _STATE_SCHEDULED;
}
void cancelSchedule() {
- if (isScheduled) _state = _STATE_CANCELED;
+ assert(isScheduled);
+ scheduleTimer.cancel();
+ scheduleTimer = null;
}
- void handleNext(_EventDispatch dispatch);
-
- /** Throw away any pending events and cancel scheduled events. */
- void clear();
+ void handleNext(_StreamImpl stream);
}
@@ -750,6 +1242,8 @@ class _StreamImplEvents extends _PendingEvents {
bool get isEmpty => lastPendingEvent == null;
+ bool get isScheduled => scheduleTimer != null;
+
void add(_DelayedEvent event) {
if (lastPendingEvent == null) {
firstPendingEvent = lastPendingEvent = event;
@@ -758,465 +1252,122 @@ class _StreamImplEvents extends _PendingEvents {
}
}
- void handleNext(_EventDispatch dispatch) {
+ void handleNext(_StreamImpl stream) {
assert(!isScheduled);
_DelayedEvent event = firstPendingEvent;
firstPendingEvent = event.next;
if (firstPendingEvent == null) {
lastPendingEvent = null;
}
- event.perform(dispatch);
- }
-
- void clear() {
- if (isScheduled) cancelSchedule();
- firstPendingEvent = lastPendingEvent = null;
- }
-}
-
-class _MultiplexerLinkedList {
- _MultiplexerLinkedList _next;
- _MultiplexerLinkedList _previous;
-
- void _unlink() {
- _previous._next = _next;
- _next._previous = _previous;
- _next = _previous = this;
- }
-
- void _insertBefore(_MultiplexerLinkedList newNext) {
- _MultiplexerLinkedList newPrevious = newNext._previous;
- newPrevious._next = this;
- newNext._previous = _previous;
- _previous._next = newNext;
- _previous = newPrevious;
- }
-}
-
-/**
- * A subscription used by [_SingleStreamMultiplexer].
- *
- * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple
- * listeners at a time. It is used to implement [Stream.asBroadcastStream].
- *
- * It is itself listening to another stream for events, and it forwards all
- * events to all of its simultanous listeners.
- *
- * The listeners are [_MultiplexerSubscription]s and are kept as a linked list.
- */
-// TODO(lrn): Change "implements" to "with" when automatic mixin constructors
-// are implemented.
-class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T>
- implements _MultiplexerLinkedList {
- static const int _STATE_NOT_LISTENING = 0;
- // Bit that alternates between event firings. If bit matches the one currently
- // firing, the subscription will not be notified.
- static const int _STATE_EVENT_ID_BIT = 1;
- // Whether the subscription is listening at all. This should be set while
- // it is part of the linked list of listeners of a multiplexer stream.
- static const int _STATE_LISTENING = 2;
- // State bit set while firing an event.
- static const int _STATE_IS_FIRING = 4;
- // Bit set if a subscription is canceled while it's firing (the
- // [_STATE_IS_FIRING] bit is set).
- // If the subscription is canceled while firing, it is not removed from the
- // linked list immediately (to avoid breaking iteration), but is instead
- // removed after it is done firing.
- static const int _STATE_REMOVE_AFTER_FIRING = 8;
-
- // Firing state.
- int _multiplexState;
-
- _SingleStreamMultiplexer _source;
-
- _MultiplexerSubscription(this._source,
- void onData(T data),
- void onError(Object error),
- void onDone(),
- bool cancelOnError,
- int nextEventId)
- : _multiplexState = _STATE_LISTENING | nextEventId,
- super(onData, onError, onDone, cancelOnError) {
- _next = _previous = this;
- }
-
- // Mixin workaround.
- _MultiplexerLinkedList _next;
- _MultiplexerLinkedList _previous;
-
- void _unlink() {
- _previous._next = _next;
- _next._previous = _previous;
- _next = _previous = this;
- }
-
- void _insertBefore(_MultiplexerLinkedList newNext) {
- _MultiplexerLinkedList newPrevious = newNext._previous;
- newPrevious._next = this;
- newNext._previous = _previous;
- _previous._next = newNext;
- _previous = newPrevious;
- }
- // End mixin.
-
- bool get _isListening => _multiplexState >= _STATE_LISTENING;
- bool get _isFiring => _multiplexState >= _STATE_IS_FIRING;
- bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING;
- int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT;
-
- void _setRemoveAfterFiring() {
- assert(_isFiring);
- _multiplexState |= _STATE_REMOVE_AFTER_FIRING;
- }
-
- void _startFiring() {
- assert(!_isFiring);
- _multiplexState |= _STATE_IS_FIRING;
- }
-
- /// Marks listener as no longer firing, and toggles its event id.
- void _endFiring() {
- assert(_isFiring);
- _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT);
- }
-
- void _setNotListening() {
- assert(_isListening);
- _multiplexState = _STATE_NOT_LISTENING;
- }
-
- void _onCancel() {
- assert(_isListening);
- _source._removeListener(this);
+ event.perform(stream);
}
}
-/**
- * A stream that sends events from another stream to multiple listeners.
- *
- * This is used to implement [Stream.asBroadcastStream].
- *
- * This stream allows listening more than once.
- * When the first listener is added, it starts listening on its source
- * stream for events. All events from the source stream are sent to all
- * active listeners. The listeners handle their own buffering.
- * When the last listener cancels, the source stream subscription is also
- * canceled, and no further listening is possible.
- */
-// TODO(lrn): change "implements" to "with" when the VM supports it.
-class _SingleStreamMultiplexer<T> extends Stream<T>
- implements _MultiplexerLinkedList,
- _EventDispatch<T> {
- final Stream<T> _source;
- StreamSubscription<T> _subscription;
- // Alternates between zero and one for each event fired.
- // Listeners are initialized with the next event's id, and will
- // only be notified if they match the event being fired.
- // That way listeners added during event firing will not receive
- // the current event.
- int _eventId = 0;
-
- bool _isFiring = false;
- // Remember events added while firing.
- _StreamImplEvents _pending;
+class _DoneSubscription<T> implements StreamSubscription<T> {
+ _DoneHandler _handler;
+ Timer _timer;
+ int _pauseCount = 0;
- _SingleStreamMultiplexer(this._source) {
- _next = _previous = this;
+ _DoneSubscription(this._handler) {
+ _delayDone();
}
- bool get _hasPending => _pending != null && !_pending.isEmpty;
-
- // Should be mixin.
- _MultiplexerLinkedList _next;
- _MultiplexerLinkedList _previous;
-
- void _unlink() {
- _previous._next = _next;
- _next._previous = _previous;
- _next = _previous = this;
+ void _delayDone() {
+ assert(_timer == null && _pauseCount == 0);
+ _timer = new Timer(Duration.ZERO, () {
+ if (_handler != null) _handler();
+ });
}
- void _insertBefore(_MultiplexerLinkedList newNext) {
- _MultiplexerLinkedList newPrevious = newNext._previous;
- newPrevious._next = this;
- newNext._previous = _previous;
- _previous._next = newNext;
- _previous = newPrevious;
- }
- // End of mixin.
+ bool get _isComplete => _timer == null && _pauseCount == 0;
- StreamSubscription<T> listen(void onData(T data),
- { void onError(Object error),
- void onDone(),
- bool cancelOnError }) {
- if (onData == null) onData = _nullDataHandler;
- if (onError == null) onError = _nullErrorHandler;
- if (onDone == null) onDone = _nullDoneHandler;
- cancelOnError = identical(true, cancelOnError);
- _MultiplexerSubscription subscription =
- new _MultiplexerSubscription(this, onData, onError, onDone,
- cancelOnError, _eventId);
- if (_subscription == null) {
- _subscription = _source.listen(_add, onError: _addError, onDone: _close);
- }
- subscription._insertBefore(this);
- return subscription;
- }
+ void onData(void handleAction(T value)) {}
- /** Called by [_MultiplexerSubscription.remove]. */
- void _removeListener(_MultiplexerSubscription listener) {
- if (listener._isFiring) {
- listener._setRemoveAfterFiring();
- } else {
- _unlinkListener(listener);
- }
- }
+ void onError(void handleError(error)) {}
- /** Remove a listener and close the subscription after the last one. */
- void _unlinkListener(_MultiplexerSubscription listener) {
- listener._setNotListening();
- listener._unlink();
- if (identical(_next, this)) {
- // Last listener removed.
- _cancel();
- }
+ void onDone(void handleDone()) {
+ _handler = handleDone;
}
- void _cancel() {
- StreamSubscription subscription = _subscription;
- _subscription = null;
- subscription.cancel();
- if (_pending != null) _pending.cancelSchedule();
- }
-
- void _forEachListener(void action(_MultiplexerSubscription listener)) {
- int eventId = _eventId;
- _eventId ^= 1;
- _isFiring = true;
- _MultiplexerLinkedList entry = _next;
- // Call each listener in order. A listener can be removed during any
- // other listener's event. During its own event it will only be marked
- // as "to be removed", and it will be handled after the event is done.
- while (!identical(entry, this)) {
- _MultiplexerSubscription listener = entry;
- if (listener._eventId == eventId) {
- listener._startFiring();
- action(listener);
- listener._endFiring(); // Also toggles the event id.
- }
- entry = listener._next;
- if (listener._removeAfterFiring) {
- _unlinkListener(listener);
- }
+ void pause([Future signal]) {
+ if (_isComplete) return;
+ if (_timer != null) {
+ _timer.cancel();
+ _timer = null;
}
- _isFiring = false;
+ _pauseCount++;
+ if (signal != null) signal.whenComplete(resume);
}
- void _add(T data) {
- if (_isFiring || _hasPending) {
- _StreamImplEvents pending = _pending;
- if (pending == null) pending = _pending = new _StreamImplEvents();
- pending.add(new _DelayedData(data));
- } else {
- _sendData(data);
+ void resume() {
+ if (_isComplete) return;
+ if (_pauseCount == 0) return;
+ _pauseCount--;
+ if (_pauseCount == 0) {
+ _delayDone();
}
}
- void _addError(Object error) {
- if (_isFiring || _hasPending) {
- _StreamImplEvents pending = _pending;
- if (pending == null) pending = _pending = new _StreamImplEvents();
- pending.add(new _DelayedError(error));
- } else {
- _sendError(error);
- }
- }
+ bool get isPaused => _pauseCount > 0;
- void _close() {
- if (_isFiring || _hasPending) {
- _StreamImplEvents pending = _pending;
- if (pending == null) pending = _pending = new _StreamImplEvents();
- pending.add(const _DelayedDone());
- } else {
- _sendDone();
+ void cancel() {
+ if (_isComplete) return;
+ if (_timer != null) {
+ _timer.cancel();
+ _timer = null;
}
+ _pauseCount = 0;
}
- void _sendData(T data) {
- _forEachListener((_MultiplexerSubscription listener) {
- listener._add(data);
- });
- if (_hasPending) {
- _pending.schedule(this);
- }
- }
+ Future asFuture([var futureValue]) {
+ // TODO(floitsch): share more code.
+ _FutureImpl<T> result = new _FutureImpl<T>();
- void _sendError(Object error) {
- _forEachListener((_MultiplexerSubscription listener) {
- listener._addError(error);
+ // Overwrite the onDone and onError handlers.
+ onDone(() { result._setValue(futureValue); });
+ onError((error) {
+ cancel();
+ result._setError(error);
});
- if (_hasPending) {
- _pending.schedule(this);
- }
- }
- void _sendDone() {
- _forEachListener((_MultiplexerSubscription listener) {
- listener._setRemoveAfterFiring();
- listener._close();
- });
+ return result;
}
}
+class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> {
+ final Stream<T> _source;
+ StreamSubscription<T> _subscription;
-/**
- * Simple implementation of [StreamIterator].
- */
-class _StreamIteratorImpl<T> implements StreamIterator<T> {
- // Internal state of the stream iterator.
- // At any time, it is in one of these states.
- // The interpretation of the [_futureOrPrefecth] field depends on the state.
- // In _STATE_MOVING, the _data field holds the most recently returned
- // future.
- // When in one of the _STATE_EXTRA_* states, the it may hold the
- // next data/error object, and the subscription is paused.
-
- /// The simple state where [_data] holds the data to return, and [moveNext]
- /// is allowed. The subscription is actively listening.
- static const int _STATE_FOUND = 0;
- /// State set after [moveNext] has returned false or an error,
- /// or after calling [cancel]. The subscription is always canceled.
- static const int _STATE_DONE = 1;
- /// State set after calling [moveNext], but before its returned future has
- /// completed. Calling [moveNext] again is not allowed in this state.
- /// The subscription is actively listening.
- static const int _STATE_MOVING = 2;
- /// States set when another event occurs while in _STATE_FOUND.
- /// This extra overflow event is cached until the next call to [moveNext],
- /// which will complete as if it received the event normally.
- /// The subscription is paused in these states, so we only ever get one
- /// event too many.
- static const int _STATE_EXTRA_DATA = 3;
- static const int _STATE_EXTRA_ERROR = 4;
- static const int _STATE_EXTRA_DONE = 5;
-
- /// Subscription being listened to.
- StreamSubscription _subscription;
-
- /// The current element represented by the most recent call to moveNext.
- ///
- /// Is null between the time moveNext is called and its future completes.
- T _current = null;
-
- /// The future returned by the most recent call to [moveNext].
- ///
- /// Also used to store the next value/error in case the stream provides an
- /// event before [moveNext] is called again. In that case, the stream will
- /// be paused to prevent further events.
- var _futureOrPrefetch = null;
-
- /// The current state.
- int _state = _STATE_FOUND;
-
- _StreamIteratorImpl(final Stream<T> stream) {
- _subscription = stream.listen(_onData,
- onError: _onError,
- onDone: _onDone,
- cancelOnError: true);
- }
-
- T get current => _current;
-
- Future<bool> moveNext() {
- if (_state == _STATE_DONE) {
- return new _FutureImpl<bool>.immediate(false);
- }
- if (_state == _STATE_MOVING) {
- throw new StateError("Already waiting for next.");
- }
- if (_state == _STATE_FOUND) {
- _state = _STATE_MOVING;
- _futureOrPrefetch = new _FutureImpl<bool>();
- return _futureOrPrefetch;
+ _SingleStreamMultiplexer(this._source);
+
+ void _callOnPauseStateChange() {
+ if (_isPaused) {
+ if (_subscription != null) {
+ _subscription.pause();
+ }
} else {
- assert(_state >= _STATE_EXTRA_DATA);
- switch (_state) {
- case _STATE_EXTRA_DATA:
- _state = _STATE_FOUND;
- _current = _futureOrPrefetch;
- _futureOrPrefetch = null;
- _subscription.resume();
- return new _FutureImpl<bool>.immediate(true);
- case _STATE_EXTRA_ERROR:
- Object prefetch = _futureOrPrefetch;
- _clear();
- return new _FutureImpl<bool>.immediateError(prefetch);
- case _STATE_EXTRA_DONE:
- _clear();
- return new _FutureImpl<bool>.immediate(false);
+ if (_subscription != null) {
+ _subscription.resume();
}
}
}
- /** Clears up the internal state when the iterator ends. */
- void _clear() {
- _subscription = null;
- _futureOrPrefetch = null;
- _current = null;
- _state = _STATE_DONE;
- }
-
- void cancel() {
- StreamSubscription subscription = _subscription;
- if (_state == _STATE_MOVING) {
- _FutureImpl<bool> hasNext = _futureOrPrefetch;
- _clear();
- hasNext._setValue(false);
+ /**
+ * Subscribe or unsubscribe on [_source] depending on whether
+ * [_stream] has subscribers.
+ */
+ void _onSubscriptionStateChange() {
+ if (_hasListener) {
+ assert(_subscription == null);
+ _subscription = _source.listen(this._add,
+ onError: this._addError,
+ onDone: this._close);
} else {
- _clear();
- }
- subscription.cancel();
- }
-
- void _onData(T data) {
- if (_state == _STATE_MOVING) {
- _current = data;
- _FutureImpl<bool> hasNext = _futureOrPrefetch;
- _futureOrPrefetch = null;
- _state = _STATE_FOUND;
- hasNext._setValue(true);
- return;
- }
- _subscription.pause();
- assert(_futureOrPrefetch == null);
- _futureOrPrefetch = data;
- _state = _STATE_EXTRA_DATA;
- }
-
- void _onError(Object error) {
- if (_state == _STATE_MOVING) {
- _FutureImpl<bool> hasNext = _futureOrPrefetch;
- // We have cancelOnError: true, so the subscription is canceled.
- _clear();
- hasNext._setError(error);
- return;
- }
- _subscription.pause();
- assert(_futureOrPrefetch == null);
- _futureOrPrefetch = error;
- _state = _STATE_EXTRA_ERROR;
- }
-
- void _onDone() {
- if (_state == _STATE_MOVING) {
- _FutureImpl<bool> hasNext = _futureOrPrefetch;
- _clear();
- hasNext._setValue(false);
- return;
+ // TODO(lrn): Check why this can happen.
+ if (_subscription == null) return;
+ _subscription.cancel();
+ _subscription = null;
}
- _subscription.pause();
- _futureOrPrefetch = null;
- _state = _STATE_EXTRA_DONE;
}
}
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698