Chromium Code Reviews| Index: sdk/lib/async/stream_impl.dart |
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
| index 8fa1848d14ac0261b2943dfc41c4ba314d457162..9f39368e0a73a3961bf81578f20a730d22c85d3e 100644 |
| --- a/sdk/lib/async/stream_impl.dart |
| +++ b/sdk/lib/async/stream_impl.dart |
| @@ -4,57 +4,6 @@ |
| part of dart.async; |
| -// States shared by single/multi stream implementations. |
| - |
| -// Completion state of the stream. |
| -/// Initial and default state where the stream can receive and send events. |
| -const int _STREAM_OPEN = 0; |
| -/// The stream has received a request to complete, but hasn't done so yet. |
| -/// No further events can be added to the stream. |
| -const int _STREAM_CLOSED = 1; |
| -/// The stream has completed and will no longer receive or send events. |
| -/// Also counts as closed. The stream must not be paused when it's completed. |
| -/// Always used in conjunction with [_STREAM_CLOSED]. |
| -const int _STREAM_COMPLETE = 2; |
| - |
| -/// Bit that alternates between events, and listeners are updated to the |
| -/// current value when they are notified of the event. |
| -const int _STREAM_EVENT_ID = 4; |
| -const int _STREAM_EVENT_ID_SHIFT = 2; |
| - |
| -// The activity state of the stream: What is it currently doing. |
| -/// Bit set while firing and clear while not. |
| -const int _STREAM_FIRING = 8; |
| -/// Bit set while calling a pause-state or subscription-state change callback. |
| -const int _STREAM_CALLBACK = 16; |
| - |
| -// The pause state of the stream. |
| -/// Bit set when resuming with pending events. Cleared after all pending events |
| -/// have been transmitted. Means that the controller still considers the |
| -/// stream paused, even if the listener doesn't. |
| -const int _STREAM_PENDING_RESUME = 32; |
| -/// The count of times a stream has paused is stored in the |
| -/// state, shifted by this amount. |
| -const int _STREAM_PAUSE_COUNT_SHIFT = 6; |
| - |
| -// States for listeners. |
| - |
| -/// The listener is currently not subscribed to its source stream. |
| -const int _LISTENER_UNSUBSCRIBED = 0; |
| -/// The listener is actively subscribed to its source stream. |
| -const int _LISTENER_SUBSCRIBED = 1; |
| -/// The listener is subscribed until it has been notified of the current event. |
| -/// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED]. |
| -const int _LISTENER_PENDING_UNSUBSCRIBE = 2; |
| - |
| -/// Bit that contains the last sent event's "id bit". |
| -const int _LISTENER_EVENT_ID = 4; |
| -const int _LISTENER_EVENT_ID_SHIFT = 2; |
| - |
| -/// The count of times a listener has paused is stored in the |
| -/// state, shifted by this amount. |
| -const int _LISTENER_PAUSE_COUNT_SHIFT = 3; |
| - |
| /** Throws the given error in the next cycle. */ |
| _throwDelayed(var error, [Object stackTrace]) { |
| // We are going to reach the top-level here, but there might be a global |
| @@ -69,915 +18,527 @@ _throwDelayed(var error, [Object stackTrace]) { |
| }); |
| } |
| +/** Abstract and private interface for a place to put events. */ |
| +abstract class _EventSink<T> { |
| + void _add(T data); |
| + void _addError(Object error); |
| + void _close(); |
| +} |
| -// ------------------------------------------------------------------- |
| -// Common base class for single and multi-subscription streams. |
| -// ------------------------------------------------------------------- |
| -abstract class _StreamImpl<T> extends Stream<T> { |
| - /** Current state of the stream. */ |
| - int _state = _STREAM_OPEN; |
| - |
| - /** |
| - * List of pending events. |
| - * |
| - * If events are added to the stream (using [_add], [_addError] or [_done]) |
| - * while the stream is paused, or while another event is firing, events will |
| - * stored here. |
| - * Also supports scheduling the events for later execution. |
| - */ |
| - _PendingEvents _pendingEvents; |
| - |
| - // ------------------------------------------------------------------ |
| - // Stream interface. |
| - |
| - StreamSubscription<T> listen(void onData(T data), |
| - { void onError(error), |
| - void onDone(), |
| - bool cancelOnError }) { |
| - if (_isComplete) { |
| - return new _DoneSubscription(onDone); |
| - } |
| - if (onData == null) onData = _nullDataHandler; |
| - if (onError == null) onError = _nullErrorHandler; |
| - if (onDone == null) onDone = _nullDoneHandler; |
| - cancelOnError = identical(true, cancelOnError); |
| - _StreamSubscriptionImpl subscription = |
| - _createSubscription(onData, onError, onDone, cancelOnError); |
| - _addListener(subscription); |
| - return subscription; |
| - } |
| - |
| - // ------------------------------------------------------------------ |
| - // EventSink interface-like methods for sending events into the stream. |
| - // It's the responsibility of the caller to ensure that the stream is not |
| - // paused when adding events. If the stream is paused, the events will be |
| - // queued, but it's better to not send events at all. |
| - |
| - /** |
| - * Send or queue a data event. |
| - */ |
| - void _add(T value) { |
| - if (_isClosed) throw new StateError("Sending on closed stream"); |
| - if (!_mayFireState) { |
| - // Not the time to send events. |
| - _addPendingEvent(new _DelayedData<T>(value)); |
| - return; |
| - } |
| - if (_hasPendingEvent) { |
| - _addPendingEvent(new _DelayedData<T>(value)); |
| - } else { |
| - _sendData(value); |
| - } |
| - _handlePendingEvents(); |
| - } |
| +/** |
| + * Abstract and private interface for a place to send events. |
| + * |
| + * Used by event buffering to finally dispatch the pending event, where |
| + * [_EventSink] is where the event first enters the stream subscription, |
| + * and may yet be buffered. |
| + */ |
| +abstract class _EventDispatch<T> { |
| + void _sendData(T data); |
| + void _sendError(Object error); |
| + void _sendDone(); |
| +} |
| +/** |
| + * Default implementation of stream subscription of buffering events. |
| + * |
| + * The only public methods are those of [StreamSubscription], so instances of |
| + * [_BufferingStreamSubscription] can be returned directly as a |
| + * [StreamSubscription] without exposing internal functionality. |
| + * |
| +* The [StreamController] is a public facing version of [Stream] and this class, |
| + * with some methods made public. |
| + * |
| + * The user interface of [_BufferingStreamSubscription] are the following |
| + * methods: |
| + * * [_add]: Add a data event to the stream. |
| + * * [_addError]: Add an error event to the stream. |
| + * * [_close]: Request to close the stream. |
| + * * [_onCancel]: Called when the subscription stops listening. |
| + * * [_onPause]: Called when the subscription wants the event source to pause. |
| + * * [_onResume]: Called when allowing new events after a pause. |
| + * The user should not add new eventswhen the subscription requests a paused, |
|
floitsch
2013/05/22 16:26:29
space missing.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
| + * but if it happens anyway, the subscription will enqueue the events just as |
| + * when new events arrive while still firing an old event. |
| + */ |
| +class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| + _EventSink<T>, |
| + _EventDispatch<T> { |
| + /** The `cancelOnError` flag from the `listen` call. */ |
| + static const int _STATE_CANCEL_ON_ERROR = 1; |
| /** |
| - * Send or enqueue an error event. |
| - * |
| - * If a subscription has requested to be unsubscribed on errors, |
| - * it will be unsubscribed after receiving this event. |
| + * Whether the "done" event has been received. |
| + * No further events are accepted after this. |
| */ |
| - void _addError(error) { |
| - if (_isClosed) throw new StateError("Sending on closed stream"); |
| - if (!_mayFireState) { |
| - // Not the time to send events. |
| - _addPendingEvent(new _DelayedError(error)); |
| - return; |
| - } |
| - if (_hasPendingEvent) { |
| - _addPendingEvent(new _DelayedError(error)); |
| - } else { |
| - _sendError(error); |
| - } |
| - _handlePendingEvents(); |
| - } |
| - |
| + static const int _STATE_CLOSED = 2; |
| /** |
| - * Send or enqueue a "done" message. |
| + * Set if the input has been asked not to send events. |
| * |
| - * The "done" message should be sent at most once by a stream, and it |
| - * should be the last message sent. |
| + * This is not the same as being paused, since the input will remain paused |
| + * after a call to [resume] if there are pending events. |
| */ |
| - void _close() { |
| - if (_isClosed) return; |
| - _state |= _STREAM_CLOSED; |
| - if (!_mayFireState) { |
| - // Not the time to send events. |
| - _addPendingEvent(const _DelayedDone()); |
| - return; |
| - } |
| - if (_hasPendingEvent) { |
| - _addPendingEvent(new _DelayedDone()); |
| - _handlePendingEvents(); |
| - } else { |
| - _sendDone(); |
| - assert(_isComplete); |
| - assert(!_hasPendingEvent); |
| - } |
| - } |
| - |
| - // ------------------------------------------------------------------- |
| - // Internal implementation. |
| - |
| - // State predicates. |
| - |
| - // Lifecycle state. |
| - /** Whether the stream is in the default, open, state for events. */ |
| - bool get _isOpen => (_state & (_STREAM_CLOSED | _STREAM_COMPLETE)) == 0; |
| - |
| - /** Whether the stream has been closed (a done event requested). */ |
| - bool get _isClosed => (_state & _STREAM_CLOSED) != 0; |
| - |
| - /** Whether the stream is completed. */ |
| - bool get _isComplete => (_state & _STREAM_COMPLETE) != 0; |
| - |
| - // Pause state. |
| - |
| - /** Whether one or more active subscribers have requested a pause. */ |
| - bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT); |
| - |
| - /** How many times the stream has been paused. */ |
| - int get _pauseCount => _state >> _STREAM_PAUSE_COUNT_SHIFT; |
| - |
| + static const int _STATE_INPUT_PAUSED = 4; |
| /** |
| - * Whether a controller thinks the stream is paused. |
| - * |
| - * When this changes, a pause-state change callback is performed. |
| + * Whether the subscription has been cancelled. |
| * |
| - * It may differ from [_isPaused] if there are pending events |
| - * in the queue when the listeners resume. The controller won't |
| - * be informed until all queued events have been fired. |
| + * Set by calling [cancel], or by handling a "done" event, or an "error" event |
| + * when `cancelOnError` is true. |
| */ |
| - bool get _isInputPaused => _state >= (_STREAM_PENDING_RESUME); |
| - |
| - /** Whether we have a pending resume scheduled. */ |
| - bool get _hasPendingResume => (_state & _STREAM_PENDING_RESUME) != 0; |
| - |
| - |
| - // Action state. If the stream makes a call-out to external code, |
| - // this state tracks it and avoids reentrancy problems. |
| - |
| - /** Whether the stream is not currently firing or calling a callback. */ |
| - bool get _isInactive => (_state & (_STREAM_CALLBACK | _STREAM_FIRING)) == 0; |
| - |
| - /** Whether we are currently executing a state-chance callback. */ |
| - bool get _isInCallback => (_state & _STREAM_CALLBACK) != 0; |
| - |
| - /** Whether we are currently firing an event. */ |
| - bool get _isFiring => (_state & _STREAM_FIRING) != 0; |
| + static const int _STATE_CANCELLED = 8; |
|
floitsch
2013/05/22 16:26:29
_STATE_CANCELED
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Bowdlerizing constants! :(
|
| + static const int _STATE_IN_CALLBACK = 16; |
| + static const int _STATE_HAS_PENDING = 32; |
| + static const int _STATE_PAUSE_COUNT = 64; |
| + static const int _STATE_PAUSE_COUNT_SHIFT = 6; |
| + |
| + /** Event handlers provided in constructor. */ |
| + _DataHandler<T> _onData; |
| + _ErrorHandler _onError; |
| + _DoneHandler _onDone; |
| - /** Check whether the pending event queue is non-empty */ |
| - bool get _hasPendingEvent => |
| - _pendingEvents != null && !_pendingEvents.isEmpty; |
| + /** Bit vector based on state-constants above. */ |
| + int _state; |
| /** |
| - * The bit representing the current or last event fired. |
| + * Queue of pending events. |
| * |
| - * This bit matches a bit on listeners that have received the corresponding |
| - * event. It is toggled for each new event being fired. |
| + * Is created when necessary, or set in constructor for preconfigured events. |
| */ |
| - int get _currentEventIdBit => |
| - (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT; |
| - |
| - /** Whether there is currently a subscriber on this [Stream]. */ |
| - bool get _hasListener; |
| - |
| - |
| - /** Whether the state bits allow firing. */ |
| - bool get _mayFireState { |
| - // The state allows firing unless: |
| - // - it's currently firing |
| - // - it's currently in a callback |
| - // - it's paused |
| - const int mask = |
| - _STREAM_FIRING | |
| - _STREAM_CALLBACK | |
| - ~((1 << _STREAM_PAUSE_COUNT_SHIFT) - 1); |
| - return (_state & mask) == 0; |
| - } |
| - |
| - // State modification. |
| - |
| - /** Record an increases in the number of times the listener has paused. */ |
| - void _incrementPauseCount(_StreamListener<T> listener) { |
| - listener._incrementPauseCount(); |
| - _state &= ~_STREAM_PENDING_RESUME; |
| - _updatePauseCount(1); |
| - } |
| - |
| - /** Record a decrease in the number of times the listener has paused. */ |
| - void _decrementPauseCount(_StreamListener<T> listener) { |
| - assert(_isPaused); |
| - listener._decrementPauseCount(); |
| - _updatePauseCount(-1); |
| - } |
| - |
| - /** Update the stream's own pause count only. */ |
| - void _updatePauseCount(int by) { |
| - int oldState = _state; |
| - // We can't just _state += by << _STREAM_PAUSE_COUNT_SHIFT, since dart2js |
| - // converts the result of the left-shift to a positive number. |
| - if (by >= 0) { |
| - _state = oldState + (by << _STREAM_PAUSE_COUNT_SHIFT); |
| - } else { |
| - _state = oldState - ((-by) << _STREAM_PAUSE_COUNT_SHIFT); |
| + _PendingEvents _pending; |
| + |
| + _BufferingStreamSubscription(this._onData, |
| + this._onError, |
| + this._onDone, |
| + bool cancelOnError, |
| + this._pending) |
| + : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
| + assert(_onData != null); |
| + assert(_onError != null); |
| + assert(_onDone != null); |
| + if (_pending != null && !_pending.isEmpty) { |
| + _state |= _STATE_HAS_PENDING; |
| + _pending.schedule(this); |
| } |
| - assert(_state >= 0); |
| - assert((_state >> _STREAM_PAUSE_COUNT_SHIFT) == |
| - (oldState >> _STREAM_PAUSE_COUNT_SHIFT) + by); |
| } |
| - void _setClosed() { |
| - assert(!_isClosed); |
| - _state |= _STREAM_CLOSED; |
| - } |
| + // StreamSubscription interface. |
| - void _setComplete() { |
| - assert(_isClosed); |
| - _state = _state |_STREAM_COMPLETE; |
| + void onData(void handleData(T event)) { |
| + if (handleData == null) handleData = _nullDataHandler; |
| + _onData = handleData; |
| } |
| - void _startFiring() { |
| - assert(!_isFiring); |
| - assert(!_isInCallback); |
| - assert(_hasListener); |
| - assert(!_isPaused); |
| - // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID |
| - // bit. All current subscribers will now have a _LISTENER_EVENT_ID |
| - // that doesn't match _STREAM_EVENT_ID, and they will receive the |
| - // event being fired. |
| - _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; |
| + void onError(void handleError(error)) { |
| + if (handleError == null) handleError = _nullErrorHandler; |
| + _onError = handleError; |
| } |
| - void _endFiring(bool wasInputPaused) { |
| - assert(_isFiring); |
| - _state ^= _STREAM_FIRING; |
| - // Had listeners, or we wouldn't have fired. |
| - _checkCallbacks(true, wasInputPaused); |
| + void onDone(void handleDone()) { |
| + if (handleDone == null) handleDone = _nullDoneHandler; |
| + _onDone = handleDone; |
| } |
| - /** |
| - * Record that a listener wants a pause from events. |
| - * |
| - * This methods is called from [_StreamListener.pause()]. |
| - * Subclasses can override this method, along with [isPaused] and |
| - * [createSubscription], if they want to do a different handling of paused |
| - * subscriptions, e.g., a filtering stream pausing its own source if all its |
| - * subscribers are paused. |
| - */ |
| - void _pause(_StreamListener<T> listener, Future resumeSignal) { |
| - assert(identical(listener._source, this)); |
| - if (!listener._isSubscribed) { |
| - throw new StateError("Subscription has been canceled."); |
| - } |
| - assert(!_isComplete); // There can be no subscribers when complete. |
| - bool wasInputPaused = _isInputPaused; |
| + void pause([Future resumeSignal]) { |
| + if (_cancelled) return; |
| bool wasPaused = _isPaused; |
| - _incrementPauseCount(listener); |
| - if (resumeSignal != null) { |
| - resumeSignal.whenComplete(() { this._resume(listener, true); }); |
| - } |
| - if (!wasPaused && _hasPendingEvent && _pendingEvents.isScheduled) { |
| - _pendingEvents.cancelSchedule(); |
| - } |
| - if (_isInactive && !wasInputPaused) { |
| - _checkCallbacks(true, false); |
| - if (!_isPaused && _hasPendingEvent) { |
| - _schedulePendingEvents(); |
| - } |
| - } |
| + bool wasInputPaused = _isInputPaused; |
| + // Increment pause count and mark input paused (if it isn't already). |
| + _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
| + if (resumeSignal != null) resumeSignal.whenComplete(resume); |
| + if (!wasPaused && _pending != null) _pending.cancelSchedule(); |
| + if (!wasInputPaused && !_inCallback) _guardCallback(_onPause); |
| } |
| - /** Stops pausing due to one request from the given listener. */ |
| - void _resume(_StreamListener<T> listener, bool fromEvent) { |
| - if (!listener.isPaused) return; |
| - assert(listener._isSubscribed); |
| - assert(_isPaused); |
| - _decrementPauseCount(listener); |
| - if (!_isPaused) { |
| - if (_hasPendingEvent) { |
| - _state |= _STREAM_PENDING_RESUME; |
| - // Controller's pause state hasn't changed. |
| - // If we can fire events now, fire any pending events right away. |
| - if (_isInactive) { |
| - if (fromEvent) { |
| - _handlePendingEvents(); |
| - } else { |
| - _schedulePendingEvents(); |
| - } |
| - } |
| - } else if (_isInactive) { |
| - _checkCallbacks(true, true); |
| - if (!_isPaused && _hasPendingEvent) { |
| - if (fromEvent) { |
| - _handlePendingEvents(); |
| - } else { |
| - _schedulePendingEvents(); |
| - } |
| + void resume() { |
| + if (_cancelled) return; |
| + if (_isPaused) { |
| + _decrementPauseCount(); |
| + if (!_isPaused) { |
| + if (_hasPending && !_pending.isEmpty) { |
| + // Input is still paused. |
| + _pending.schedule(this); |
| + } else { |
| + assert(_mayResumeInput); |
| + _state &= ~_STATE_INPUT_PAUSED; |
| + if (!_inCallback) _guardCallback(_onResume); |
| } |
| } |
| } |
| } |
| - /** Schedule pending events to be executed. */ |
| - void _schedulePendingEvents() { |
| - assert(_hasPendingEvent); |
| - _pendingEvents.schedule(this); |
| + void cancel() { |
| + if (_cancelled) return; |
| + _cancel(); |
| + if (!_inCallback) { |
| + // otherwise checkState will be called after firing or callback completes. |
| + _state |= _STATE_IN_CALLBACK; |
| + try { |
| + _onCancel(); |
| + } catch (e, s) { |
| + print("BAD THROW: \n$s"); |
|
floitsch
2013/05/22 16:26:29
So _onCancel is not allowed to throw? Should this
Lasse Reichstein Nielsen
2013/05/24 06:02:49
_onCancel is an internal method only. It should ne
|
| + } |
| + _state &= ~_STATE_IN_CALLBACK; |
| + } |
| } |
| - /** Create a subscription object. Called by [subcribe]. */ |
| - _StreamSubscriptionImpl<T> _createSubscription( |
| - void onData(T data), |
| - void onError(error), |
| - void onDone(), |
| - bool cancelOnError); |
| + Future asFuture([var futureValue]) { |
| + _FutureImpl<T> result = new _FutureImpl<T>(); |
| - /** |
| - * Adds a listener to this stream. |
| - */ |
| - void _addListener(_StreamSubscriptionImpl subscription); |
| + // Overwrite the onDone and onError handlers. |
| + _onDone = () { result._setValue(futureValue); }; |
| + _onError = (error) { |
| + cancel(); |
| + result._setError(error); |
| + }; |
| - /** |
| - * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
| - * |
| - * This method is called from [_StreamSubscriptionImpl.cancel]. |
| - * |
| - * If an event is currently firing, the cancel is delayed |
| - * until after the subscribers have received the event. |
| - */ |
| - void _cancel(_StreamSubscriptionImpl subscriber); |
| + return result; |
| + } |
| - /** |
| - * Iterate over all current subscribers and perform an action on each. |
| - * |
| - * Subscribers added during the iteration will not be visited. |
| - * Subscribers unsubscribed during the iteration will only be removed |
| - * after they have been acted on. |
| - * |
| - * Any change in the pause state is only reported after all subscribers have |
| - * received the event. |
| - * |
| - * The [action] must not throw, or the controller will be left in an |
| - * invalid state. |
| - * |
| - * This method must not be called while [isFiring] is true. |
| - */ |
| - void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)); |
| + // State management. |
| - /** |
| - * Checks whether the subscription/pause state has changed. |
| - * |
| - * Calls the appropriate callback if the state has changed from the |
| - * provided one. Repeats calling callbacks as long as the call changes |
| - * the state. |
| - */ |
| - void _checkCallbacks(bool hadListener, bool wasPaused) { |
| - assert(!_isFiring); |
| - // Will be handled after the current callback. |
| - if (_isInCallback) return; |
| - if (_hasPendingResume && !_hasPendingEvent) { |
| - _state ^= _STREAM_PENDING_RESUME; |
| - } |
| - _state |= _STREAM_CALLBACK; |
| - while (true) { |
| - bool hasListener = _hasListener; |
| - bool isPaused = _isInputPaused; |
| - if (hadListener != hasListener) { |
| - _onSubscriptionStateChange(); |
| - } else if (isPaused != wasPaused) { |
| - _onPauseStateChange(); |
| - } else { |
| - _state ^= _STREAM_CALLBACK; |
| - return; |
| - } |
| - wasPaused = isPaused; |
| - hadListener = hasListener; |
| + bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
| + bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
| + bool get _cancelled => (_state & _STATE_CANCELLED) != 0; |
|
floitsch
2013/05/22 16:26:29
isCanceled
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
| + bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; |
| + bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; |
| + bool get _isPaused => _state >= _STATE_PAUSE_COUNT; |
| + bool get _canFire => _state < _STATE_IN_CALLBACK; |
| + bool get _mayResumeInput => |
| + !_isPaused && (_pending == null || _pending.isEmpty); |
|
floitsch
2013/05/22 16:26:29
shouldn't _hasPending do the null-check too?
Lasse Reichstein Nielsen
2013/05/24 06:02:49
_hasPending is accessing a cached bit in the state
|
| + bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; |
| + |
| + bool get isPaused => _isPaused; |
| + |
| + void _cancel() { |
| + _state |= _STATE_CANCELLED; |
| + if (_hasPending) { |
| + _pending.clear(); |
| } |
| } |
| /** |
| - * Called when the first subscriber requests a pause or the last a resume. |
| + * Increment the pause count. |
| * |
| - * Read [isPaused] to see the new state. |
| + * Also marks input as paused. |
| */ |
| - void _onPauseStateChange() {} |
| - |
| - /** |
| - * Called when the first listener subscribes or the last unsubscribes. |
| - * |
| - * Read [hasListener] to see what the new state is. |
| - */ |
| - void _onSubscriptionStateChange() {} |
| + void _incrementPauseCount() { |
| + _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
| + } |
| /** |
| - * Add a pending event at the end of the pending event queue. |
| + * Decrements the pause count. |
| * |
| - * Schedules events if currently not paused and inside a callback. |
| + * Does not automatically unpause the input (call [_onResume]) when |
| + * the pause count reaches zero. This is handled elsewhere, and only |
| + * if there are no pending events buffered. |
| */ |
| - void _addPendingEvent(_DelayedEvent event) { |
| - if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents(); |
| - _StreamImplEvents events = _pendingEvents; |
| - events.add(event); |
| - if (_isPaused || _isFiring) return; |
| - if (_isInCallback) { |
| - _schedulePendingEvents(); |
| - return; |
| - } |
| + void _decrementPauseCount() { |
| + assert(_isPaused); |
| + _state -= _STATE_PAUSE_COUNT; |
| } |
| - /** Fire any pending events until the pending event queue is empty. */ |
| - void _handlePendingEvents() { |
| - assert(_isInactive); |
| - if (!_hasPendingEvent) return; |
| - _PendingEvents events = _pendingEvents; |
| - do { |
| - if (_isPaused) return; |
| - if (events.isScheduled) events.cancelSchedule(); |
| - events.handleNext(this); |
| - } while (!events.isEmpty); |
| - } |
| + // _EventSink interface. |
| - /** |
| - * Send a data event directly to each subscriber. |
| - */ |
| - _sendData(T value) { |
| - assert(!_isPaused); |
| - assert(!_isComplete); |
| - if (!_hasListener) return; |
| - _forEachSubscriber((subscriber) { |
| - try { |
| - subscriber._sendData(value); |
| - } catch (e, s) { |
| - _throwDelayed(e, s); |
| - } |
| - }); |
| + void _add(T data) { |
| + assert(!_isClosed); |
| + if (_cancelled) return; |
| + if (_canFire) { |
| + _sendData(data); |
| + } else { |
| + _addPending(new _DelayedData(data)); |
| + } |
| } |
| - /** |
| - * Sends an error event directly to each subscriber. |
| - */ |
| - void _sendError(error) { |
| - assert(!_isPaused); |
| - assert(!_isComplete); |
| - if (!_hasListener) return; |
| - _forEachSubscriber((subscriber) { |
| - try { |
| - subscriber._sendError(error); |
| - } catch (e, s) { |
| - _throwDelayed(e, s); |
| - } |
| - }); |
| + void _addError(Object error) { |
| + if (_cancelled) return; |
| + if (_cancelOnError) { |
| + /// TODO: handle here? |
| + } |
| + if (_canFire) { |
| + _sendError(error); // Reports cancel after sending. |
| + } else { |
| + _addPending(new _DelayedError(error)); |
| + } |
| } |
| - /** |
| - * Sends the "done" message directly to each subscriber. |
| - * This automatically stops further subscription and |
| - * unsubscribes all subscribers. |
| - */ |
| - void _sendDone() { |
| - assert(!_isPaused); |
| - assert(_isClosed); |
| - _setComplete(); |
| - if (!_hasListener) return; |
| - _forEachSubscriber((subscriber) { |
| - _cancel(subscriber); |
| - try { |
| - subscriber._sendDone(); |
| - } catch (e, s) { |
| - _throwDelayed(e, s); |
| - } |
| - }); |
| - assert(!_hasListener); |
| + void _close() { |
| + assert(!_isClosed); |
| + if (_cancelled) return; |
| + _state |= _STATE_CLOSED; |
| + if (_canFire) { |
| + _sendDone(); |
| + } else { |
| + _addPending(const _DelayedDone()); |
| + } |
| } |
| -} |
| - |
| -// ------------------------------------------------------------------- |
| -// Default implementation of a stream with a single subscriber. |
| -// ------------------------------------------------------------------- |
| -/** |
| - * Default implementation of stream capable of sending events to one subscriber. |
| - * |
| - * Any class needing to implement [Stream] can either directly extend this |
| - * class, or extend [Stream] and delegate the subscribe method to an instance |
| - * of this class. |
| - * |
| - * The only public methods are those of [Stream], so instances of |
| - * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing |
| - * internal functionality. |
| - * |
| - * The [StreamController] is a public facing version of this class, with |
| - * some methods made public. |
| - * |
| - * The user interface of [_SingleStreamImpl] are the following methods: |
| - * * [_add]: Add a data event to the stream. |
| - * * [_addError]: Add an error event to the stream. |
| - * * [_close]: Request to close the stream. |
| - * * [_onSubscriberStateChange]: Called when receiving the first subscriber or |
| - * when losing the last subscriber. |
| - * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
| - * * [_hasListener]: Test whether there are currently any subscribers. |
| - * * [_isInputPaused]: Test whether the stream is currently paused. |
| - * The user should not add new events while the stream is paused, but if it |
| - * happens anyway, the stream will enqueue the events just as when new events |
| - * arrive while still firing an old event. |
| - */ |
| -class _SingleStreamImpl<T> extends _StreamImpl<T> { |
| - _StreamListener _subscriber = null; |
| - |
| - /** Whether there is currently a subscriber on this [Stream]. */ |
| - bool get _hasListener => _subscriber != null; |
| - |
| - // ------------------------------------------------------------------- |
| - // Internal implementation. |
| - _SingleStreamImpl() { |
| - // Start out paused. |
| - _updatePauseCount(1); |
| + // Hooks called when the input is paused, unpaused or cancelled. |
| + // These must not throw. If overwritten to call user code, include suitable |
| + // try/catch wrapping and send any errors to [_throwDelayed]. |
| + void _onPause() { |
| + assert(_isInputPaused); |
| } |
| - /** |
| - * Create the new subscription object. |
| - */ |
| - _StreamSubscriptionImpl<T> _createSubscription( |
| - void onData(T data), |
| - void onError(error), |
| - void onDone(), |
| - bool cancelOnError) { |
| - return new _StreamSubscriptionImpl<T>( |
| - this, onData, onError, onDone, cancelOnError); |
| + void _onResume() { |
| + assert(!_isInputPaused); |
| } |
| - void _addListener(_StreamListener subscription) { |
| - assert(!_isComplete); |
| - if (_hasListener) { |
| - throw new StateError("Stream already has subscriber."); |
| - } |
| - assert(_pauseCount == 1); |
| - _updatePauseCount(-1); |
| - _subscriber = subscription; |
| - subscription._setSubscribed(0); |
| - if (_isInactive) { |
| - _checkCallbacks(false, true); |
| - if (!_isPaused && _hasPendingEvent) { |
| - _schedulePendingEvents(); |
| - } |
| - } |
| + void _onCancel() { |
| + assert(_cancelled); |
| } |
| + // Handle pending events. |
| + |
| /** |
| - * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
| + * Add a pending event. |
| * |
| - * This method is called from [_StreamSubscriptionImpl.cancel]. |
| + * If the subscription is not paused, this also schedules a firing |
| + * of pending events later (if necessary). |
| */ |
| - void _cancel(_StreamListener subscriber) { |
| - assert(identical(subscriber._source, this)); |
| - // We allow unsubscribing the currently firing subscription during |
| - // the event firing, because it is indistinguishable from delaying it since |
| - // that event has already received the event. |
| - if (!identical(_subscriber, subscriber)) { |
| - // You may unsubscribe more than once, only the first one counts. |
| - return; |
| - } |
| - _subscriber = null; |
| - // Unsubscribing a paused subscription also cancels its pauses. |
| - int resumeCount = subscriber._setUnsubscribed(); |
| - // Keep being paused while there is no subscriber and the stream is not |
| - // complete. |
| - _updatePauseCount(_isComplete ? -resumeCount : -resumeCount + 1); |
| - if (_isInactive) { |
| - _checkCallbacks(true, resumeCount > 0); |
| - if (!_isPaused && _hasPendingEvent) { |
| - _schedulePendingEvents(); |
| + void _addPending(_DelayedEvent event) { |
| + _StreamImplEvents pending = _pending; |
| + if (_pending == null) pending = _pending = new _StreamImplEvents(); |
| + pending.add(event); |
| + if (!_hasPending) { |
| + _state |= _STATE_HAS_PENDING; |
| + if (!_isPaused) { |
| + _pending.schedule(this); |
| } |
| } |
| } |
| - void _forEachSubscriber( |
| - void action(_StreamListener<T> subscription)) { |
| + /* _EventDispatch interface. */ |
| + |
| + void _sendData(T data) { |
| + assert(!_cancelled); |
| assert(!_isPaused); |
| + assert(!_inCallback); |
| bool wasInputPaused = _isInputPaused; |
| - _StreamListener subscription = _subscriber; |
| - assert(subscription != null); |
| - _startFiring(); |
| - action(subscription); |
| - _endFiring(wasInputPaused); |
| + _state |= _STATE_IN_CALLBACK; |
| + try { |
|
floitsch
2013/05/22 16:26:29
move the `try`s into a separate function?
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Rather not. It would only be used for _send{Data,E
|
| + _onData(data); |
| + } catch (e, s) { |
| + _throwDelayed(e, s); |
| + } |
| + _state &= ~_STATE_IN_CALLBACK; |
| + _checkState(wasInputPaused); |
| } |
| -} |
| -// ------------------------------------------------------------------- |
| -// Default implementation of a stream with subscribers. |
| -// ------------------------------------------------------------------- |
| - |
| -/** |
| - * Default implementation of stream capable of sending events to subscribers. |
| - * |
| - * Any class needing to implement [Stream] can either directly extend this |
| - * class, or extend [Stream] and delegate the subscribe method to an instance |
| - * of this class. |
| - * |
| - * The only public methods are those of [Stream], so instances of |
| - * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing |
| - * internal functionality. |
| - * |
| - * The [StreamController] is a public facing version of this class, with |
| - * some methods made public. |
| - * |
| - * The user interface of [_MultiStreamImpl] are the following methods: |
| - * * [_add]: Add a data event to the stream. |
| - * * [_addError]: Add an error event to the stream. |
| - * * [_close]: Request to close the stream. |
| - * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or |
| - * when losing the last subscriber. |
| - * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
| - * * [_hasListener]: Test whether there are currently any subscribers. |
| - * * [_isPaused]: Test whether the stream is currently paused. |
| - * The user should not add new events while the stream is paused, but if it |
| - * happens anyway, the stream will enqueue the events just as when new events |
| - * arrive while still firing an old event. |
| - */ |
| -class _MultiStreamImpl<T> extends _StreamImpl<T> |
| - implements _InternalLinkList { |
| - // Link list implementation (mixin when possible). |
| - _InternalLink _nextLink; |
| - _InternalLink _previousLink; |
| - |
| - _MultiStreamImpl() { |
| - _nextLink = _previousLink = this; |
| + void _sendError(var error) { |
| + assert(!_cancelled); |
| + assert(!_isPaused); |
| + assert(!_inCallback); |
| + bool wasInputPaused = _isInputPaused; |
| + _state |= _STATE_IN_CALLBACK; |
| + try { |
| + _onError(error); |
| + } catch (e, s) { |
| + _throwDelayed(e, s); |
| + } |
| + _state &= ~_STATE_IN_CALLBACK; |
| + if (_cancelOnError) { |
| + _cancel(); |
| + } |
| + _checkState(wasInputPaused); |
| } |
| - bool get isBroadcast => true; |
| - |
| - Stream<T> asBroadcastStream() => this; |
| - |
| - // ------------------------------------------------------------------ |
| - // Helper functions that can be overridden in subclasses. |
| - |
| - /** Whether there are currently any subscribers on this [Stream]. */ |
| - bool get _hasListener => !_InternalLinkList.isEmpty(this); |
| - |
| - /** |
| - * Create the new subscription object. |
| - */ |
| - _StreamListener<T> _createSubscription( |
| - void onData(T data), |
| - void onError(error), |
| - void onDone(), |
| - bool cancelOnError) { |
| - return new _StreamSubscriptionImpl<T>( |
| - this, onData, onError, onDone, cancelOnError); |
| + void _sendDone() { |
| + assert(!_cancelled); |
| + assert(!_isPaused); |
| + assert(!_inCallback); |
| + _state |= (_STATE_CANCELLED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
| + try { |
| + _onDone(); |
| + } catch (e, s) { |
| + _throwDelayed(e, s); |
| + } |
| + try { |
| + _onCancel(); // No checkState after cancel, it is always the last event. |
| + } catch (e, s) { |
| + print("BAD THROW 5: \n$s"); |
| + } |
| + _state &= ~_STATE_IN_CALLBACK; |
| } |
| - // ------------------------------------------------------------------- |
| - // Internal implementation. |
| - |
| - /** |
| - * Iterate over all current subscribers and perform an action on each. |
| - * |
| - * The set of subscribers cannot be modified during this iteration. |
| - * All attempts to add or unsubscribe subscribers will be delayed until |
| - * after the iteration is complete. |
| - * |
| - * The [action] must not throw, or the controller will be left in an |
| - * invalid state. |
| + /** |
| + * Call a hook function. |
| * |
| - * This method must not be called while [isFiring] is true. |
| + * The call is properly wrapped in code to avoid other callbacks |
| + * during the call, and it checks for state changes after the call |
| + * that should cause further callbacks. |
| */ |
| - void _forEachSubscriber( |
| - void action(_StreamListener<T> subscription)) { |
| - assert(!_isFiring); |
| - if (!_hasListener) return; |
| + void _guardCallback(callback) { |
| + assert(!_inCallback); |
| bool wasInputPaused = _isInputPaused; |
| - _startFiring(); |
| - _InternalLink cursor = this._nextLink; |
| - while (!identical(cursor, this)) { |
| - _StreamListener<T> current = cursor; |
| - if (current._needsEvent(_currentEventIdBit)) { |
| - action(current); |
| - // Marks as having received the event. |
| - current._toggleEventReceived(); |
| - } |
| - cursor = current._nextLink; |
| - if (current._isPendingUnsubscribe) { |
| - _removeListener(current); |
| - } |
| - } |
| - _endFiring(wasInputPaused); |
| + _state |= _STATE_IN_CALLBACK; |
| + try { |
| + callback(); |
| + } catch (e, s) { |
| + print("BAD THROW 2: \n$s"); |
| } |
| - |
| - void _addListener(_StreamListener listener) { |
| - listener._setSubscribed(_currentEventIdBit); |
| - bool hadListener = _hasListener; |
| - _InternalLinkList.add(this, listener); |
| - if (!hadListener && _isInactive) { |
| - _checkCallbacks(false, false); |
| - if (!_isPaused && _hasPendingEvent) { |
| - _schedulePendingEvents(); |
| - } |
| - } |
| + _state &= ~_STATE_IN_CALLBACK; |
| + _checkState(wasInputPaused); |
| } |
| /** |
| - * Handle a cancel requested from a [_StreamListener]. |
| + * Check if the input needs to be informed of state changes. |
| * |
| - * This method is called from [_StreamListener.cancel]. |
| - * |
| - * If an event is currently firing, the cancel is delayed |
| - * until after the subscribers have received the event. |
| + * State changes are pausing, resuming and cancelling. |
| + * After cancelling, no further callbacks will happen. |
|
floitsch
2013/05/22 16:26:29
separate with more new lines.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
| + * The cancel callback is called after a user cancel, or after |
| + * the final done event is sent. |
| */ |
| - void _cancel(_StreamListener listener) { |
| - assert(identical(listener._source, this)); |
| - if (_InternalLink.isUnlinked(listener)) { |
| - // You may unsubscribe more than once, only the first one counts. |
| - return; |
| + void _checkState(bool wasInputPaused) { |
| + assert(!_inCallback); |
| + if (_hasPending && _pending.isEmpty) { |
| + _state &= ~_STATE_HAS_PENDING; |
| + if (_isInputPaused && _mayResumeInput) { |
| + _state &= ~_STATE_INPUT_PAUSED; |
| + } |
| } |
| - if (_isFiring) { |
| - if (listener._needsEvent(_currentEventIdBit)) { |
| - assert(listener._isSubscribed); |
| - listener._setPendingUnsubscribe(_currentEventIdBit); |
| - } else { |
| - // The listener has been notified of the event (or don't need to, |
| - // if it's still pending subscription) so it's safe to remove it. |
| - _removeListener(listener); |
| + while (true) { |
|
floitsch
2013/05/22 16:26:29
Add comment why we need a loop.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Ok.
|
| + if (_cancelled) { |
| + try { |
| + _onCancel(); |
| + } catch (e, s) { |
| + print("BAD THROW 3: \n$s"); |
| + } |
| + return; |
| } |
| - // Pause and subscription state changes are reported when we end |
| - // firing. |
| - } else { |
| - bool wasInputPaused = _isInputPaused; |
| - _removeListener(listener); |
| - if (_isInactive) { |
| - _checkCallbacks(true, wasInputPaused); |
| - if (!_isPaused && _hasPendingEvent) { |
| - _schedulePendingEvents(); |
| - } |
| + bool isInputPaused = _isInputPaused; |
| + if (wasInputPaused == isInputPaused) break; |
| + _state ^= _STATE_IN_CALLBACK; |
| + try { |
| + if (isInputPaused) { |
| + _onPause(); |
| + } else { |
| + _onResume(); |
| } |
| + } catch (e, s) { |
| + print("BAD THROW 4: \n$s"); |
| + } |
| + _state &= ~_STATE_IN_CALLBACK; |
| + wasInputPaused = isInputPaused; |
| + } |
| + if (_hasPending && !_isPaused) { |
| + _pending.schedule(this); |
| } |
| } |
| +} |
| - /** |
| - * Removes a listener from this stream and cancels its pauses. |
| - * |
| - * This is a low-level action that doesn't call [_onSubscriptionStateChange]. |
| - * or [_callOnPauseStateChange]. |
| - */ |
| - void _removeListener(_StreamListener listener) { |
| - int pauseCount = listener._setUnsubscribed(); |
| - _InternalLinkList.remove(listener); |
| - if (pauseCount > 0) { |
| - _updatePauseCount(-pauseCount); |
| - if (!_isPaused && _hasPendingEvent) { |
| - _state |= _STREAM_PENDING_RESUME; |
| - } |
| - } |
| +// ------------------------------------------------------------------- |
| +// Common base class for single and multi-subscription streams. |
| +// ------------------------------------------------------------------- |
| +abstract class _StreamImpl<T> extends Stream<T> { |
| + // ------------------------------------------------------------------ |
| + // Stream interface. |
| + |
| + StreamSubscription<T> listen(void onData(T data), |
| + { void onError(error), |
| + void onDone(), |
| + bool cancelOnError }) { |
| + if (onData == null) onData = _nullDataHandler; |
| + if (onError == null) onError = _nullErrorHandler; |
| + if (onDone == null) onDone = _nullDoneHandler; |
| + cancelOnError = identical(true, cancelOnError); |
| + StreamSubscription subscription = |
| + _createSubscription(onData, onError, onDone, cancelOnError); |
| + _onListen(subscription); |
| + return subscription; |
| + } |
| + |
| + // ------------------------------------------------------------------- |
| + /** Create a subscription object. Called by [subcribe]. */ |
| + _BufferingStreamSubscription<T> _createSubscription( |
| + void onData(T data), |
| + void onError(error), |
| + void onDone(), |
| + bool cancelOnError) { |
| + return new _BufferingStreamSubscription<T>( |
| + onData, onError, onDone, cancelOnError, null); |
| } |
| + |
| + /** Hook called when the subscription has been created. */ |
| + void _onListen(StreamSubscription subscription) {} |
| } |
| +typedef _PendingEvents _EventGenerator(); |
| /** Stream that generates its own events. */ |
| -class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { |
| +class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
| + final _EventGenerator _pending; |
| /** |
| - * Initializes the stream to have only the events provided by [events]. |
| + * Initializes the stream to have only the events provided by a |
| + * [_PendingEvents]. |
| * |
| - * A [_PendingEvents] implementation provides events that are handled |
| - * by calling [_PendingEvents.handleNext] with the [_StreamImpl]. |
| + * A new [_PendingEvents] must be generated for each listen. |
| */ |
| - _GeneratedSingleStreamImpl(_PendingEvents events) { |
| - _pendingEvents = events; |
| - _setClosed(); // Closed for input since all events are already pending. |
| - } |
| - |
| - void _add(T value) { |
| - throw new UnsupportedError("Cannot inject events into generated stream"); |
| - } |
| - |
| - void _addError(value) { |
| - throw new UnsupportedError("Cannot inject events into generated stream"); |
| - } |
| + _GeneratedStreamImpl(this._pending); |
| - void _close() { |
| - throw new UnsupportedError("Cannot inject events into generated stream"); |
| + StreamSubscription _createSubscription(void onData(T data), |
| + void onError(Object error), |
| + void onDone(), |
| + bool cancelOnError) { |
| + return new _BufferingStreamSubscription( |
| + onData, onError, onDone, cancelOnError, _pending()); |
| } |
| } |
| /** Pending events object that gets its events from an [Iterable]. */ |
| class _IterablePendingEvents<T> extends _PendingEvents { |
| + // The stream has been cancelled by an error, but hasn't sent a final |
| + // "Done" event yet. |
| + static const int _STATE_CANCELLED = 1; |
| + // The stream is completely done. |
| + static const int _STATE_CLOSED = 3; |
| + |
| final Iterator<T> _iterator; |
| + int _iterationState = 0; |
| + |
| + _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; |
| + |
| /** |
| * Whether there are no more events to be sent. |
| * |
| * This starts out as [:false:] since there is always at least |
| * a 'done' event to be sent. |
| */ |
| - bool _isDone = false; |
| - |
| - _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; |
| + bool get _isDone => _iterationState == _STATE_CLOSED; |
| + bool get _isCancelled => _iterationState == _STATE_CANCELLED; |
| bool get isEmpty => _isDone; |
| - void handleNext(_StreamImpl<T> stream) { |
| - if (_isDone) throw new StateError("No events pending."); |
| + void handleNext(_EventDispatch dispatch) { |
| + if (_isCancelled) { |
| + _iterationState = _STATE_CLOSED; |
| + dispatch._sendDone(); |
| + return; |
| + } |
| + if (_isDone) { |
| + throw new StateError("No events pending."); |
| + } |
| + bool isDone; |
| try { |
| - _isDone = !_iterator.moveNext(); |
| - if (!_isDone) { |
| - stream._sendData(_iterator.current); |
| - } else { |
| - stream._sendDone(); |
| - } |
| + isDone = !_iterator.moveNext(); |
| } catch (e, s) { |
| - stream._sendError(_asyncError(e, s)); |
| - stream._sendDone(); |
| - _isDone = true; |
| + _iterationState = _STATE_CANCELLED; // Will send a single done after this. |
|
floitsch
2013/05/22 16:26:29
long line.
I don't think it makes sense to send a
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Good point. Some listeners might be left hanging,
|
| + dispatch._sendError(_asyncError(e, s)); |
| + return; |
| + } |
| + if (!isDone) { |
| + dispatch._sendData(_iterator.current); |
| + } else { |
| + _iterationState = _STATE_CLOSED; |
| + dispatch._sendDone(); |
| } |
| } |
| -} |
| - |
| - |
| -/** |
| - * The subscription class that the [StreamController] uses. |
| - * |
| - * The [_StreamImpl.createSubscription] method should |
| - * create an object of this type, or another subclass of [_StreamListener]. |
| - * A subclass of [_StreamImpl] can specify which subclass |
| - * of [_StreamSubscriptionImpl] it uses by overriding |
| - * [_StreamImpl.createSubscription]. |
| - * |
| - * The subscription is in one of three states: |
| - * * Subscribed. |
| - * * Paused-and-subscribed. |
| - * * Unsubscribed. |
| - * Unsubscribing also resumes any pauses started by the subscription. |
| - */ |
| -class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
| - implements StreamSubscription<T> { |
| - final bool _cancelOnError; |
| - // TODO(ahe): Restore type when feature is implemented in dart2js |
| - // checked mode. http://dartbug.com/7733 |
| - var /* _DataHandler<T> */ _onData; |
| - _ErrorHandler _onError; |
| - _DoneHandler _onDone; |
| - _StreamSubscriptionImpl(_StreamImpl source, |
| - this._onData, |
| - this._onError, |
| - this._onDone, |
| - this._cancelOnError) : super(source); |
| - |
| - void onData(void handleData(T event)) { |
| - if (handleData == null) handleData = _nullDataHandler; |
| - _onData = handleData; |
| - } |
| - |
| - void onError(void handleError(error)) { |
| - if (handleError == null) handleError = _nullErrorHandler; |
| - _onError = handleError; |
| - } |
| - |
| - void onDone(void handleDone()) { |
| - if (handleDone == null) handleDone = _nullDoneHandler; |
| - _onDone = handleDone; |
| - } |
| - |
| - void _sendData(T data) { |
| - _onData(data); |
| - } |
| - |
| - void _sendError(error) { |
| - _onError(error); |
| - if (_cancelOnError) _source._cancel(this); |
| - } |
| - |
| - void _sendDone() { |
| - _onDone(); |
| - } |
| - |
| - void cancel() { |
| - if (!_isSubscribed) return; |
| - _source._cancel(this); |
| - } |
| - |
| - void pause([Future resumeSignal]) { |
| - if (!_isSubscribed) return; |
| - _source._pause(this, resumeSignal); |
| - } |
| - |
| - void resume() { |
| - if (!_isSubscribed || !isPaused) return; |
| - _source._resume(this, false); |
| - } |
| - |
| - Future asFuture([var futureValue]) { |
| - _FutureImpl<T> result = new _FutureImpl<T>(); |
| - // Overwrite the onDone and onError handlers. |
| - onDone(() { result._setValue(futureValue); }); |
| - onError((error) { |
| - cancel(); |
| - result._setError(error); |
| - }); |
| - |
| - return result; |
| + void clear() { |
| + if (isScheduled) cancelSchedule(); |
| + _state = _STATE_CLOSED; |
| } |
| } |
| + |
| // Internal helpers. |
| // Types of the different handlers on a stream. Types used to type fields. |
| @@ -998,20 +559,20 @@ void _nullErrorHandler(error) { |
| void _nullDoneHandler() {} |
| -/** A delayed event on a stream implementation. */ |
| +/** A delayed event on a buffering stream subscription. */ |
| abstract class _DelayedEvent { |
| /** Added as a linked list on the [StreamController]. */ |
| _DelayedEvent next; |
| /** Execute the delayed event on the [StreamController]. */ |
| - void perform(_StreamImpl stream); |
| + void perform(_EventDispatch dispatch); |
| } |
| /** A delayed data event. */ |
| class _DelayedData<T> extends _DelayedEvent{ |
| final T value; |
| _DelayedData(this.value); |
| - void perform(_StreamImpl<T> stream) { |
| - stream._sendData(value); |
| + void perform(_EventDispatch<T> dispatch) { |
| + dispatch._sendData(value); |
| } |
| } |
| @@ -1019,16 +580,16 @@ class _DelayedData<T> extends _DelayedEvent{ |
| class _DelayedError extends _DelayedEvent { |
| final error; |
| _DelayedError(this.error); |
| - void perform(_StreamImpl stream) { |
| - stream._sendError(error); |
| + void perform(_EventDispatch dispatch) { |
| + dispatch._sendError(error); |
| } |
| } |
| /** A delayed done event. */ |
| class _DelayedDone implements _DelayedEvent { |
| const _DelayedDone(); |
| - void perform(_StreamImpl stream) { |
| - stream._sendDone(); |
| + void perform(_EventDispatch dispatch) { |
| + dispatch._sendDone(); |
| } |
| _DelayedEvent get next => null; |
| @@ -1118,90 +679,16 @@ abstract class _InternalLinkList extends _InternalLink { |
| } |
| } |
| -/** Abstract type for an internal interface for sending events. */ |
| -abstract class _EventOutputSink<T> { |
| - _sendData(T data); |
| - _sendError(error); |
| - _sendDone(); |
| -} |
| - |
| -abstract class _StreamListener<T> extends _InternalLink |
| - implements _EventOutputSink<T> { |
| - final _StreamImpl _source; |
| - int _state = _LISTENER_UNSUBSCRIBED; |
| - |
| - _StreamListener(this._source); |
| - |
| - bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT); |
| - |
| - bool get _isPendingUnsubscribe => |
| - (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0; |
| - |
| - bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0; |
| - |
| - /** |
| - * Whether the listener still needs to receive the currently firing event. |
| - * |
| - * The currently firing event is identified by a single bit, which alternates |
| - * between events. The [_state] contains the previously sent event's bit in |
| - * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener |
| - * still need the current event. |
| - */ |
| - bool _needsEvent(int currentEventIdBit) { |
| - int lastEventIdBit = |
| - (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT; |
| - return lastEventIdBit != currentEventIdBit; |
| - } |
| - |
| - /// If a subscriber's "firing bit" doesn't match the stream's firing bit, |
| - /// we are currently firing an event and the subscriber still need to receive |
| - /// the event. |
| - void _toggleEventReceived() { |
| - _state ^= _LISTENER_EVENT_ID; |
| - } |
| - |
| - void _setSubscribed(int eventIdBit) { |
| - assert(eventIdBit == 0 || eventIdBit == 1); |
| - _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT); |
| - } |
| - |
| - void _setPendingUnsubscribe(int currentEventIdBit) { |
| - assert(_isSubscribed); |
| - // Sets the pending unsubscribe, and ensures that the listener |
| - // won't get the current event. |
| - _state |= _LISTENER_PENDING_UNSUBSCRIBE | _LISTENER_EVENT_ID; |
| - _state ^= (1 ^ currentEventIdBit) << _LISTENER_EVENT_ID_SHIFT; |
| - assert(!_needsEvent(currentEventIdBit)); |
| - } |
| - |
| - /** |
| - * Marks the listener as unsubscibed. |
| - * |
| - * Returns the number of unresumed pauses for the listener. |
| - */ |
| - int _setUnsubscribed() { |
| - assert(_isSubscribed); |
| - int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT; |
| - _state = _LISTENER_UNSUBSCRIBED; |
| - return timesPaused; |
| - } |
| - |
| - void _incrementPauseCount() { |
| - _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
| - } |
| - |
| - void _decrementPauseCount() { |
| - assert(isPaused); |
| - _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
| - } |
| - |
| - _sendData(T data); |
| - _sendError(error); |
| - _sendDone(); |
| -} |
| - |
| /** Superclass for provider of pending events. */ |
| abstract class _PendingEvents { |
| + // No async event has been scheduled. |
| + static const int _STATE_UNSCHEDULED = 0; |
| + // An async event has been scheduled to run a function. |
| + static const int _STATE_SCHEDULED = 1; |
| + // An async event has been scheduled, but it will do nothing when it runs. |
| + // Async events can't be preempted. |
| + static const int _STATE_CANCELLED = 3; |
| + |
| /** |
| * Timer set when pending events are scheduled for execution. |
|
floitsch
2013/05/22 16:26:29
Update comments.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
| * |
| @@ -1209,27 +696,44 @@ abstract class _PendingEvents { |
| * is stored here. If pending events are executed earlier than that, e.g., |
| * due to a second event in the current cycle, the timer is canceled again. |
| */ |
| - Timer scheduleTimer = null; |
| + int _state = _STATE_UNSCHEDULED; |
| bool get isEmpty; |
| - bool get isScheduled => scheduleTimer != null; |
| + bool get isScheduled => _state == _STATE_SCHEDULED; |
| + bool get _eventScheduled => _state >= _STATE_SCHEDULED; |
| - void schedule(_StreamImpl stream) { |
| + /** |
| + * Schedule an event to run later. |
| + * |
| + * If called more than once, it should be called with the same dispatch as |
| + * argument each time. It may reuse an earlier argument in some cases. |
| + */ |
| + void schedule(_EventDispatch dispatch) { |
| if (isScheduled) return; |
| - scheduleTimer = new Timer(Duration.ZERO, () { |
| - scheduleTimer = null; |
| - stream._handlePendingEvents(); |
| + assert(!isEmpty); |
| + if (_eventScheduled) { |
| + assert(_state == _STATE_CANCELLED); |
| + _state = _STATE_SCHEDULED; |
| + return; |
| + } |
| + runAsync(() { |
| + int oldState = _state; |
| + _state = _STATE_UNSCHEDULED; |
| + if (oldState == _STATE_CANCELLED) return; |
| + handleNext(dispatch); |
| }); |
| + _state = _STATE_SCHEDULED; |
| } |
| void cancelSchedule() { |
| - assert(isScheduled); |
| - scheduleTimer.cancel(); |
| - scheduleTimer = null; |
| + if (isScheduled) _state = _STATE_CANCELLED; |
| } |
| - void handleNext(_StreamImpl stream); |
| + void handleNext(_EventDispatch dispatch); |
| + |
| + /** Throw away any pending events and cancel scheduled events. */ |
| + void clear(); |
| } |
| @@ -1242,8 +746,6 @@ class _StreamImplEvents extends _PendingEvents { |
| bool get isEmpty => lastPendingEvent == null; |
| - bool get isScheduled => scheduleTimer != null; |
| - |
| void add(_DelayedEvent event) { |
| if (lastPendingEvent == null) { |
| firstPendingEvent = lastPendingEvent = event; |
| @@ -1252,122 +754,353 @@ class _StreamImplEvents extends _PendingEvents { |
| } |
| } |
| - void handleNext(_StreamImpl stream) { |
| + void handleNext(_EventDispatch dispatch) { |
| assert(!isScheduled); |
| _DelayedEvent event = firstPendingEvent; |
| firstPendingEvent = event.next; |
| if (firstPendingEvent == null) { |
| lastPendingEvent = null; |
| } |
| - event.perform(stream); |
| + event.perform(dispatch); |
| + } |
| + |
| + void clear() { |
| + if (isScheduled) cancelSchedule(); |
| + firstPendingEvent = lastPendingEvent = null; |
| } |
| } |
| +class _MultiplexerLinkedList { |
| + _MultiplexerLinkedList _next; |
| + _MultiplexerLinkedList _previous; |
| -class _DoneSubscription<T> implements StreamSubscription<T> { |
| - _DoneHandler _handler; |
| - Timer _timer; |
| - int _pauseCount = 0; |
| + void _unlink() { |
| + _previous._next = _next; |
| + _next._previous = _previous; |
| + _next = _previous = this; |
| + } |
| - _DoneSubscription(this._handler) { |
| - _delayDone(); |
| + void _insertBefore(_MultiplexerLinkedList newNext) { |
| + _MultiplexerLinkedList newPrevious = newNext._previous; |
| + newPrevious._next = this; |
| + newNext._previous = _previous; |
| + _previous._next = newNext; |
| + _previous = newPrevious; |
| } |
| +} |
| - void _delayDone() { |
| - assert(_timer == null && _pauseCount == 0); |
| - _timer = new Timer(Duration.ZERO, () { |
| - if (_handler != null) _handler(); |
| - }); |
| +// TODO(lrn): Change "implements" to "with" when automatic mixin constructors |
| +// are implemented. |
| +class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T> |
|
floitsch
2013/05/22 16:26:29
Add comment what this class does.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
| + implements _MultiplexerLinkedList { |
| + static const int _STATE_NOT_LISTENING = 0; |
| + // Bit that alternates between event firings. If bit matches the one currently |
| + // firing, the subscription will not be notified. |
| + static const int _STATE_EVENT_ID_BIT = 1; |
| + static const int _STATE_LISTENING = 2; |
| + static const int _STATE_IS_FIRING = 4; |
| + static const int _STATE_REMOVE_AFTER_FIRING = 8; |
| + |
| + // Firing state. |
| + int _multiplexState; |
| + |
| + _SingleStreamMultiplexer _source; |
| + |
| + _MultiplexerSubscription(this._source, |
| + void onData(T data), |
| + void onError(Object error), |
| + void onDone(), |
| + bool cancelOnError, |
| + int nextEventId) |
| + : _multiplexState = _STATE_LISTENING | nextEventId, |
| + super(onData, onError, onDone, cancelOnError, null) { |
| + _next = _previous = this; |
| + } |
| + |
| + // Mixin workaround. |
| + _MultiplexerLinkedList _next; |
| + _MultiplexerLinkedList _previous; |
| + |
| + void _unlink() { |
| + _previous._next = _next; |
| + _next._previous = _previous; |
| + _next = _previous = this; |
| + } |
| + |
| + void _insertBefore(_MultiplexerLinkedList newNext) { |
| + _MultiplexerLinkedList newPrevious = newNext._previous; |
| + newPrevious._next = this; |
| + newNext._previous = _previous; |
| + _previous._next = newNext; |
| + _previous = newPrevious; |
| + } |
| + // End mixin. |
| + |
| + bool get _isListening => _multiplexState >= _STATE_LISTENING; |
| + bool get _isFiring => _multiplexState >= _STATE_IS_FIRING; |
| + bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING; |
| + int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT; |
| + |
| + void _setRemoveAfterFiring() { |
| + assert(_isFiring); |
| + _multiplexState |= _STATE_REMOVE_AFTER_FIRING; |
| + } |
| + |
| + void _startFiring() { |
| + assert(!_isFiring); |
| + _multiplexState |= _STATE_IS_FIRING; |
| } |
| - bool get _isComplete => _timer == null && _pauseCount == 0; |
| + /// Marks listener as no longer firing, and toggles its event id. |
| + void _endFiring() { |
| + assert(_isFiring); |
| + _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT); |
| + } |
| - void onData(void handleAction(T value)) {} |
| + void _setNotListening() { |
| + assert(_isListening); |
| + _multiplexState = _STATE_NOT_LISTENING; |
| + } |
| - void onError(void handleError(error)) {} |
| + void _onCancel() { |
| + assert(_isListening); |
| + _source._removeListener(this); |
| + } |
| +} |
| - void onDone(void handleDone()) { |
| - _handler = handleDone; |
| +// TODO(lrn): change "implements" to "with" when the VM supports it. |
| +class _SingleStreamMultiplexer<T> extends Stream<T> |
|
floitsch
2013/05/22 16:26:29
Add comment.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
| + implements _MultiplexerLinkedList, |
| + _EventDispatch<T> { |
| + final Stream<T> _source; |
| + StreamSubscription<T> _subscription; |
| + // Alternates between zero and one for each event fired. |
| + // Listeners are initialized with the next event's id, and will |
| + // only be notified if they match the event being fired. |
| + // That way listeners added during event firing will not receive |
| + // the current event. |
| + int _eventId = 0; |
| + |
| + bool _isFiring = false; |
| + |
| + // Remember events added while firing. |
| + _StreamImplEvents _pending; |
| + |
| + _SingleStreamMultiplexer(this._source) { |
| + _next = _previous = this; |
| + } |
| + |
| + bool get _hasPending => _pending != null && !_pending.isEmpty; |
| + |
| + // Should be mixin. |
| + _MultiplexerLinkedList _next; |
| + _MultiplexerLinkedList _previous; |
| + |
| + void _unlink() { |
| + _previous._next = _next; |
| + _next._previous = _previous; |
| + _next = _previous = this; |
| + } |
| + |
| + void _insertBefore(_MultiplexerLinkedList newNext) { |
| + _MultiplexerLinkedList newPrevious = newNext._previous; |
| + newPrevious._next = this; |
| + newNext._previous = _previous; |
| + _previous._next = newNext; |
| + _previous = newPrevious; |
| } |
| + // End of mixin. |
| - void pause([Future signal]) { |
| - if (_isComplete) return; |
| - if (_timer != null) { |
| - _timer.cancel(); |
| - _timer = null; |
| + StreamSubscription<T> listen(void onData(T data), |
| + { void onError(Object error), |
| + void onDone(), |
| + bool cancelOnError }) { |
| + if (onData == null) onData = _nullDataHandler; |
| + if (onError == null) onError = _nullErrorHandler; |
| + if (onDone == null) onDone = _nullDoneHandler; |
| + cancelOnError = identical(true, cancelOnError); |
| + _MultiplexerSubscription subscription = |
| + new _MultiplexerSubscription(this, onData, onError, onDone, |
| + cancelOnError, _eventId); |
| + if (_subscription == null) { |
| + _subscription = _source.listen(_add, onError: _addError, onDone: _close); |
| } |
| - _pauseCount++; |
| - if (signal != null) signal.whenComplete(resume); |
| + subscription._insertBefore(this); |
| + return subscription; |
| } |
| - void resume() { |
| - if (_isComplete) return; |
| - if (_pauseCount == 0) return; |
| - _pauseCount--; |
| - if (_pauseCount == 0) { |
| - _delayDone(); |
| + /** Called by [_MultiplexerSubscription.remove]. */ |
| + void _removeListener(_MultiplexerSubscription listener) { |
| + if (listener._isFiring) { |
| + listener._setRemoveAfterFiring(); |
| + } else { |
| + _unlinkListener(listener); |
| } |
| } |
| - bool get isPaused => _pauseCount > 0; |
| + /** Remove a listener and close the subscription after the last one. */ |
| + void _unlinkListener(_MultiplexerSubscription listener) { |
| + listener._setNotListening(); |
| + listener._unlink(); |
| + if (identical(_next, this)) { |
| + // Last listener removed. |
| + _cancel(); |
| + } |
| + } |
| - void cancel() { |
| - if (_isComplete) return; |
| - if (_timer != null) { |
| - _timer.cancel(); |
| - _timer = null; |
| + void _cancel() { |
| + StreamSubscription subscription = _subscription; |
| + _subscription = null; |
| + subscription.cancel(); |
| + if (_pending != null) _pending.cancelSchedule(); |
| + } |
| + |
| + void _forEachListener(void action(_MultiplexerSubscription listener)) { |
| + int eventId = _eventId; |
| + _eventId ^= 1; |
| + _isFiring = true; |
| + _MultiplexerLinkedList entry = _next; |
| + // Call each listener in order. A listener can be removed during any |
| + // other listener's event. During its own event it will only be marked |
| + // as "to be removed", and it will be handled after the event is done. |
| + while (!identical(entry, this)) { |
| + _MultiplexerSubscription listener = entry; |
| + if (listener._eventId == eventId) { |
| + listener._startFiring(); |
| + action(listener); |
| + listener._endFiring(); // Also toggles the event id. |
| + } |
| + entry = listener._next; |
| + if (listener._removeAfterFiring) { |
| + _unlinkListener(listener); |
| + } |
| } |
| - _pauseCount = 0; |
| + _isFiring = false; |
| } |
| - Future asFuture([var futureValue]) { |
| - // TODO(floitsch): share more code. |
| - _FutureImpl<T> result = new _FutureImpl<T>(); |
| + void _add(T data) { |
|
floitsch
2013/05/22 16:26:29
Why is this not handled by the bufferingStreamSubs
Lasse Reichstein Nielsen
2013/05/24 06:02:49
This isn't a StreamSubscription - it's a Stream, m
|
| + if (_isFiring || _hasPending) { |
| + _StreamImplEvents pending = _pending; |
| + if (pending == null) pending = _pending = new _StreamImplEvents(); |
| + pending.add(new _DelayedData(data)); |
| + } else { |
| + _sendData(data); |
| + } |
| + } |
| - // Overwrite the onDone and onError handlers. |
| - onDone(() { result._setValue(futureValue); }); |
| - onError((error) { |
| - cancel(); |
| - result._setError(error); |
| + void _addError(Object error) { |
| + if (_isFiring || _hasPending) { |
|
floitsch
2013/05/22 16:26:29
ditto.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
ditto too.
|
| + _StreamImplEvents pending = _pending; |
| + if (pending == null) pending = _pending = new _StreamImplEvents(); |
| + pending.add(new _DelayedError(error)); |
| + } else { |
| + _sendError(error); |
| + } |
| + } |
| + |
| + void _close() { |
| + if (_isFiring || _hasPending) { |
|
floitsch
2013/05/22 16:26:29
ditto.
|
| + _StreamImplEvents pending = _pending; |
| + if (pending == null) pending = _pending = new _StreamImplEvents(); |
| + pending.add(const _DelayedDone()); |
| + } else { |
| + _sendDone(); |
| + } |
| + } |
| + |
| + void _sendData(T data) { |
| + _forEachListener((_MultiplexerSubscription listener) { |
| + listener._add(data); |
| }); |
| + if (_hasPending) { |
| + _pending.schedule(this); |
| + } |
| + } |
| - return result; |
| + void _sendError(Object error) { |
| + _forEachListener((_MultiplexerSubscription listener) { |
| + listener._addError(error); |
| + }); |
| + if (_hasPending) { |
| + _pending.schedule(this); |
| + } |
| } |
| -} |
| -class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { |
| - final Stream<T> _source; |
| - StreamSubscription<T> _subscription; |
| + void _sendDone() { |
| + _forEachListener((_MultiplexerSubscription listener) { |
| + listener._setRemoveAfterFiring(); |
| + listener._close(); |
| + }); |
| + } |
| +} |
| - _SingleStreamMultiplexer(this._source); |
| - void _callOnPauseStateChange() { |
| - if (_isPaused) { |
| - if (_subscription != null) { |
| - _subscription.pause(); |
| - } |
| - } else { |
| - if (_subscription != null) { |
| - _subscription.resume(); |
| - } |
| +/** |
| + * Simple implementation of [StreamIterator]. |
| + */ |
| +class _StreamIteratorImpl<T> implements StreamIterator<T> { |
| + // TODO(lrn): Keep a one (or two) element buffer to avoid pausing when not |
|
floitsch
2013/05/22 16:26:29
I guess the ideal solution would be if the constru
Lasse Reichstein Nielsen
2013/05/24 06:02:49
I don't want this to be about buffering, it should
|
| + // necessary. |
| + StreamSubscription _subscription; |
| + _FutureImpl<bool> _hasNext; |
| + T _current; |
| + |
| + _StreamIteratorImpl(final Stream<T> stream) { |
| + _subscription = stream.listen(_onData, |
| + onError: _onError, |
| + onDone: _onDone, |
| + cancelOnError: true); |
| + _subscription.pause(); |
| + } |
| + |
| + Future<bool> moveNext() { |
| + if (_hasNext != null) throw new StateError("Already waiting for next."); |
| + if (_subscription == null) { |
| + return new _FutureImpl<bool>.immediate(false); |
| } |
| + _current = null; |
| + _subscription.resume(); |
| + _hasNext = new _FutureImpl<bool>(); |
| + return _hasNext; |
| } |
| - /** |
| - * Subscribe or unsubscribe on [_source] depending on whether |
| - * [_stream] has subscribers. |
| - */ |
| - void _onSubscriptionStateChange() { |
| - if (_hasListener) { |
| - assert(_subscription == null); |
| - _subscription = _source.listen(this._add, |
| - onError: this._addError, |
| - onDone: this._close); |
| - } else { |
| - // TODO(lrn): Check why this can happen. |
| - if (_subscription == null) return; |
| - _subscription.cancel(); |
| - _subscription = null; |
| + T get current => _current; |
| + |
| + void cancel() { |
| + StreamSubscription subscription = _subscription; |
| + _subscription = null; |
| + _current = null; |
| + subscription.cancel(); |
| + if (_hasNext != null) { |
| + _FutureImpl<bool> hasNext = _hasNext; |
| + _hasNext = null; |
| + hasNext._setValue(false); |
| } |
| } |
| + |
| + void _onData(T data) { |
| + assert(_hasNext != null); |
| + _FutureImpl<bool> hasNext = _hasNext; |
| + _hasNext = null; |
| + _current = data; |
| + _subscription.pause(); |
| + hasNext._setValue(true); |
| + } |
| + |
| + void _onError(Object error) { |
| + assert(_hasNext != null); |
| + _FutureImpl<bool> hasNext = _hasNext; |
| + _hasNext = null; |
| + _subscription = null; |
| + hasNext._setError(error); |
| + // We have cancelOnError: true, so the subscription is cancelled. |
|
floitsch
2013/05/22 16:26:29
canceled.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
| + } |
| + |
| + void _onDone() { |
| + assert(_hasNext != null); |
| + _FutureImpl<bool> hasNext = _hasNext; |
| + _hasNext = null; |
| + _subscription = null; |
| + hasNext._setValue(false); |
| + } |
| } |