Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index 5987050a586da99de0d00b8f31a99d06c08531ad..8fa1848d14ac0261b2943dfc41c4ba314d457162 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -4,6 +4,57 @@ |
part of dart.async; |
+// States shared by single/multi stream implementations. |
+ |
+// Completion state of the stream. |
+/// Initial and default state where the stream can receive and send events. |
+const int _STREAM_OPEN = 0; |
+/// The stream has received a request to complete, but hasn't done so yet. |
+/// No further events can be added to the stream. |
+const int _STREAM_CLOSED = 1; |
+/// The stream has completed and will no longer receive or send events. |
+/// Also counts as closed. The stream must not be paused when it's completed. |
+/// Always used in conjunction with [_STREAM_CLOSED]. |
+const int _STREAM_COMPLETE = 2; |
+ |
+/// Bit that alternates between events, and listeners are updated to the |
+/// current value when they are notified of the event. |
+const int _STREAM_EVENT_ID = 4; |
+const int _STREAM_EVENT_ID_SHIFT = 2; |
+ |
+// The activity state of the stream: What is it currently doing. |
+/// Bit set while firing and clear while not. |
+const int _STREAM_FIRING = 8; |
+/// Bit set while calling a pause-state or subscription-state change callback. |
+const int _STREAM_CALLBACK = 16; |
+ |
+// The pause state of the stream. |
+/// Bit set when resuming with pending events. Cleared after all pending events |
+/// have been transmitted. Means that the controller still considers the |
+/// stream paused, even if the listener doesn't. |
+const int _STREAM_PENDING_RESUME = 32; |
+/// The count of times a stream has paused is stored in the |
+/// state, shifted by this amount. |
+const int _STREAM_PAUSE_COUNT_SHIFT = 6; |
+ |
+// States for listeners. |
+ |
+/// The listener is currently not subscribed to its source stream. |
+const int _LISTENER_UNSUBSCRIBED = 0; |
+/// The listener is actively subscribed to its source stream. |
+const int _LISTENER_SUBSCRIBED = 1; |
+/// The listener is subscribed until it has been notified of the current event. |
+/// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED]. |
+const int _LISTENER_PENDING_UNSUBSCRIBE = 2; |
+ |
+/// Bit that contains the last sent event's "id bit". |
+const int _LISTENER_EVENT_ID = 4; |
+const int _LISTENER_EVENT_ID_SHIFT = 2; |
+ |
+/// The count of times a listener has paused is stored in the |
+/// state, shifted by this amount. |
+const int _LISTENER_PAUSE_COUNT_SHIFT = 3; |
+ |
/** Throws the given error in the next cycle. */ |
_throwDelayed(var error, [Object stackTrace]) { |
// We are going to reach the top-level here, but there might be a global |
@@ -18,525 +69,914 @@ _throwDelayed(var error, [Object stackTrace]) { |
}); |
} |
-/** Abstract and private interface for a place to put events. */ |
-abstract class _EventSink<T> { |
- void _add(T data); |
- void _addError(Object error); |
- void _close(); |
-} |
-/** |
- * Abstract and private interface for a place to send events. |
- * |
- * Used by event buffering to finally dispatch the pending event, where |
- * [_EventSink] is where the event first enters the stream subscription, |
- * and may yet be buffered. |
- */ |
-abstract class _EventDispatch<T> { |
- void _sendData(T data); |
- void _sendError(Object error); |
- void _sendDone(); |
-} |
+// ------------------------------------------------------------------- |
+// Common base class for single and multi-subscription streams. |
+// ------------------------------------------------------------------- |
+abstract class _StreamImpl<T> extends Stream<T> { |
+ /** Current state of the stream. */ |
+ int _state = _STREAM_OPEN; |
-/** |
- * Default implementation of stream subscription of buffering events. |
- * |
- * The only public methods are those of [StreamSubscription], so instances of |
- * [_BufferingStreamSubscription] can be returned directly as a |
- * [StreamSubscription] without exposing internal functionality. |
- * |
- * The [StreamController] is a public facing version of [Stream] and this class, |
- * with some methods made public. |
- * |
- * The user interface of [_BufferingStreamSubscription] are the following |
- * methods: |
- * * [_add]: Add a data event to the stream. |
- * * [_addError]: Add an error event to the stream. |
- * * [_close]: Request to close the stream. |
- * * [_onCancel]: Called when the subscription will provide no more events, |
- * either due to being actively canceled, or after sending a done event. |
- * * [_onPause]: Called when the subscription wants the event source to pause. |
- * * [_onResume]: Called when allowing new events after a pause. |
- * The user should not add new events when the subscription requests a paused, |
- * but if it happens anyway, the subscription will enqueue the events just as |
- * when new events arrive while still firing an old event. |
- */ |
-class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
- _EventSink<T>, |
- _EventDispatch<T> { |
- /** The `cancelOnError` flag from the `listen` call. */ |
- static const int _STATE_CANCEL_ON_ERROR = 1; |
/** |
- * Whether the "done" event has been received. |
- * No further events are accepted after this. |
- */ |
- static const int _STATE_CLOSED = 2; |
- /** |
- * Set if the input has been asked not to send events. |
+ * List of pending events. |
* |
- * This is not the same as being paused, since the input will remain paused |
- * after a call to [resume] if there are pending events. |
+ * If events are added to the stream (using [_add], [_addError] or [_done]) |
+ * while the stream is paused, or while another event is firing, events will |
+ * stored here. |
+ * Also supports scheduling the events for later execution. |
*/ |
- static const int _STATE_INPUT_PAUSED = 4; |
+ _PendingEvents _pendingEvents; |
+ |
+ // ------------------------------------------------------------------ |
+ // Stream interface. |
+ |
+ StreamSubscription<T> listen(void onData(T data), |
+ { void onError(error), |
+ void onDone(), |
+ bool cancelOnError }) { |
+ if (_isComplete) { |
+ return new _DoneSubscription(onDone); |
+ } |
+ if (onData == null) onData = _nullDataHandler; |
+ if (onError == null) onError = _nullErrorHandler; |
+ if (onDone == null) onDone = _nullDoneHandler; |
+ cancelOnError = identical(true, cancelOnError); |
+ _StreamSubscriptionImpl subscription = |
+ _createSubscription(onData, onError, onDone, cancelOnError); |
+ _addListener(subscription); |
+ return subscription; |
+ } |
+ |
+ // ------------------------------------------------------------------ |
+ // EventSink interface-like methods for sending events into the stream. |
+ // It's the responsibility of the caller to ensure that the stream is not |
+ // paused when adding events. If the stream is paused, the events will be |
+ // queued, but it's better to not send events at all. |
+ |
/** |
- * Whether the subscription has been canceled. |
- * |
- * Set by calling [cancel], or by handling a "done" event, or an "error" event |
- * when `cancelOnError` is true. |
+ * Send or queue a data event. |
*/ |
- static const int _STATE_CANCELED = 8; |
- static const int _STATE_IN_CALLBACK = 16; |
- static const int _STATE_HAS_PENDING = 32; |
- static const int _STATE_PAUSE_COUNT = 64; |
- static const int _STATE_PAUSE_COUNT_SHIFT = 6; |
- |
- /* Event handlers provided in constructor. */ |
- /* TODO(7733): Fix Function->_DataHandler<T> when dart2js understands |
- * parameterized function types. */ |
- Function _onData; |
- _ErrorHandler _onError; |
- _DoneHandler _onDone; |
- |
- /** Bit vector based on state-constants above. */ |
- int _state; |
+ void _add(T value) { |
+ if (_isClosed) throw new StateError("Sending on closed stream"); |
+ if (!_mayFireState) { |
+ // Not the time to send events. |
+ _addPendingEvent(new _DelayedData<T>(value)); |
+ return; |
+ } |
+ if (_hasPendingEvent) { |
+ _addPendingEvent(new _DelayedData<T>(value)); |
+ } else { |
+ _sendData(value); |
+ } |
+ _handlePendingEvents(); |
+ } |
/** |
- * Queue of pending events. |
+ * Send or enqueue an error event. |
* |
- * Is created when necessary, or set in constructor for preconfigured events. |
+ * If a subscription has requested to be unsubscribed on errors, |
+ * it will be unsubscribed after receiving this event. |
*/ |
- _PendingEvents _pending; |
- |
- _BufferingStreamSubscription(this._onData, |
- this._onError, |
- this._onDone, |
- bool cancelOnError) |
- : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
- assert(_onData != null); |
- assert(_onError != null); |
- assert(_onDone != null); |
+ void _addError(error) { |
+ if (_isClosed) throw new StateError("Sending on closed stream"); |
+ if (!_mayFireState) { |
+ // Not the time to send events. |
+ _addPendingEvent(new _DelayedError(error)); |
+ return; |
+ } |
+ if (_hasPendingEvent) { |
+ _addPendingEvent(new _DelayedError(error)); |
+ } else { |
+ _sendError(error); |
+ } |
+ _handlePendingEvents(); |
} |
/** |
- * Sets the subscription's pending events object. |
+ * Send or enqueue a "done" message. |
* |
- * This can only be done once. The pending events object is used for the |
- * rest of the subscription's life cycle. |
+ * The "done" message should be sent at most once by a stream, and it |
+ * should be the last message sent. |
*/ |
- void _setPendingEvents(_PendingEvents pendingEvents) { |
- assert(_pending == null); |
- if (pendingEvents == null) return; |
- _pending = pendingEvents; |
- if (!pendingEvents.isEmpty) { |
- _state |= _STATE_HAS_PENDING; |
- _pending.schedule(this); |
+ void _close() { |
+ if (_isClosed) return; |
+ _state |= _STREAM_CLOSED; |
+ if (!_mayFireState) { |
+ // Not the time to send events. |
+ _addPendingEvent(const _DelayedDone()); |
+ return; |
+ } |
+ if (_hasPendingEvent) { |
+ _addPendingEvent(new _DelayedDone()); |
+ _handlePendingEvents(); |
+ } else { |
+ _sendDone(); |
+ assert(_isComplete); |
+ assert(!_hasPendingEvent); |
} |
} |
+ // ------------------------------------------------------------------- |
+ // Internal implementation. |
+ |
+ // State predicates. |
+ |
+ // Lifecycle state. |
+ /** Whether the stream is in the default, open, state for events. */ |
+ bool get _isOpen => (_state & (_STREAM_CLOSED | _STREAM_COMPLETE)) == 0; |
+ |
+ /** Whether the stream has been closed (a done event requested). */ |
+ bool get _isClosed => (_state & _STREAM_CLOSED) != 0; |
+ |
+ /** Whether the stream is completed. */ |
+ bool get _isComplete => (_state & _STREAM_COMPLETE) != 0; |
+ |
+ // Pause state. |
+ |
+ /** Whether one or more active subscribers have requested a pause. */ |
+ bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT); |
+ |
+ /** How many times the stream has been paused. */ |
+ int get _pauseCount => _state >> _STREAM_PAUSE_COUNT_SHIFT; |
+ |
/** |
- * Extracts the pending events from a canceled stream. |
+ * Whether a controller thinks the stream is paused. |
* |
- * This can only be done during the [_onCancel] method call. After that, |
- * any remaining pending events will be cleared. |
+ * When this changes, a pause-state change callback is performed. |
+ * |
+ * It may differ from [_isPaused] if there are pending events |
+ * in the queue when the listeners resume. The controller won't |
+ * be informed until all queued events have been fired. |
*/ |
- _PendingEvents _extractPending() { |
- assert(_isCanceled); |
- _PendingEvents events = _pending; |
- _pending = null; |
- return events; |
+ bool get _isInputPaused => _state >= (_STREAM_PENDING_RESUME); |
+ |
+ /** Whether we have a pending resume scheduled. */ |
+ bool get _hasPendingResume => (_state & _STREAM_PENDING_RESUME) != 0; |
+ |
+ |
+ // Action state. If the stream makes a call-out to external code, |
+ // this state tracks it and avoids reentrancy problems. |
+ |
+ /** Whether the stream is not currently firing or calling a callback. */ |
+ bool get _isInactive => (_state & (_STREAM_CALLBACK | _STREAM_FIRING)) == 0; |
+ |
+ /** Whether we are currently executing a state-chance callback. */ |
+ bool get _isInCallback => (_state & _STREAM_CALLBACK) != 0; |
+ |
+ /** Whether we are currently firing an event. */ |
+ bool get _isFiring => (_state & _STREAM_FIRING) != 0; |
+ |
+ /** Check whether the pending event queue is non-empty */ |
+ bool get _hasPendingEvent => |
+ _pendingEvents != null && !_pendingEvents.isEmpty; |
+ |
+ /** |
+ * The bit representing the current or last event fired. |
+ * |
+ * This bit matches a bit on listeners that have received the corresponding |
+ * event. It is toggled for each new event being fired. |
+ */ |
+ int get _currentEventIdBit => |
+ (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT; |
+ |
+ /** Whether there is currently a subscriber on this [Stream]. */ |
+ bool get _hasListener; |
+ |
+ |
+ /** Whether the state bits allow firing. */ |
+ bool get _mayFireState { |
+ // The state allows firing unless: |
+ // - it's currently firing |
+ // - it's currently in a callback |
+ // - it's paused |
+ const int mask = |
+ _STREAM_FIRING | |
+ _STREAM_CALLBACK | |
+ ~((1 << _STREAM_PAUSE_COUNT_SHIFT) - 1); |
+ return (_state & mask) == 0; |
} |
- // StreamSubscription interface. |
+ // State modification. |
- void onData(void handleData(T event)) { |
- if (handleData == null) handleData = _nullDataHandler; |
- _onData = handleData; |
+ /** Record an increases in the number of times the listener has paused. */ |
+ void _incrementPauseCount(_StreamListener<T> listener) { |
+ listener._incrementPauseCount(); |
+ _state &= ~_STREAM_PENDING_RESUME; |
+ _updatePauseCount(1); |
} |
- void onError(void handleError(error)) { |
- if (handleError == null) handleError = _nullErrorHandler; |
- _onError = handleError; |
+ /** Record a decrease in the number of times the listener has paused. */ |
+ void _decrementPauseCount(_StreamListener<T> listener) { |
+ assert(_isPaused); |
+ listener._decrementPauseCount(); |
+ _updatePauseCount(-1); |
} |
- void onDone(void handleDone()) { |
- if (handleDone == null) handleDone = _nullDoneHandler; |
- _onDone = handleDone; |
+ /** Update the stream's own pause count only. */ |
+ void _updatePauseCount(int by) { |
+ int oldState = _state; |
+ // We can't just _state += by << _STREAM_PAUSE_COUNT_SHIFT, since dart2js |
+ // converts the result of the left-shift to a positive number. |
+ if (by >= 0) { |
+ _state = oldState + (by << _STREAM_PAUSE_COUNT_SHIFT); |
+ } else { |
+ _state = oldState - ((-by) << _STREAM_PAUSE_COUNT_SHIFT); |
+ } |
+ assert(_state >= 0); |
+ assert((_state >> _STREAM_PAUSE_COUNT_SHIFT) == |
+ (oldState >> _STREAM_PAUSE_COUNT_SHIFT) + by); |
} |
- void pause([Future resumeSignal]) { |
- if (_isCanceled) return; |
- bool wasPaused = _isPaused; |
- bool wasInputPaused = _isInputPaused; |
- // Increment pause count and mark input paused (if it isn't already). |
- _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
- if (resumeSignal != null) resumeSignal.whenComplete(resume); |
- if (!wasPaused && _pending != null) _pending.cancelSchedule(); |
- if (!wasInputPaused && !_inCallback) _guardCallback(_onPause); |
+ void _setClosed() { |
+ assert(!_isClosed); |
+ _state |= _STREAM_CLOSED; |
} |
- void resume() { |
- if (_isCanceled) return; |
- if (_isPaused) { |
- _decrementPauseCount(); |
- if (!_isPaused) { |
- if (_hasPending && !_pending.isEmpty) { |
- // Input is still paused. |
- _pending.schedule(this); |
- } else { |
- assert(_mayResumeInput); |
- _state &= ~_STATE_INPUT_PAUSED; |
- if (!_inCallback) _guardCallback(_onResume); |
- } |
+ void _setComplete() { |
+ assert(_isClosed); |
+ _state = _state |_STREAM_COMPLETE; |
+ } |
+ |
+ void _startFiring() { |
+ assert(!_isFiring); |
+ assert(!_isInCallback); |
+ assert(_hasListener); |
+ assert(!_isPaused); |
+ // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID |
+ // bit. All current subscribers will now have a _LISTENER_EVENT_ID |
+ // that doesn't match _STREAM_EVENT_ID, and they will receive the |
+ // event being fired. |
+ _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; |
+ } |
+ |
+ void _endFiring(bool wasInputPaused) { |
+ assert(_isFiring); |
+ _state ^= _STREAM_FIRING; |
+ // Had listeners, or we wouldn't have fired. |
+ _checkCallbacks(true, wasInputPaused); |
+ } |
+ |
+ /** |
+ * Record that a listener wants a pause from events. |
+ * |
+ * This methods is called from [_StreamListener.pause()]. |
+ * Subclasses can override this method, along with [isPaused] and |
+ * [createSubscription], if they want to do a different handling of paused |
+ * subscriptions, e.g., a filtering stream pausing its own source if all its |
+ * subscribers are paused. |
+ */ |
+ void _pause(_StreamListener<T> listener, Future resumeSignal) { |
+ assert(identical(listener._source, this)); |
+ if (!listener._isSubscribed) { |
+ throw new StateError("Subscription has been canceled."); |
+ } |
+ assert(!_isComplete); // There can be no subscribers when complete. |
+ bool wasInputPaused = _isInputPaused; |
+ bool wasPaused = _isPaused; |
+ _incrementPauseCount(listener); |
+ if (resumeSignal != null) { |
+ resumeSignal.whenComplete(() { this._resume(listener, true); }); |
+ } |
+ if (!wasPaused && _hasPendingEvent && _pendingEvents.isScheduled) { |
+ _pendingEvents.cancelSchedule(); |
+ } |
+ if (_isInactive && !wasInputPaused) { |
+ _checkCallbacks(true, false); |
+ if (!_isPaused && _hasPendingEvent) { |
+ _schedulePendingEvents(); |
} |
} |
} |
- void cancel() { |
- if (_isCanceled) return; |
- _cancel(); |
- if (!_inCallback) { |
- // otherwise checkState will be called after firing or callback completes. |
- _state |= _STATE_IN_CALLBACK; |
- _onCancel(); |
- _pending = null; |
- _state &= ~_STATE_IN_CALLBACK; |
+ /** Stops pausing due to one request from the given listener. */ |
+ void _resume(_StreamListener<T> listener, bool fromEvent) { |
+ if (!listener.isPaused) return; |
+ assert(listener._isSubscribed); |
+ assert(_isPaused); |
+ _decrementPauseCount(listener); |
+ if (!_isPaused) { |
+ if (_hasPendingEvent) { |
+ _state |= _STREAM_PENDING_RESUME; |
+ // Controller's pause state hasn't changed. |
+ // If we can fire events now, fire any pending events right away. |
+ if (_isInactive) { |
+ if (fromEvent) { |
+ _handlePendingEvents(); |
+ } else { |
+ _schedulePendingEvents(); |
+ } |
+ } |
+ } else if (_isInactive) { |
+ _checkCallbacks(true, true); |
+ if (!_isPaused && _hasPendingEvent) { |
+ if (fromEvent) { |
+ _handlePendingEvents(); |
+ } else { |
+ _schedulePendingEvents(); |
+ } |
+ } |
+ } |
} |
} |
- Future asFuture([var futureValue]) { |
- _FutureImpl<T> result = new _FutureImpl<T>(); |
- |
- // Overwrite the onDone and onError handlers. |
- _onDone = () { result._setValue(futureValue); }; |
- _onError = (error) { |
- cancel(); |
- result._setError(error); |
- }; |
- |
- return result; |
+ /** Schedule pending events to be executed. */ |
+ void _schedulePendingEvents() { |
+ assert(_hasPendingEvent); |
+ _pendingEvents.schedule(this); |
} |
- // State management. |
+ /** Create a subscription object. Called by [subcribe]. */ |
+ _StreamSubscriptionImpl<T> _createSubscription( |
+ void onData(T data), |
+ void onError(error), |
+ void onDone(), |
+ bool cancelOnError); |
+ |
+ /** |
+ * Adds a listener to this stream. |
+ */ |
+ void _addListener(_StreamSubscriptionImpl subscription); |
- bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
- bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
- bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
- bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; |
- bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; |
- bool get _isPaused => _state >= _STATE_PAUSE_COUNT; |
- bool get _canFire => _state < _STATE_IN_CALLBACK; |
- bool get _mayResumeInput => |
- !_isPaused && (_pending == null || _pending.isEmpty); |
- bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; |
+ /** |
+ * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
+ * |
+ * This method is called from [_StreamSubscriptionImpl.cancel]. |
+ * |
+ * If an event is currently firing, the cancel is delayed |
+ * until after the subscribers have received the event. |
+ */ |
+ void _cancel(_StreamSubscriptionImpl subscriber); |
- bool get isPaused => _isPaused; |
+ /** |
+ * Iterate over all current subscribers and perform an action on each. |
+ * |
+ * Subscribers added during the iteration will not be visited. |
+ * Subscribers unsubscribed during the iteration will only be removed |
+ * after they have been acted on. |
+ * |
+ * Any change in the pause state is only reported after all subscribers have |
+ * received the event. |
+ * |
+ * The [action] must not throw, or the controller will be left in an |
+ * invalid state. |
+ * |
+ * This method must not be called while [isFiring] is true. |
+ */ |
+ void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)); |
- void _cancel() { |
- _state |= _STATE_CANCELED; |
- if (_hasPending) { |
- _pending.cancelSchedule(); |
+ /** |
+ * Checks whether the subscription/pause state has changed. |
+ * |
+ * Calls the appropriate callback if the state has changed from the |
+ * provided one. Repeats calling callbacks as long as the call changes |
+ * the state. |
+ */ |
+ void _checkCallbacks(bool hadListener, bool wasPaused) { |
+ assert(!_isFiring); |
+ // Will be handled after the current callback. |
+ if (_isInCallback) return; |
+ if (_hasPendingResume && !_hasPendingEvent) { |
+ _state ^= _STREAM_PENDING_RESUME; |
+ } |
+ _state |= _STREAM_CALLBACK; |
+ while (true) { |
+ bool hasListener = _hasListener; |
+ bool isPaused = _isInputPaused; |
+ if (hadListener != hasListener) { |
+ _onSubscriptionStateChange(); |
+ } else if (isPaused != wasPaused) { |
+ _onPauseStateChange(); |
+ } else { |
+ _state ^= _STREAM_CALLBACK; |
+ return; |
+ } |
+ wasPaused = isPaused; |
+ hadListener = hasListener; |
} |
} |
/** |
- * Increment the pause count. |
+ * Called when the first subscriber requests a pause or the last a resume. |
* |
- * Also marks input as paused. |
+ * Read [isPaused] to see the new state. |
*/ |
- void _incrementPauseCount() { |
- _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
- } |
+ void _onPauseStateChange() {} |
/** |
- * Decrements the pause count. |
+ * Called when the first listener subscribes or the last unsubscribes. |
* |
- * Does not automatically unpause the input (call [_onResume]) when |
- * the pause count reaches zero. This is handled elsewhere, and only |
- * if there are no pending events buffered. |
+ * Read [hasListener] to see what the new state is. |
*/ |
- void _decrementPauseCount() { |
- assert(_isPaused); |
- _state -= _STATE_PAUSE_COUNT; |
- } |
+ void _onSubscriptionStateChange() {} |
- // _EventSink interface. |
- |
- void _add(T data) { |
- assert(!_isClosed); |
- if (_isCanceled) return; |
- if (_canFire) { |
- _sendData(data); |
- } else { |
- _addPending(new _DelayedData(data)); |
+ /** |
+ * Add a pending event at the end of the pending event queue. |
+ * |
+ * Schedules events if currently not paused and inside a callback. |
+ */ |
+ void _addPendingEvent(_DelayedEvent event) { |
+ if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents(); |
+ _StreamImplEvents events = _pendingEvents; |
+ events.add(event); |
+ if (_isPaused || _isFiring) return; |
+ if (_isInCallback) { |
+ _schedulePendingEvents(); |
+ return; |
} |
} |
- void _addError(Object error) { |
- if (_isCanceled) return; |
- if (_canFire) { |
- _sendError(error); // Reports cancel after sending. |
- } else { |
- _addPending(new _DelayedError(error)); |
- } |
+ /** Fire any pending events until the pending event queue is empty. */ |
+ void _handlePendingEvents() { |
+ assert(_isInactive); |
+ if (!_hasPendingEvent) return; |
+ _PendingEvents events = _pendingEvents; |
+ do { |
+ if (_isPaused) return; |
+ if (events.isScheduled) events.cancelSchedule(); |
+ events.handleNext(this); |
+ } while (!events.isEmpty); |
} |
- void _close() { |
- assert(!_isClosed); |
- if (_isCanceled) return; |
- _state |= _STATE_CLOSED; |
- if (_canFire) { |
- _sendDone(); |
- } else { |
- _addPending(const _DelayedDone()); |
- } |
+ /** |
+ * Send a data event directly to each subscriber. |
+ */ |
+ _sendData(T value) { |
+ assert(!_isPaused); |
+ assert(!_isComplete); |
+ if (!_hasListener) return; |
+ _forEachSubscriber((subscriber) { |
+ try { |
+ subscriber._sendData(value); |
+ } catch (e, s) { |
+ _throwDelayed(e, s); |
+ } |
+ }); |
} |
- // Hooks called when the input is paused, unpaused or canceled. |
- // These must not throw. If overwritten to call user code, include suitable |
- // try/catch wrapping and send any errors to [_throwDelayed]. |
- void _onPause() { |
- assert(_isInputPaused); |
+ /** |
+ * Sends an error event directly to each subscriber. |
+ */ |
+ void _sendError(error) { |
+ assert(!_isPaused); |
+ assert(!_isComplete); |
+ if (!_hasListener) return; |
+ _forEachSubscriber((subscriber) { |
+ try { |
+ subscriber._sendError(error); |
+ } catch (e, s) { |
+ _throwDelayed(e, s); |
+ } |
+ }); |
} |
- void _onResume() { |
- assert(!_isInputPaused); |
+ /** |
+ * Sends the "done" message directly to each subscriber. |
+ * This automatically stops further subscription and |
+ * unsubscribes all subscribers. |
+ */ |
+ void _sendDone() { |
+ assert(!_isPaused); |
+ assert(_isClosed); |
+ _setComplete(); |
+ if (!_hasListener) return; |
+ _forEachSubscriber((subscriber) { |
+ _cancel(subscriber); |
+ try { |
+ subscriber._sendDone(); |
+ } catch (e, s) { |
+ _throwDelayed(e, s); |
+ } |
+ }); |
+ assert(!_hasListener); |
} |
+} |
- void _onCancel() { |
- assert(_isCanceled); |
+// ------------------------------------------------------------------- |
+// Default implementation of a stream with a single subscriber. |
+// ------------------------------------------------------------------- |
+/** |
+ * Default implementation of stream capable of sending events to one subscriber. |
+ * |
+ * Any class needing to implement [Stream] can either directly extend this |
+ * class, or extend [Stream] and delegate the subscribe method to an instance |
+ * of this class. |
+ * |
+ * The only public methods are those of [Stream], so instances of |
+ * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing |
+ * internal functionality. |
+ * |
+ * The [StreamController] is a public facing version of this class, with |
+ * some methods made public. |
+ * |
+ * The user interface of [_SingleStreamImpl] are the following methods: |
+ * * [_add]: Add a data event to the stream. |
+ * * [_addError]: Add an error event to the stream. |
+ * * [_close]: Request to close the stream. |
+ * * [_onSubscriberStateChange]: Called when receiving the first subscriber or |
+ * when losing the last subscriber. |
+ * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
+ * * [_hasListener]: Test whether there are currently any subscribers. |
+ * * [_isInputPaused]: Test whether the stream is currently paused. |
+ * The user should not add new events while the stream is paused, but if it |
+ * happens anyway, the stream will enqueue the events just as when new events |
+ * arrive while still firing an old event. |
+ */ |
+class _SingleStreamImpl<T> extends _StreamImpl<T> { |
+ _StreamListener _subscriber = null; |
+ |
+ /** Whether there is currently a subscriber on this [Stream]. */ |
+ bool get _hasListener => _subscriber != null; |
+ |
+ // ------------------------------------------------------------------- |
+ // Internal implementation. |
+ |
+ _SingleStreamImpl() { |
+ // Start out paused. |
+ _updatePauseCount(1); |
+ } |
+ |
+ /** |
+ * Create the new subscription object. |
+ */ |
+ _StreamSubscriptionImpl<T> _createSubscription( |
+ void onData(T data), |
+ void onError(error), |
+ void onDone(), |
+ bool cancelOnError) { |
+ return new _StreamSubscriptionImpl<T>( |
+ this, onData, onError, onDone, cancelOnError); |
} |
- // Handle pending events. |
+ void _addListener(_StreamListener subscription) { |
+ assert(!_isComplete); |
+ if (_hasListener) { |
+ throw new StateError("Stream already has subscriber."); |
+ } |
+ assert(_pauseCount == 1); |
+ _updatePauseCount(-1); |
+ _subscriber = subscription; |
+ subscription._setSubscribed(0); |
+ if (_isInactive) { |
+ _checkCallbacks(false, true); |
+ if (!_isPaused && _hasPendingEvent) { |
+ _schedulePendingEvents(); |
+ } |
+ } |
+ } |
/** |
- * Add a pending event. |
+ * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
* |
- * If the subscription is not paused, this also schedules a firing |
- * of pending events later (if necessary). |
+ * This method is called from [_StreamSubscriptionImpl.cancel]. |
*/ |
- void _addPending(_DelayedEvent event) { |
- _StreamImplEvents pending = _pending; |
- if (_pending == null) pending = _pending = new _StreamImplEvents(); |
- pending.add(event); |
- if (!_hasPending) { |
- _state |= _STATE_HAS_PENDING; |
- if (!_isPaused) { |
- _pending.schedule(this); |
+ void _cancel(_StreamListener subscriber) { |
+ assert(identical(subscriber._source, this)); |
+ // We allow unsubscribing the currently firing subscription during |
+ // the event firing, because it is indistinguishable from delaying it since |
+ // that event has already received the event. |
+ if (!identical(_subscriber, subscriber)) { |
+ // You may unsubscribe more than once, only the first one counts. |
+ return; |
+ } |
+ _subscriber = null; |
+ // Unsubscribing a paused subscription also cancels its pauses. |
+ int resumeCount = subscriber._setUnsubscribed(); |
+ // Keep being paused while there is no subscriber and the stream is not |
+ // complete. |
+ _updatePauseCount(_isComplete ? -resumeCount : -resumeCount + 1); |
+ if (_isInactive) { |
+ _checkCallbacks(true, resumeCount > 0); |
+ if (!_isPaused && _hasPendingEvent) { |
+ _schedulePendingEvents(); |
} |
} |
} |
- /* _EventDispatch interface. */ |
- |
- void _sendData(T data) { |
- assert(!_isCanceled); |
+ void _forEachSubscriber( |
+ void action(_StreamListener<T> subscription)) { |
assert(!_isPaused); |
- assert(!_inCallback); |
bool wasInputPaused = _isInputPaused; |
- _state |= _STATE_IN_CALLBACK; |
- try { |
- _onData(data); |
- } catch (e, s) { |
- _throwDelayed(e, s); |
- } |
- _state &= ~_STATE_IN_CALLBACK; |
- _checkState(wasInputPaused); |
+ _StreamListener subscription = _subscriber; |
+ assert(subscription != null); |
+ _startFiring(); |
+ action(subscription); |
+ _endFiring(wasInputPaused); |
} |
+} |
- void _sendError(var error) { |
- assert(!_isCanceled); |
- assert(!_isPaused); |
- assert(!_inCallback); |
- bool wasInputPaused = _isInputPaused; |
- _state |= _STATE_IN_CALLBACK; |
- try { |
- _onError(error); |
- } catch (e, s) { |
- _throwDelayed(e, s); |
- } |
- _state &= ~_STATE_IN_CALLBACK; |
- if (_cancelOnError) { |
- _cancel(); |
- } |
- _checkState(wasInputPaused); |
+// ------------------------------------------------------------------- |
+// Default implementation of a stream with subscribers. |
+// ------------------------------------------------------------------- |
+ |
+/** |
+ * Default implementation of stream capable of sending events to subscribers. |
+ * |
+ * Any class needing to implement [Stream] can either directly extend this |
+ * class, or extend [Stream] and delegate the subscribe method to an instance |
+ * of this class. |
+ * |
+ * The only public methods are those of [Stream], so instances of |
+ * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing |
+ * internal functionality. |
+ * |
+ * The [StreamController] is a public facing version of this class, with |
+ * some methods made public. |
+ * |
+ * The user interface of [_MultiStreamImpl] are the following methods: |
+ * * [_add]: Add a data event to the stream. |
+ * * [_addError]: Add an error event to the stream. |
+ * * [_close]: Request to close the stream. |
+ * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or |
+ * when losing the last subscriber. |
+ * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
+ * * [_hasListener]: Test whether there are currently any subscribers. |
+ * * [_isPaused]: Test whether the stream is currently paused. |
+ * The user should not add new events while the stream is paused, but if it |
+ * happens anyway, the stream will enqueue the events just as when new events |
+ * arrive while still firing an old event. |
+ */ |
+class _MultiStreamImpl<T> extends _StreamImpl<T> |
+ implements _InternalLinkList { |
+ // Link list implementation (mixin when possible). |
+ _InternalLink _nextLink; |
+ _InternalLink _previousLink; |
+ |
+ _MultiStreamImpl() { |
+ _nextLink = _previousLink = this; |
} |
- void _sendDone() { |
- assert(!_isCanceled); |
- assert(!_isPaused); |
- assert(!_inCallback); |
- _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
- try { |
- _onDone(); |
- } catch (e, s) { |
- _throwDelayed(e, s); |
+ bool get isBroadcast => true; |
+ |
+ Stream<T> asBroadcastStream() => this; |
+ |
+ // ------------------------------------------------------------------ |
+ // Helper functions that can be overridden in subclasses. |
+ |
+ /** Whether there are currently any subscribers on this [Stream]. */ |
+ bool get _hasListener => !_InternalLinkList.isEmpty(this); |
+ |
+ /** |
+ * Create the new subscription object. |
+ */ |
+ _StreamListener<T> _createSubscription( |
+ void onData(T data), |
+ void onError(error), |
+ void onDone(), |
+ bool cancelOnError) { |
+ return new _StreamSubscriptionImpl<T>( |
+ this, onData, onError, onDone, cancelOnError); |
+ } |
+ |
+ // ------------------------------------------------------------------- |
+ // Internal implementation. |
+ |
+ /** |
+ * Iterate over all current subscribers and perform an action on each. |
+ * |
+ * The set of subscribers cannot be modified during this iteration. |
+ * All attempts to add or unsubscribe subscribers will be delayed until |
+ * after the iteration is complete. |
+ * |
+ * The [action] must not throw, or the controller will be left in an |
+ * invalid state. |
+ * |
+ * This method must not be called while [isFiring] is true. |
+ */ |
+ void _forEachSubscriber( |
+ void action(_StreamListener<T> subscription)) { |
+ assert(!_isFiring); |
+ if (!_hasListener) return; |
+ bool wasInputPaused = _isInputPaused; |
+ _startFiring(); |
+ _InternalLink cursor = this._nextLink; |
+ while (!identical(cursor, this)) { |
+ _StreamListener<T> current = cursor; |
+ if (current._needsEvent(_currentEventIdBit)) { |
+ action(current); |
+ // Marks as having received the event. |
+ current._toggleEventReceived(); |
+ } |
+ cursor = current._nextLink; |
+ if (current._isPendingUnsubscribe) { |
+ _removeListener(current); |
+ } |
} |
- _onCancel(); // No checkState after cancel, it is always the last event. |
- _state &= ~_STATE_IN_CALLBACK; |
+ _endFiring(wasInputPaused); |
} |
- /** |
- * Call a hook function. |
- * |
- * The call is properly wrapped in code to avoid other callbacks |
- * during the call, and it checks for state changes after the call |
- * that should cause further callbacks. |
- */ |
- void _guardCallback(callback) { |
- assert(!_inCallback); |
- bool wasInputPaused = _isInputPaused; |
- _state |= _STATE_IN_CALLBACK; |
- callback(); |
- _state &= ~_STATE_IN_CALLBACK; |
- _checkState(wasInputPaused); |
+ void _addListener(_StreamListener listener) { |
+ listener._setSubscribed(_currentEventIdBit); |
+ bool hadListener = _hasListener; |
+ _InternalLinkList.add(this, listener); |
+ if (!hadListener && _isInactive) { |
+ _checkCallbacks(false, false); |
+ if (!_isPaused && _hasPendingEvent) { |
+ _schedulePendingEvents(); |
+ } |
+ } |
} |
/** |
- * Check if the input needs to be informed of state changes. |
- * |
- * State changes are pausing, resuming and canceling. |
+ * Handle a cancel requested from a [_StreamListener]. |
* |
- * After canceling, no further callbacks will happen. |
+ * This method is called from [_StreamListener.cancel]. |
* |
- * The cancel callback is called after a user cancel, or after |
- * the final done event is sent. |
+ * If an event is currently firing, the cancel is delayed |
+ * until after the subscribers have received the event. |
*/ |
- void _checkState(bool wasInputPaused) { |
- assert(!_inCallback); |
- if (_hasPending && _pending.isEmpty) { |
- _state &= ~_STATE_HAS_PENDING; |
- if (_isInputPaused && _mayResumeInput) { |
- _state &= ~_STATE_INPUT_PAUSED; |
- } |
+ void _cancel(_StreamListener listener) { |
+ assert(identical(listener._source, this)); |
+ if (_InternalLink.isUnlinked(listener)) { |
+ // You may unsubscribe more than once, only the first one counts. |
+ return; |
} |
- // If the state changes during a callback, we immediately |
- // make a new state-change callback. Loop until the state didn't change. |
- while (true) { |
- if (_isCanceled) { |
- _onCancel(); |
- _pending = null; |
- return; |
- } |
- bool isInputPaused = _isInputPaused; |
- if (wasInputPaused == isInputPaused) break; |
- _state ^= _STATE_IN_CALLBACK; |
- if (isInputPaused) { |
- _onPause(); |
+ if (_isFiring) { |
+ if (listener._needsEvent(_currentEventIdBit)) { |
+ assert(listener._isSubscribed); |
+ listener._setPendingUnsubscribe(_currentEventIdBit); |
} else { |
- _onResume(); |
+ // The listener has been notified of the event (or don't need to, |
+ // if it's still pending subscription) so it's safe to remove it. |
+ _removeListener(listener); |
+ } |
+ // Pause and subscription state changes are reported when we end |
+ // firing. |
+ } else { |
+ bool wasInputPaused = _isInputPaused; |
+ _removeListener(listener); |
+ if (_isInactive) { |
+ _checkCallbacks(true, wasInputPaused); |
+ if (!_isPaused && _hasPendingEvent) { |
+ _schedulePendingEvents(); |
+ } |
} |
- _state &= ~_STATE_IN_CALLBACK; |
- wasInputPaused = isInputPaused; |
- } |
- if (_hasPending && !_isPaused) { |
- _pending.schedule(this); |
} |
} |
-} |
- |
-// ------------------------------------------------------------------- |
-// Common base class for single and multi-subscription streams. |
-// ------------------------------------------------------------------- |
-abstract class _StreamImpl<T> extends Stream<T> { |
- // ------------------------------------------------------------------ |
- // Stream interface. |
- |
- StreamSubscription<T> listen(void onData(T data), |
- { void onError(error), |
- void onDone(), |
- bool cancelOnError }) { |
- if (onData == null) onData = _nullDataHandler; |
- if (onError == null) onError = _nullErrorHandler; |
- if (onDone == null) onDone = _nullDoneHandler; |
- cancelOnError = identical(true, cancelOnError); |
- StreamSubscription subscription = |
- _createSubscription(onData, onError, onDone, cancelOnError); |
- _onListen(subscription); |
- return subscription; |
- } |
- // ------------------------------------------------------------------- |
- /** Create a subscription object. Called by [subcribe]. */ |
- _BufferingStreamSubscription<T> _createSubscription( |
- void onData(T data), |
- void onError(error), |
- void onDone(), |
- bool cancelOnError) { |
- return new _BufferingStreamSubscription<T>( |
- onData, onError, onDone, cancelOnError); |
+ /** |
+ * Removes a listener from this stream and cancels its pauses. |
+ * |
+ * This is a low-level action that doesn't call [_onSubscriptionStateChange]. |
+ * or [_callOnPauseStateChange]. |
+ */ |
+ void _removeListener(_StreamListener listener) { |
+ int pauseCount = listener._setUnsubscribed(); |
+ _InternalLinkList.remove(listener); |
+ if (pauseCount > 0) { |
+ _updatePauseCount(-pauseCount); |
+ if (!_isPaused && _hasPendingEvent) { |
+ _state |= _STREAM_PENDING_RESUME; |
+ } |
+ } |
} |
- |
- /** Hook called when the subscription has been created. */ |
- void _onListen(StreamSubscription subscription) {} |
} |
-typedef _PendingEvents _EventGenerator(); |
/** Stream that generates its own events. */ |
-class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
- final _EventGenerator _pending; |
+class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { |
/** |
- * Initializes the stream to have only the events provided by a |
- * [_PendingEvents]. |
+ * Initializes the stream to have only the events provided by [events]. |
* |
- * A new [_PendingEvents] must be generated for each listen. |
+ * A [_PendingEvents] implementation provides events that are handled |
+ * by calling [_PendingEvents.handleNext] with the [_StreamImpl]. |
*/ |
- _GeneratedStreamImpl(this._pending); |
- |
- StreamSubscription _createSubscription(void onData(T data), |
- void onError(Object error), |
- void onDone(), |
- bool cancelOnError) { |
- _BufferingStreamSubscription<T> subscription = |
- new _BufferingStreamSubscription( |
- onData, onError, onDone, cancelOnError); |
- subscription._setPendingEvents(_pending()); |
- return subscription; |
+ _GeneratedSingleStreamImpl(_PendingEvents events) { |
+ _pendingEvents = events; |
+ _setClosed(); // Closed for input since all events are already pending. |
+ } |
+ |
+ void _add(T value) { |
+ throw new UnsupportedError("Cannot inject events into generated stream"); |
+ } |
+ |
+ void _addError(value) { |
+ throw new UnsupportedError("Cannot inject events into generated stream"); |
+ } |
+ |
+ void _close() { |
+ throw new UnsupportedError("Cannot inject events into generated stream"); |
} |
} |
/** Pending events object that gets its events from an [Iterable]. */ |
class _IterablePendingEvents<T> extends _PendingEvents { |
- // The iterator providing data for data events. |
- // Set to null when iteration has completed. |
- Iterator<T> _iterator; |
+ final Iterator<T> _iterator; |
+ /** |
+ * Whether there are no more events to be sent. |
+ * |
+ * This starts out as [:false:] since there is always at least |
+ * a 'done' event to be sent. |
+ */ |
+ bool _isDone = false; |
_IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; |
- bool get isEmpty => _iterator == null; |
+ bool get isEmpty => _isDone; |
- void handleNext(_EventDispatch dispatch) { |
- if (_iterator == null) { |
- throw new StateError("No events pending."); |
- } |
- // Send one event per call to moveNext. |
- // If moveNext returns true, send the current element as data. |
- // If moveNext returns false, send a done event and clear the _iterator. |
- // If moveNext throws an error, send an error and clear the _iterator. |
- // After an error, no further events will be sent. |
- bool isDone; |
+ void handleNext(_StreamImpl<T> stream) { |
+ if (_isDone) throw new StateError("No events pending."); |
try { |
- isDone = !_iterator.moveNext(); |
+ _isDone = !_iterator.moveNext(); |
+ if (!_isDone) { |
+ stream._sendData(_iterator.current); |
+ } else { |
+ stream._sendDone(); |
+ } |
} catch (e, s) { |
- _iterator = null; |
- dispatch._sendError(_asyncError(e, s)); |
- return; |
- } |
- if (!isDone) { |
- dispatch._sendData(_iterator.current); |
- } else { |
- _iterator = null; |
- dispatch._sendDone(); |
+ stream._sendError(_asyncError(e, s)); |
+ stream._sendDone(); |
+ _isDone = true; |
} |
} |
+} |
+ |
+ |
+/** |
+ * The subscription class that the [StreamController] uses. |
+ * |
+ * The [_StreamImpl.createSubscription] method should |
+ * create an object of this type, or another subclass of [_StreamListener]. |
+ * A subclass of [_StreamImpl] can specify which subclass |
+ * of [_StreamSubscriptionImpl] it uses by overriding |
+ * [_StreamImpl.createSubscription]. |
+ * |
+ * The subscription is in one of three states: |
+ * * Subscribed. |
+ * * Paused-and-subscribed. |
+ * * Unsubscribed. |
+ * Unsubscribing also resumes any pauses started by the subscription. |
+ */ |
+class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
+ implements StreamSubscription<T> { |
+ final bool _cancelOnError; |
+ // TODO(ahe): Restore type when feature is implemented in dart2js |
+ // checked mode. http://dartbug.com/7733 |
+ var /* _DataHandler<T> */ _onData; |
+ _ErrorHandler _onError; |
+ _DoneHandler _onDone; |
+ _StreamSubscriptionImpl(_StreamImpl source, |
+ this._onData, |
+ this._onError, |
+ this._onDone, |
+ this._cancelOnError) : super(source); |
- void clear() { |
- if (isScheduled) cancelSchedule(); |
- _iterator = null; |
+ void onData(void handleData(T event)) { |
+ if (handleData == null) handleData = _nullDataHandler; |
+ _onData = handleData; |
} |
-} |
+ void onError(void handleError(error)) { |
+ if (handleError == null) handleError = _nullErrorHandler; |
+ _onError = handleError; |
+ } |
+ |
+ void onDone(void handleDone()) { |
+ if (handleDone == null) handleDone = _nullDoneHandler; |
+ _onDone = handleDone; |
+ } |
+ |
+ void _sendData(T data) { |
+ _onData(data); |
+ } |
+ |
+ void _sendError(error) { |
+ _onError(error); |
+ if (_cancelOnError) _source._cancel(this); |
+ } |
+ |
+ void _sendDone() { |
+ _onDone(); |
+ } |
+ |
+ void cancel() { |
+ if (!_isSubscribed) return; |
+ _source._cancel(this); |
+ } |
+ |
+ void pause([Future resumeSignal]) { |
+ if (!_isSubscribed) return; |
+ _source._pause(this, resumeSignal); |
+ } |
+ |
+ void resume() { |
+ if (!_isSubscribed || !isPaused) return; |
+ _source._resume(this, false); |
+ } |
+ |
+ Future asFuture([var futureValue]) { |
+ _FutureImpl<T> result = new _FutureImpl<T>(); |
+ |
+ // Overwrite the onDone and onError handlers. |
+ onDone(() { result._setValue(futureValue); }); |
+ onError((error) { |
+ cancel(); |
+ result._setError(error); |
+ }); |
+ |
+ return result; |
+ } |
+} |
// Internal helpers. |
@@ -558,20 +998,20 @@ void _nullErrorHandler(error) { |
void _nullDoneHandler() {} |
-/** A delayed event on a buffering stream subscription. */ |
+/** A delayed event on a stream implementation. */ |
abstract class _DelayedEvent { |
/** Added as a linked list on the [StreamController]. */ |
_DelayedEvent next; |
/** Execute the delayed event on the [StreamController]. */ |
- void perform(_EventDispatch dispatch); |
+ void perform(_StreamImpl stream); |
} |
/** A delayed data event. */ |
class _DelayedData<T> extends _DelayedEvent{ |
final T value; |
_DelayedData(this.value); |
- void perform(_EventDispatch<T> dispatch) { |
- dispatch._sendData(value); |
+ void perform(_StreamImpl<T> stream) { |
+ stream._sendData(value); |
} |
} |
@@ -579,16 +1019,16 @@ class _DelayedData<T> extends _DelayedEvent{ |
class _DelayedError extends _DelayedEvent { |
final error; |
_DelayedError(this.error); |
- void perform(_EventDispatch dispatch) { |
- dispatch._sendError(error); |
+ void perform(_StreamImpl stream) { |
+ stream._sendError(error); |
} |
} |
/** A delayed done event. */ |
class _DelayedDone implements _DelayedEvent { |
const _DelayedDone(); |
- void perform(_EventDispatch dispatch) { |
- dispatch._sendDone(); |
+ void perform(_StreamImpl stream) { |
+ stream._sendDone(); |
} |
_DelayedEvent get next => null; |
@@ -678,66 +1118,118 @@ abstract class _InternalLinkList extends _InternalLink { |
} |
} |
-/** Superclass for provider of pending events. */ |
-abstract class _PendingEvents { |
- // No async event has been scheduled. |
- static const int _STATE_UNSCHEDULED = 0; |
- // An async event has been scheduled to run a function. |
- static const int _STATE_SCHEDULED = 1; |
- // An async event has been scheduled, but it will do nothing when it runs. |
- // Async events can't be preempted. |
- static const int _STATE_CANCELED = 3; |
+/** Abstract type for an internal interface for sending events. */ |
+abstract class _EventOutputSink<T> { |
+ _sendData(T data); |
+ _sendError(error); |
+ _sendDone(); |
+} |
+ |
+abstract class _StreamListener<T> extends _InternalLink |
+ implements _EventOutputSink<T> { |
+ final _StreamImpl _source; |
+ int _state = _LISTENER_UNSUBSCRIBED; |
+ |
+ _StreamListener(this._source); |
+ |
+ bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT); |
+ |
+ bool get _isPendingUnsubscribe => |
+ (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0; |
+ |
+ bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0; |
/** |
- * State of being scheduled. |
+ * Whether the listener still needs to receive the currently firing event. |
* |
- * Set to [_STATE_SCHEDULED] when pending events are scheduled for |
- * async dispatch. Since we can't cancel a [runAsync] call, if schduling |
- * is "canceled", the _state is simply set to [_STATE_CANCELED] which will |
- * make the async code do nothing except resetting [_state]. |
+ * The currently firing event is identified by a single bit, which alternates |
+ * between events. The [_state] contains the previously sent event's bit in |
+ * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener |
+ * still need the current event. |
+ */ |
+ bool _needsEvent(int currentEventIdBit) { |
+ int lastEventIdBit = |
+ (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT; |
+ return lastEventIdBit != currentEventIdBit; |
+ } |
+ |
+ /// If a subscriber's "firing bit" doesn't match the stream's firing bit, |
+ /// we are currently firing an event and the subscriber still need to receive |
+ /// the event. |
+ void _toggleEventReceived() { |
+ _state ^= _LISTENER_EVENT_ID; |
+ } |
+ |
+ void _setSubscribed(int eventIdBit) { |
+ assert(eventIdBit == 0 || eventIdBit == 1); |
+ _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT); |
+ } |
+ |
+ void _setPendingUnsubscribe(int currentEventIdBit) { |
+ assert(_isSubscribed); |
+ // Sets the pending unsubscribe, and ensures that the listener |
+ // won't get the current event. |
+ _state |= _LISTENER_PENDING_UNSUBSCRIBE | _LISTENER_EVENT_ID; |
+ _state ^= (1 ^ currentEventIdBit) << _LISTENER_EVENT_ID_SHIFT; |
+ assert(!_needsEvent(currentEventIdBit)); |
+ } |
+ |
+ /** |
+ * Marks the listener as unsubscibed. |
* |
- * If events are scheduled while the state is [_STATE_CANCELED], it is |
- * merely switched back to [_STATE_SCHEDULED], but no new call to [runAsync] |
- * is performed. |
+ * Returns the number of unresumed pauses for the listener. |
*/ |
- int _state = _STATE_UNSCHEDULED; |
+ int _setUnsubscribed() { |
+ assert(_isSubscribed); |
+ int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT; |
+ _state = _LISTENER_UNSUBSCRIBED; |
+ return timesPaused; |
+ } |
- bool get isEmpty; |
+ void _incrementPauseCount() { |
+ _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
+ } |
+ |
+ void _decrementPauseCount() { |
+ assert(isPaused); |
+ _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
+ } |
- bool get isScheduled => _state == _STATE_SCHEDULED; |
- bool get _eventScheduled => _state >= _STATE_SCHEDULED; |
+ _sendData(T data); |
+ _sendError(error); |
+ _sendDone(); |
+} |
+/** Superclass for provider of pending events. */ |
+abstract class _PendingEvents { |
/** |
- * Schedule an event to run later. |
+ * Timer set when pending events are scheduled for execution. |
* |
- * If called more than once, it should be called with the same dispatch as |
- * argument each time. It may reuse an earlier argument in some cases. |
+ * When scheduling pending events for execution in a later cycle, the timer |
+ * is stored here. If pending events are executed earlier than that, e.g., |
+ * due to a second event in the current cycle, the timer is canceled again. |
*/ |
- void schedule(_EventDispatch dispatch) { |
+ Timer scheduleTimer = null; |
+ |
+ bool get isEmpty; |
+ |
+ bool get isScheduled => scheduleTimer != null; |
+ |
+ void schedule(_StreamImpl stream) { |
if (isScheduled) return; |
- assert(!isEmpty); |
- if (_eventScheduled) { |
- assert(_state == _STATE_CANCELED); |
- _state = _STATE_SCHEDULED; |
- return; |
- } |
- runAsync(() { |
- int oldState = _state; |
- _state = _STATE_UNSCHEDULED; |
- if (oldState == _STATE_CANCELED) return; |
- handleNext(dispatch); |
+ scheduleTimer = new Timer(Duration.ZERO, () { |
+ scheduleTimer = null; |
+ stream._handlePendingEvents(); |
}); |
- _state = _STATE_SCHEDULED; |
} |
void cancelSchedule() { |
- if (isScheduled) _state = _STATE_CANCELED; |
+ assert(isScheduled); |
+ scheduleTimer.cancel(); |
+ scheduleTimer = null; |
} |
- void handleNext(_EventDispatch dispatch); |
- |
- /** Throw away any pending events and cancel scheduled events. */ |
- void clear(); |
+ void handleNext(_StreamImpl stream); |
} |
@@ -750,6 +1242,8 @@ class _StreamImplEvents extends _PendingEvents { |
bool get isEmpty => lastPendingEvent == null; |
+ bool get isScheduled => scheduleTimer != null; |
+ |
void add(_DelayedEvent event) { |
if (lastPendingEvent == null) { |
firstPendingEvent = lastPendingEvent = event; |
@@ -758,465 +1252,122 @@ class _StreamImplEvents extends _PendingEvents { |
} |
} |
- void handleNext(_EventDispatch dispatch) { |
+ void handleNext(_StreamImpl stream) { |
assert(!isScheduled); |
_DelayedEvent event = firstPendingEvent; |
firstPendingEvent = event.next; |
if (firstPendingEvent == null) { |
lastPendingEvent = null; |
} |
- event.perform(dispatch); |
- } |
- |
- void clear() { |
- if (isScheduled) cancelSchedule(); |
- firstPendingEvent = lastPendingEvent = null; |
- } |
-} |
- |
-class _MultiplexerLinkedList { |
- _MultiplexerLinkedList _next; |
- _MultiplexerLinkedList _previous; |
- |
- void _unlink() { |
- _previous._next = _next; |
- _next._previous = _previous; |
- _next = _previous = this; |
- } |
- |
- void _insertBefore(_MultiplexerLinkedList newNext) { |
- _MultiplexerLinkedList newPrevious = newNext._previous; |
- newPrevious._next = this; |
- newNext._previous = _previous; |
- _previous._next = newNext; |
- _previous = newPrevious; |
- } |
-} |
- |
-/** |
- * A subscription used by [_SingleStreamMultiplexer]. |
- * |
- * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple |
- * listeners at a time. It is used to implement [Stream.asBroadcastStream]. |
- * |
- * It is itself listening to another stream for events, and it forwards all |
- * events to all of its simultanous listeners. |
- * |
- * The listeners are [_MultiplexerSubscription]s and are kept as a linked list. |
- */ |
-// TODO(lrn): Change "implements" to "with" when automatic mixin constructors |
-// are implemented. |
-class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T> |
- implements _MultiplexerLinkedList { |
- static const int _STATE_NOT_LISTENING = 0; |
- // Bit that alternates between event firings. If bit matches the one currently |
- // firing, the subscription will not be notified. |
- static const int _STATE_EVENT_ID_BIT = 1; |
- // Whether the subscription is listening at all. This should be set while |
- // it is part of the linked list of listeners of a multiplexer stream. |
- static const int _STATE_LISTENING = 2; |
- // State bit set while firing an event. |
- static const int _STATE_IS_FIRING = 4; |
- // Bit set if a subscription is canceled while it's firing (the |
- // [_STATE_IS_FIRING] bit is set). |
- // If the subscription is canceled while firing, it is not removed from the |
- // linked list immediately (to avoid breaking iteration), but is instead |
- // removed after it is done firing. |
- static const int _STATE_REMOVE_AFTER_FIRING = 8; |
- |
- // Firing state. |
- int _multiplexState; |
- |
- _SingleStreamMultiplexer _source; |
- |
- _MultiplexerSubscription(this._source, |
- void onData(T data), |
- void onError(Object error), |
- void onDone(), |
- bool cancelOnError, |
- int nextEventId) |
- : _multiplexState = _STATE_LISTENING | nextEventId, |
- super(onData, onError, onDone, cancelOnError) { |
- _next = _previous = this; |
- } |
- |
- // Mixin workaround. |
- _MultiplexerLinkedList _next; |
- _MultiplexerLinkedList _previous; |
- |
- void _unlink() { |
- _previous._next = _next; |
- _next._previous = _previous; |
- _next = _previous = this; |
- } |
- |
- void _insertBefore(_MultiplexerLinkedList newNext) { |
- _MultiplexerLinkedList newPrevious = newNext._previous; |
- newPrevious._next = this; |
- newNext._previous = _previous; |
- _previous._next = newNext; |
- _previous = newPrevious; |
- } |
- // End mixin. |
- |
- bool get _isListening => _multiplexState >= _STATE_LISTENING; |
- bool get _isFiring => _multiplexState >= _STATE_IS_FIRING; |
- bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING; |
- int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT; |
- |
- void _setRemoveAfterFiring() { |
- assert(_isFiring); |
- _multiplexState |= _STATE_REMOVE_AFTER_FIRING; |
- } |
- |
- void _startFiring() { |
- assert(!_isFiring); |
- _multiplexState |= _STATE_IS_FIRING; |
- } |
- |
- /// Marks listener as no longer firing, and toggles its event id. |
- void _endFiring() { |
- assert(_isFiring); |
- _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT); |
- } |
- |
- void _setNotListening() { |
- assert(_isListening); |
- _multiplexState = _STATE_NOT_LISTENING; |
- } |
- |
- void _onCancel() { |
- assert(_isListening); |
- _source._removeListener(this); |
+ event.perform(stream); |
} |
} |
-/** |
- * A stream that sends events from another stream to multiple listeners. |
- * |
- * This is used to implement [Stream.asBroadcastStream]. |
- * |
- * This stream allows listening more than once. |
- * When the first listener is added, it starts listening on its source |
- * stream for events. All events from the source stream are sent to all |
- * active listeners. The listeners handle their own buffering. |
- * When the last listener cancels, the source stream subscription is also |
- * canceled, and no further listening is possible. |
- */ |
-// TODO(lrn): change "implements" to "with" when the VM supports it. |
-class _SingleStreamMultiplexer<T> extends Stream<T> |
- implements _MultiplexerLinkedList, |
- _EventDispatch<T> { |
- final Stream<T> _source; |
- StreamSubscription<T> _subscription; |
- // Alternates between zero and one for each event fired. |
- // Listeners are initialized with the next event's id, and will |
- // only be notified if they match the event being fired. |
- // That way listeners added during event firing will not receive |
- // the current event. |
- int _eventId = 0; |
- |
- bool _isFiring = false; |
- // Remember events added while firing. |
- _StreamImplEvents _pending; |
+class _DoneSubscription<T> implements StreamSubscription<T> { |
+ _DoneHandler _handler; |
+ Timer _timer; |
+ int _pauseCount = 0; |
- _SingleStreamMultiplexer(this._source) { |
- _next = _previous = this; |
+ _DoneSubscription(this._handler) { |
+ _delayDone(); |
} |
- bool get _hasPending => _pending != null && !_pending.isEmpty; |
- |
- // Should be mixin. |
- _MultiplexerLinkedList _next; |
- _MultiplexerLinkedList _previous; |
- |
- void _unlink() { |
- _previous._next = _next; |
- _next._previous = _previous; |
- _next = _previous = this; |
+ void _delayDone() { |
+ assert(_timer == null && _pauseCount == 0); |
+ _timer = new Timer(Duration.ZERO, () { |
+ if (_handler != null) _handler(); |
+ }); |
} |
- void _insertBefore(_MultiplexerLinkedList newNext) { |
- _MultiplexerLinkedList newPrevious = newNext._previous; |
- newPrevious._next = this; |
- newNext._previous = _previous; |
- _previous._next = newNext; |
- _previous = newPrevious; |
- } |
- // End of mixin. |
+ bool get _isComplete => _timer == null && _pauseCount == 0; |
- StreamSubscription<T> listen(void onData(T data), |
- { void onError(Object error), |
- void onDone(), |
- bool cancelOnError }) { |
- if (onData == null) onData = _nullDataHandler; |
- if (onError == null) onError = _nullErrorHandler; |
- if (onDone == null) onDone = _nullDoneHandler; |
- cancelOnError = identical(true, cancelOnError); |
- _MultiplexerSubscription subscription = |
- new _MultiplexerSubscription(this, onData, onError, onDone, |
- cancelOnError, _eventId); |
- if (_subscription == null) { |
- _subscription = _source.listen(_add, onError: _addError, onDone: _close); |
- } |
- subscription._insertBefore(this); |
- return subscription; |
- } |
+ void onData(void handleAction(T value)) {} |
- /** Called by [_MultiplexerSubscription.remove]. */ |
- void _removeListener(_MultiplexerSubscription listener) { |
- if (listener._isFiring) { |
- listener._setRemoveAfterFiring(); |
- } else { |
- _unlinkListener(listener); |
- } |
- } |
+ void onError(void handleError(error)) {} |
- /** Remove a listener and close the subscription after the last one. */ |
- void _unlinkListener(_MultiplexerSubscription listener) { |
- listener._setNotListening(); |
- listener._unlink(); |
- if (identical(_next, this)) { |
- // Last listener removed. |
- _cancel(); |
- } |
+ void onDone(void handleDone()) { |
+ _handler = handleDone; |
} |
- void _cancel() { |
- StreamSubscription subscription = _subscription; |
- _subscription = null; |
- subscription.cancel(); |
- if (_pending != null) _pending.cancelSchedule(); |
- } |
- |
- void _forEachListener(void action(_MultiplexerSubscription listener)) { |
- int eventId = _eventId; |
- _eventId ^= 1; |
- _isFiring = true; |
- _MultiplexerLinkedList entry = _next; |
- // Call each listener in order. A listener can be removed during any |
- // other listener's event. During its own event it will only be marked |
- // as "to be removed", and it will be handled after the event is done. |
- while (!identical(entry, this)) { |
- _MultiplexerSubscription listener = entry; |
- if (listener._eventId == eventId) { |
- listener._startFiring(); |
- action(listener); |
- listener._endFiring(); // Also toggles the event id. |
- } |
- entry = listener._next; |
- if (listener._removeAfterFiring) { |
- _unlinkListener(listener); |
- } |
+ void pause([Future signal]) { |
+ if (_isComplete) return; |
+ if (_timer != null) { |
+ _timer.cancel(); |
+ _timer = null; |
} |
- _isFiring = false; |
+ _pauseCount++; |
+ if (signal != null) signal.whenComplete(resume); |
} |
- void _add(T data) { |
- if (_isFiring || _hasPending) { |
- _StreamImplEvents pending = _pending; |
- if (pending == null) pending = _pending = new _StreamImplEvents(); |
- pending.add(new _DelayedData(data)); |
- } else { |
- _sendData(data); |
+ void resume() { |
+ if (_isComplete) return; |
+ if (_pauseCount == 0) return; |
+ _pauseCount--; |
+ if (_pauseCount == 0) { |
+ _delayDone(); |
} |
} |
- void _addError(Object error) { |
- if (_isFiring || _hasPending) { |
- _StreamImplEvents pending = _pending; |
- if (pending == null) pending = _pending = new _StreamImplEvents(); |
- pending.add(new _DelayedError(error)); |
- } else { |
- _sendError(error); |
- } |
- } |
+ bool get isPaused => _pauseCount > 0; |
- void _close() { |
- if (_isFiring || _hasPending) { |
- _StreamImplEvents pending = _pending; |
- if (pending == null) pending = _pending = new _StreamImplEvents(); |
- pending.add(const _DelayedDone()); |
- } else { |
- _sendDone(); |
+ void cancel() { |
+ if (_isComplete) return; |
+ if (_timer != null) { |
+ _timer.cancel(); |
+ _timer = null; |
} |
+ _pauseCount = 0; |
} |
- void _sendData(T data) { |
- _forEachListener((_MultiplexerSubscription listener) { |
- listener._add(data); |
- }); |
- if (_hasPending) { |
- _pending.schedule(this); |
- } |
- } |
+ Future asFuture([var futureValue]) { |
+ // TODO(floitsch): share more code. |
+ _FutureImpl<T> result = new _FutureImpl<T>(); |
- void _sendError(Object error) { |
- _forEachListener((_MultiplexerSubscription listener) { |
- listener._addError(error); |
+ // Overwrite the onDone and onError handlers. |
+ onDone(() { result._setValue(futureValue); }); |
+ onError((error) { |
+ cancel(); |
+ result._setError(error); |
}); |
- if (_hasPending) { |
- _pending.schedule(this); |
- } |
- } |
- void _sendDone() { |
- _forEachListener((_MultiplexerSubscription listener) { |
- listener._setRemoveAfterFiring(); |
- listener._close(); |
- }); |
+ return result; |
} |
} |
+class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { |
+ final Stream<T> _source; |
+ StreamSubscription<T> _subscription; |
-/** |
- * Simple implementation of [StreamIterator]. |
- */ |
-class _StreamIteratorImpl<T> implements StreamIterator<T> { |
- // Internal state of the stream iterator. |
- // At any time, it is in one of these states. |
- // The interpretation of the [_futureOrPrefecth] field depends on the state. |
- // In _STATE_MOVING, the _data field holds the most recently returned |
- // future. |
- // When in one of the _STATE_EXTRA_* states, the it may hold the |
- // next data/error object, and the subscription is paused. |
- |
- /// The simple state where [_data] holds the data to return, and [moveNext] |
- /// is allowed. The subscription is actively listening. |
- static const int _STATE_FOUND = 0; |
- /// State set after [moveNext] has returned false or an error, |
- /// or after calling [cancel]. The subscription is always canceled. |
- static const int _STATE_DONE = 1; |
- /// State set after calling [moveNext], but before its returned future has |
- /// completed. Calling [moveNext] again is not allowed in this state. |
- /// The subscription is actively listening. |
- static const int _STATE_MOVING = 2; |
- /// States set when another event occurs while in _STATE_FOUND. |
- /// This extra overflow event is cached until the next call to [moveNext], |
- /// which will complete as if it received the event normally. |
- /// The subscription is paused in these states, so we only ever get one |
- /// event too many. |
- static const int _STATE_EXTRA_DATA = 3; |
- static const int _STATE_EXTRA_ERROR = 4; |
- static const int _STATE_EXTRA_DONE = 5; |
- |
- /// Subscription being listened to. |
- StreamSubscription _subscription; |
- |
- /// The current element represented by the most recent call to moveNext. |
- /// |
- /// Is null between the time moveNext is called and its future completes. |
- T _current = null; |
- |
- /// The future returned by the most recent call to [moveNext]. |
- /// |
- /// Also used to store the next value/error in case the stream provides an |
- /// event before [moveNext] is called again. In that case, the stream will |
- /// be paused to prevent further events. |
- var _futureOrPrefetch = null; |
- |
- /// The current state. |
- int _state = _STATE_FOUND; |
- |
- _StreamIteratorImpl(final Stream<T> stream) { |
- _subscription = stream.listen(_onData, |
- onError: _onError, |
- onDone: _onDone, |
- cancelOnError: true); |
- } |
- |
- T get current => _current; |
- |
- Future<bool> moveNext() { |
- if (_state == _STATE_DONE) { |
- return new _FutureImpl<bool>.immediate(false); |
- } |
- if (_state == _STATE_MOVING) { |
- throw new StateError("Already waiting for next."); |
- } |
- if (_state == _STATE_FOUND) { |
- _state = _STATE_MOVING; |
- _futureOrPrefetch = new _FutureImpl<bool>(); |
- return _futureOrPrefetch; |
+ _SingleStreamMultiplexer(this._source); |
+ |
+ void _callOnPauseStateChange() { |
+ if (_isPaused) { |
+ if (_subscription != null) { |
+ _subscription.pause(); |
+ } |
} else { |
- assert(_state >= _STATE_EXTRA_DATA); |
- switch (_state) { |
- case _STATE_EXTRA_DATA: |
- _state = _STATE_FOUND; |
- _current = _futureOrPrefetch; |
- _futureOrPrefetch = null; |
- _subscription.resume(); |
- return new _FutureImpl<bool>.immediate(true); |
- case _STATE_EXTRA_ERROR: |
- Object prefetch = _futureOrPrefetch; |
- _clear(); |
- return new _FutureImpl<bool>.immediateError(prefetch); |
- case _STATE_EXTRA_DONE: |
- _clear(); |
- return new _FutureImpl<bool>.immediate(false); |
+ if (_subscription != null) { |
+ _subscription.resume(); |
} |
} |
} |
- /** Clears up the internal state when the iterator ends. */ |
- void _clear() { |
- _subscription = null; |
- _futureOrPrefetch = null; |
- _current = null; |
- _state = _STATE_DONE; |
- } |
- |
- void cancel() { |
- StreamSubscription subscription = _subscription; |
- if (_state == _STATE_MOVING) { |
- _FutureImpl<bool> hasNext = _futureOrPrefetch; |
- _clear(); |
- hasNext._setValue(false); |
+ /** |
+ * Subscribe or unsubscribe on [_source] depending on whether |
+ * [_stream] has subscribers. |
+ */ |
+ void _onSubscriptionStateChange() { |
+ if (_hasListener) { |
+ assert(_subscription == null); |
+ _subscription = _source.listen(this._add, |
+ onError: this._addError, |
+ onDone: this._close); |
} else { |
- _clear(); |
- } |
- subscription.cancel(); |
- } |
- |
- void _onData(T data) { |
- if (_state == _STATE_MOVING) { |
- _current = data; |
- _FutureImpl<bool> hasNext = _futureOrPrefetch; |
- _futureOrPrefetch = null; |
- _state = _STATE_FOUND; |
- hasNext._setValue(true); |
- return; |
- } |
- _subscription.pause(); |
- assert(_futureOrPrefetch == null); |
- _futureOrPrefetch = data; |
- _state = _STATE_EXTRA_DATA; |
- } |
- |
- void _onError(Object error) { |
- if (_state == _STATE_MOVING) { |
- _FutureImpl<bool> hasNext = _futureOrPrefetch; |
- // We have cancelOnError: true, so the subscription is canceled. |
- _clear(); |
- hasNext._setError(error); |
- return; |
- } |
- _subscription.pause(); |
- assert(_futureOrPrefetch == null); |
- _futureOrPrefetch = error; |
- _state = _STATE_EXTRA_ERROR; |
- } |
- |
- void _onDone() { |
- if (_state == _STATE_MOVING) { |
- _FutureImpl<bool> hasNext = _futureOrPrefetch; |
- _clear(); |
- hasNext._setValue(false); |
- return; |
+ // TODO(lrn): Check why this can happen. |
+ if (_subscription == null) return; |
+ _subscription.cancel(); |
+ _subscription = null; |
} |
- _subscription.pause(); |
- _futureOrPrefetch = null; |
- _state = _STATE_EXTRA_DONE; |
} |
} |