Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index 8fa1848d14ac0261b2943dfc41c4ba314d457162..9f39368e0a73a3961bf81578f20a730d22c85d3e 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -4,57 +4,6 @@ |
part of dart.async; |
-// States shared by single/multi stream implementations. |
- |
-// Completion state of the stream. |
-/// Initial and default state where the stream can receive and send events. |
-const int _STREAM_OPEN = 0; |
-/// The stream has received a request to complete, but hasn't done so yet. |
-/// No further events can be added to the stream. |
-const int _STREAM_CLOSED = 1; |
-/// The stream has completed and will no longer receive or send events. |
-/// Also counts as closed. The stream must not be paused when it's completed. |
-/// Always used in conjunction with [_STREAM_CLOSED]. |
-const int _STREAM_COMPLETE = 2; |
- |
-/// Bit that alternates between events, and listeners are updated to the |
-/// current value when they are notified of the event. |
-const int _STREAM_EVENT_ID = 4; |
-const int _STREAM_EVENT_ID_SHIFT = 2; |
- |
-// The activity state of the stream: What is it currently doing. |
-/// Bit set while firing and clear while not. |
-const int _STREAM_FIRING = 8; |
-/// Bit set while calling a pause-state or subscription-state change callback. |
-const int _STREAM_CALLBACK = 16; |
- |
-// The pause state of the stream. |
-/// Bit set when resuming with pending events. Cleared after all pending events |
-/// have been transmitted. Means that the controller still considers the |
-/// stream paused, even if the listener doesn't. |
-const int _STREAM_PENDING_RESUME = 32; |
-/// The count of times a stream has paused is stored in the |
-/// state, shifted by this amount. |
-const int _STREAM_PAUSE_COUNT_SHIFT = 6; |
- |
-// States for listeners. |
- |
-/// The listener is currently not subscribed to its source stream. |
-const int _LISTENER_UNSUBSCRIBED = 0; |
-/// The listener is actively subscribed to its source stream. |
-const int _LISTENER_SUBSCRIBED = 1; |
-/// The listener is subscribed until it has been notified of the current event. |
-/// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED]. |
-const int _LISTENER_PENDING_UNSUBSCRIBE = 2; |
- |
-/// Bit that contains the last sent event's "id bit". |
-const int _LISTENER_EVENT_ID = 4; |
-const int _LISTENER_EVENT_ID_SHIFT = 2; |
- |
-/// The count of times a listener has paused is stored in the |
-/// state, shifted by this amount. |
-const int _LISTENER_PAUSE_COUNT_SHIFT = 3; |
- |
/** Throws the given error in the next cycle. */ |
_throwDelayed(var error, [Object stackTrace]) { |
// We are going to reach the top-level here, but there might be a global |
@@ -69,915 +18,527 @@ _throwDelayed(var error, [Object stackTrace]) { |
}); |
} |
+/** Abstract and private interface for a place to put events. */ |
+abstract class _EventSink<T> { |
+ void _add(T data); |
+ void _addError(Object error); |
+ void _close(); |
+} |
-// ------------------------------------------------------------------- |
-// Common base class for single and multi-subscription streams. |
-// ------------------------------------------------------------------- |
-abstract class _StreamImpl<T> extends Stream<T> { |
- /** Current state of the stream. */ |
- int _state = _STREAM_OPEN; |
- |
- /** |
- * List of pending events. |
- * |
- * If events are added to the stream (using [_add], [_addError] or [_done]) |
- * while the stream is paused, or while another event is firing, events will |
- * stored here. |
- * Also supports scheduling the events for later execution. |
- */ |
- _PendingEvents _pendingEvents; |
- |
- // ------------------------------------------------------------------ |
- // Stream interface. |
- |
- StreamSubscription<T> listen(void onData(T data), |
- { void onError(error), |
- void onDone(), |
- bool cancelOnError }) { |
- if (_isComplete) { |
- return new _DoneSubscription(onDone); |
- } |
- if (onData == null) onData = _nullDataHandler; |
- if (onError == null) onError = _nullErrorHandler; |
- if (onDone == null) onDone = _nullDoneHandler; |
- cancelOnError = identical(true, cancelOnError); |
- _StreamSubscriptionImpl subscription = |
- _createSubscription(onData, onError, onDone, cancelOnError); |
- _addListener(subscription); |
- return subscription; |
- } |
- |
- // ------------------------------------------------------------------ |
- // EventSink interface-like methods for sending events into the stream. |
- // It's the responsibility of the caller to ensure that the stream is not |
- // paused when adding events. If the stream is paused, the events will be |
- // queued, but it's better to not send events at all. |
- |
- /** |
- * Send or queue a data event. |
- */ |
- void _add(T value) { |
- if (_isClosed) throw new StateError("Sending on closed stream"); |
- if (!_mayFireState) { |
- // Not the time to send events. |
- _addPendingEvent(new _DelayedData<T>(value)); |
- return; |
- } |
- if (_hasPendingEvent) { |
- _addPendingEvent(new _DelayedData<T>(value)); |
- } else { |
- _sendData(value); |
- } |
- _handlePendingEvents(); |
- } |
+/** |
+ * Abstract and private interface for a place to send events. |
+ * |
+ * Used by event buffering to finally dispatch the pending event, where |
+ * [_EventSink] is where the event first enters the stream subscription, |
+ * and may yet be buffered. |
+ */ |
+abstract class _EventDispatch<T> { |
+ void _sendData(T data); |
+ void _sendError(Object error); |
+ void _sendDone(); |
+} |
+/** |
+ * Default implementation of stream subscription of buffering events. |
+ * |
+ * The only public methods are those of [StreamSubscription], so instances of |
+ * [_BufferingStreamSubscription] can be returned directly as a |
+ * [StreamSubscription] without exposing internal functionality. |
+ * |
+* The [StreamController] is a public facing version of [Stream] and this class, |
+ * with some methods made public. |
+ * |
+ * The user interface of [_BufferingStreamSubscription] are the following |
+ * methods: |
+ * * [_add]: Add a data event to the stream. |
+ * * [_addError]: Add an error event to the stream. |
+ * * [_close]: Request to close the stream. |
+ * * [_onCancel]: Called when the subscription stops listening. |
+ * * [_onPause]: Called when the subscription wants the event source to pause. |
+ * * [_onResume]: Called when allowing new events after a pause. |
+ * The user should not add new eventswhen the subscription requests a paused, |
floitsch
2013/05/22 16:26:29
space missing.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
+ * but if it happens anyway, the subscription will enqueue the events just as |
+ * when new events arrive while still firing an old event. |
+ */ |
+class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
+ _EventSink<T>, |
+ _EventDispatch<T> { |
+ /** The `cancelOnError` flag from the `listen` call. */ |
+ static const int _STATE_CANCEL_ON_ERROR = 1; |
/** |
- * Send or enqueue an error event. |
- * |
- * If a subscription has requested to be unsubscribed on errors, |
- * it will be unsubscribed after receiving this event. |
+ * Whether the "done" event has been received. |
+ * No further events are accepted after this. |
*/ |
- void _addError(error) { |
- if (_isClosed) throw new StateError("Sending on closed stream"); |
- if (!_mayFireState) { |
- // Not the time to send events. |
- _addPendingEvent(new _DelayedError(error)); |
- return; |
- } |
- if (_hasPendingEvent) { |
- _addPendingEvent(new _DelayedError(error)); |
- } else { |
- _sendError(error); |
- } |
- _handlePendingEvents(); |
- } |
- |
+ static const int _STATE_CLOSED = 2; |
/** |
- * Send or enqueue a "done" message. |
+ * Set if the input has been asked not to send events. |
* |
- * The "done" message should be sent at most once by a stream, and it |
- * should be the last message sent. |
+ * This is not the same as being paused, since the input will remain paused |
+ * after a call to [resume] if there are pending events. |
*/ |
- void _close() { |
- if (_isClosed) return; |
- _state |= _STREAM_CLOSED; |
- if (!_mayFireState) { |
- // Not the time to send events. |
- _addPendingEvent(const _DelayedDone()); |
- return; |
- } |
- if (_hasPendingEvent) { |
- _addPendingEvent(new _DelayedDone()); |
- _handlePendingEvents(); |
- } else { |
- _sendDone(); |
- assert(_isComplete); |
- assert(!_hasPendingEvent); |
- } |
- } |
- |
- // ------------------------------------------------------------------- |
- // Internal implementation. |
- |
- // State predicates. |
- |
- // Lifecycle state. |
- /** Whether the stream is in the default, open, state for events. */ |
- bool get _isOpen => (_state & (_STREAM_CLOSED | _STREAM_COMPLETE)) == 0; |
- |
- /** Whether the stream has been closed (a done event requested). */ |
- bool get _isClosed => (_state & _STREAM_CLOSED) != 0; |
- |
- /** Whether the stream is completed. */ |
- bool get _isComplete => (_state & _STREAM_COMPLETE) != 0; |
- |
- // Pause state. |
- |
- /** Whether one or more active subscribers have requested a pause. */ |
- bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT); |
- |
- /** How many times the stream has been paused. */ |
- int get _pauseCount => _state >> _STREAM_PAUSE_COUNT_SHIFT; |
- |
+ static const int _STATE_INPUT_PAUSED = 4; |
/** |
- * Whether a controller thinks the stream is paused. |
- * |
- * When this changes, a pause-state change callback is performed. |
+ * Whether the subscription has been cancelled. |
* |
- * It may differ from [_isPaused] if there are pending events |
- * in the queue when the listeners resume. The controller won't |
- * be informed until all queued events have been fired. |
+ * Set by calling [cancel], or by handling a "done" event, or an "error" event |
+ * when `cancelOnError` is true. |
*/ |
- bool get _isInputPaused => _state >= (_STREAM_PENDING_RESUME); |
- |
- /** Whether we have a pending resume scheduled. */ |
- bool get _hasPendingResume => (_state & _STREAM_PENDING_RESUME) != 0; |
- |
- |
- // Action state. If the stream makes a call-out to external code, |
- // this state tracks it and avoids reentrancy problems. |
- |
- /** Whether the stream is not currently firing or calling a callback. */ |
- bool get _isInactive => (_state & (_STREAM_CALLBACK | _STREAM_FIRING)) == 0; |
- |
- /** Whether we are currently executing a state-chance callback. */ |
- bool get _isInCallback => (_state & _STREAM_CALLBACK) != 0; |
- |
- /** Whether we are currently firing an event. */ |
- bool get _isFiring => (_state & _STREAM_FIRING) != 0; |
+ static const int _STATE_CANCELLED = 8; |
floitsch
2013/05/22 16:26:29
_STATE_CANCELED
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Bowdlerizing constants! :(
|
+ static const int _STATE_IN_CALLBACK = 16; |
+ static const int _STATE_HAS_PENDING = 32; |
+ static const int _STATE_PAUSE_COUNT = 64; |
+ static const int _STATE_PAUSE_COUNT_SHIFT = 6; |
+ |
+ /** Event handlers provided in constructor. */ |
+ _DataHandler<T> _onData; |
+ _ErrorHandler _onError; |
+ _DoneHandler _onDone; |
- /** Check whether the pending event queue is non-empty */ |
- bool get _hasPendingEvent => |
- _pendingEvents != null && !_pendingEvents.isEmpty; |
+ /** Bit vector based on state-constants above. */ |
+ int _state; |
/** |
- * The bit representing the current or last event fired. |
+ * Queue of pending events. |
* |
- * This bit matches a bit on listeners that have received the corresponding |
- * event. It is toggled for each new event being fired. |
+ * Is created when necessary, or set in constructor for preconfigured events. |
*/ |
- int get _currentEventIdBit => |
- (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT; |
- |
- /** Whether there is currently a subscriber on this [Stream]. */ |
- bool get _hasListener; |
- |
- |
- /** Whether the state bits allow firing. */ |
- bool get _mayFireState { |
- // The state allows firing unless: |
- // - it's currently firing |
- // - it's currently in a callback |
- // - it's paused |
- const int mask = |
- _STREAM_FIRING | |
- _STREAM_CALLBACK | |
- ~((1 << _STREAM_PAUSE_COUNT_SHIFT) - 1); |
- return (_state & mask) == 0; |
- } |
- |
- // State modification. |
- |
- /** Record an increases in the number of times the listener has paused. */ |
- void _incrementPauseCount(_StreamListener<T> listener) { |
- listener._incrementPauseCount(); |
- _state &= ~_STREAM_PENDING_RESUME; |
- _updatePauseCount(1); |
- } |
- |
- /** Record a decrease in the number of times the listener has paused. */ |
- void _decrementPauseCount(_StreamListener<T> listener) { |
- assert(_isPaused); |
- listener._decrementPauseCount(); |
- _updatePauseCount(-1); |
- } |
- |
- /** Update the stream's own pause count only. */ |
- void _updatePauseCount(int by) { |
- int oldState = _state; |
- // We can't just _state += by << _STREAM_PAUSE_COUNT_SHIFT, since dart2js |
- // converts the result of the left-shift to a positive number. |
- if (by >= 0) { |
- _state = oldState + (by << _STREAM_PAUSE_COUNT_SHIFT); |
- } else { |
- _state = oldState - ((-by) << _STREAM_PAUSE_COUNT_SHIFT); |
+ _PendingEvents _pending; |
+ |
+ _BufferingStreamSubscription(this._onData, |
+ this._onError, |
+ this._onDone, |
+ bool cancelOnError, |
+ this._pending) |
+ : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
+ assert(_onData != null); |
+ assert(_onError != null); |
+ assert(_onDone != null); |
+ if (_pending != null && !_pending.isEmpty) { |
+ _state |= _STATE_HAS_PENDING; |
+ _pending.schedule(this); |
} |
- assert(_state >= 0); |
- assert((_state >> _STREAM_PAUSE_COUNT_SHIFT) == |
- (oldState >> _STREAM_PAUSE_COUNT_SHIFT) + by); |
} |
- void _setClosed() { |
- assert(!_isClosed); |
- _state |= _STREAM_CLOSED; |
- } |
+ // StreamSubscription interface. |
- void _setComplete() { |
- assert(_isClosed); |
- _state = _state |_STREAM_COMPLETE; |
+ void onData(void handleData(T event)) { |
+ if (handleData == null) handleData = _nullDataHandler; |
+ _onData = handleData; |
} |
- void _startFiring() { |
- assert(!_isFiring); |
- assert(!_isInCallback); |
- assert(_hasListener); |
- assert(!_isPaused); |
- // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID |
- // bit. All current subscribers will now have a _LISTENER_EVENT_ID |
- // that doesn't match _STREAM_EVENT_ID, and they will receive the |
- // event being fired. |
- _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; |
+ void onError(void handleError(error)) { |
+ if (handleError == null) handleError = _nullErrorHandler; |
+ _onError = handleError; |
} |
- void _endFiring(bool wasInputPaused) { |
- assert(_isFiring); |
- _state ^= _STREAM_FIRING; |
- // Had listeners, or we wouldn't have fired. |
- _checkCallbacks(true, wasInputPaused); |
+ void onDone(void handleDone()) { |
+ if (handleDone == null) handleDone = _nullDoneHandler; |
+ _onDone = handleDone; |
} |
- /** |
- * Record that a listener wants a pause from events. |
- * |
- * This methods is called from [_StreamListener.pause()]. |
- * Subclasses can override this method, along with [isPaused] and |
- * [createSubscription], if they want to do a different handling of paused |
- * subscriptions, e.g., a filtering stream pausing its own source if all its |
- * subscribers are paused. |
- */ |
- void _pause(_StreamListener<T> listener, Future resumeSignal) { |
- assert(identical(listener._source, this)); |
- if (!listener._isSubscribed) { |
- throw new StateError("Subscription has been canceled."); |
- } |
- assert(!_isComplete); // There can be no subscribers when complete. |
- bool wasInputPaused = _isInputPaused; |
+ void pause([Future resumeSignal]) { |
+ if (_cancelled) return; |
bool wasPaused = _isPaused; |
- _incrementPauseCount(listener); |
- if (resumeSignal != null) { |
- resumeSignal.whenComplete(() { this._resume(listener, true); }); |
- } |
- if (!wasPaused && _hasPendingEvent && _pendingEvents.isScheduled) { |
- _pendingEvents.cancelSchedule(); |
- } |
- if (_isInactive && !wasInputPaused) { |
- _checkCallbacks(true, false); |
- if (!_isPaused && _hasPendingEvent) { |
- _schedulePendingEvents(); |
- } |
- } |
+ bool wasInputPaused = _isInputPaused; |
+ // Increment pause count and mark input paused (if it isn't already). |
+ _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
+ if (resumeSignal != null) resumeSignal.whenComplete(resume); |
+ if (!wasPaused && _pending != null) _pending.cancelSchedule(); |
+ if (!wasInputPaused && !_inCallback) _guardCallback(_onPause); |
} |
- /** Stops pausing due to one request from the given listener. */ |
- void _resume(_StreamListener<T> listener, bool fromEvent) { |
- if (!listener.isPaused) return; |
- assert(listener._isSubscribed); |
- assert(_isPaused); |
- _decrementPauseCount(listener); |
- if (!_isPaused) { |
- if (_hasPendingEvent) { |
- _state |= _STREAM_PENDING_RESUME; |
- // Controller's pause state hasn't changed. |
- // If we can fire events now, fire any pending events right away. |
- if (_isInactive) { |
- if (fromEvent) { |
- _handlePendingEvents(); |
- } else { |
- _schedulePendingEvents(); |
- } |
- } |
- } else if (_isInactive) { |
- _checkCallbacks(true, true); |
- if (!_isPaused && _hasPendingEvent) { |
- if (fromEvent) { |
- _handlePendingEvents(); |
- } else { |
- _schedulePendingEvents(); |
- } |
+ void resume() { |
+ if (_cancelled) return; |
+ if (_isPaused) { |
+ _decrementPauseCount(); |
+ if (!_isPaused) { |
+ if (_hasPending && !_pending.isEmpty) { |
+ // Input is still paused. |
+ _pending.schedule(this); |
+ } else { |
+ assert(_mayResumeInput); |
+ _state &= ~_STATE_INPUT_PAUSED; |
+ if (!_inCallback) _guardCallback(_onResume); |
} |
} |
} |
} |
- /** Schedule pending events to be executed. */ |
- void _schedulePendingEvents() { |
- assert(_hasPendingEvent); |
- _pendingEvents.schedule(this); |
+ void cancel() { |
+ if (_cancelled) return; |
+ _cancel(); |
+ if (!_inCallback) { |
+ // otherwise checkState will be called after firing or callback completes. |
+ _state |= _STATE_IN_CALLBACK; |
+ try { |
+ _onCancel(); |
+ } catch (e, s) { |
+ print("BAD THROW: \n$s"); |
floitsch
2013/05/22 16:26:29
So _onCancel is not allowed to throw? Should this
Lasse Reichstein Nielsen
2013/05/24 06:02:49
_onCancel is an internal method only. It should ne
|
+ } |
+ _state &= ~_STATE_IN_CALLBACK; |
+ } |
} |
- /** Create a subscription object. Called by [subcribe]. */ |
- _StreamSubscriptionImpl<T> _createSubscription( |
- void onData(T data), |
- void onError(error), |
- void onDone(), |
- bool cancelOnError); |
+ Future asFuture([var futureValue]) { |
+ _FutureImpl<T> result = new _FutureImpl<T>(); |
- /** |
- * Adds a listener to this stream. |
- */ |
- void _addListener(_StreamSubscriptionImpl subscription); |
+ // Overwrite the onDone and onError handlers. |
+ _onDone = () { result._setValue(futureValue); }; |
+ _onError = (error) { |
+ cancel(); |
+ result._setError(error); |
+ }; |
- /** |
- * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
- * |
- * This method is called from [_StreamSubscriptionImpl.cancel]. |
- * |
- * If an event is currently firing, the cancel is delayed |
- * until after the subscribers have received the event. |
- */ |
- void _cancel(_StreamSubscriptionImpl subscriber); |
+ return result; |
+ } |
- /** |
- * Iterate over all current subscribers and perform an action on each. |
- * |
- * Subscribers added during the iteration will not be visited. |
- * Subscribers unsubscribed during the iteration will only be removed |
- * after they have been acted on. |
- * |
- * Any change in the pause state is only reported after all subscribers have |
- * received the event. |
- * |
- * The [action] must not throw, or the controller will be left in an |
- * invalid state. |
- * |
- * This method must not be called while [isFiring] is true. |
- */ |
- void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)); |
+ // State management. |
- /** |
- * Checks whether the subscription/pause state has changed. |
- * |
- * Calls the appropriate callback if the state has changed from the |
- * provided one. Repeats calling callbacks as long as the call changes |
- * the state. |
- */ |
- void _checkCallbacks(bool hadListener, bool wasPaused) { |
- assert(!_isFiring); |
- // Will be handled after the current callback. |
- if (_isInCallback) return; |
- if (_hasPendingResume && !_hasPendingEvent) { |
- _state ^= _STREAM_PENDING_RESUME; |
- } |
- _state |= _STREAM_CALLBACK; |
- while (true) { |
- bool hasListener = _hasListener; |
- bool isPaused = _isInputPaused; |
- if (hadListener != hasListener) { |
- _onSubscriptionStateChange(); |
- } else if (isPaused != wasPaused) { |
- _onPauseStateChange(); |
- } else { |
- _state ^= _STREAM_CALLBACK; |
- return; |
- } |
- wasPaused = isPaused; |
- hadListener = hasListener; |
+ bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
+ bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
+ bool get _cancelled => (_state & _STATE_CANCELLED) != 0; |
floitsch
2013/05/22 16:26:29
isCanceled
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
+ bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; |
+ bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; |
+ bool get _isPaused => _state >= _STATE_PAUSE_COUNT; |
+ bool get _canFire => _state < _STATE_IN_CALLBACK; |
+ bool get _mayResumeInput => |
+ !_isPaused && (_pending == null || _pending.isEmpty); |
floitsch
2013/05/22 16:26:29
shouldn't _hasPending do the null-check too?
Lasse Reichstein Nielsen
2013/05/24 06:02:49
_hasPending is accessing a cached bit in the state
|
+ bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; |
+ |
+ bool get isPaused => _isPaused; |
+ |
+ void _cancel() { |
+ _state |= _STATE_CANCELLED; |
+ if (_hasPending) { |
+ _pending.clear(); |
} |
} |
/** |
- * Called when the first subscriber requests a pause or the last a resume. |
+ * Increment the pause count. |
* |
- * Read [isPaused] to see the new state. |
+ * Also marks input as paused. |
*/ |
- void _onPauseStateChange() {} |
- |
- /** |
- * Called when the first listener subscribes or the last unsubscribes. |
- * |
- * Read [hasListener] to see what the new state is. |
- */ |
- void _onSubscriptionStateChange() {} |
+ void _incrementPauseCount() { |
+ _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
+ } |
/** |
- * Add a pending event at the end of the pending event queue. |
+ * Decrements the pause count. |
* |
- * Schedules events if currently not paused and inside a callback. |
+ * Does not automatically unpause the input (call [_onResume]) when |
+ * the pause count reaches zero. This is handled elsewhere, and only |
+ * if there are no pending events buffered. |
*/ |
- void _addPendingEvent(_DelayedEvent event) { |
- if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents(); |
- _StreamImplEvents events = _pendingEvents; |
- events.add(event); |
- if (_isPaused || _isFiring) return; |
- if (_isInCallback) { |
- _schedulePendingEvents(); |
- return; |
- } |
+ void _decrementPauseCount() { |
+ assert(_isPaused); |
+ _state -= _STATE_PAUSE_COUNT; |
} |
- /** Fire any pending events until the pending event queue is empty. */ |
- void _handlePendingEvents() { |
- assert(_isInactive); |
- if (!_hasPendingEvent) return; |
- _PendingEvents events = _pendingEvents; |
- do { |
- if (_isPaused) return; |
- if (events.isScheduled) events.cancelSchedule(); |
- events.handleNext(this); |
- } while (!events.isEmpty); |
- } |
+ // _EventSink interface. |
- /** |
- * Send a data event directly to each subscriber. |
- */ |
- _sendData(T value) { |
- assert(!_isPaused); |
- assert(!_isComplete); |
- if (!_hasListener) return; |
- _forEachSubscriber((subscriber) { |
- try { |
- subscriber._sendData(value); |
- } catch (e, s) { |
- _throwDelayed(e, s); |
- } |
- }); |
+ void _add(T data) { |
+ assert(!_isClosed); |
+ if (_cancelled) return; |
+ if (_canFire) { |
+ _sendData(data); |
+ } else { |
+ _addPending(new _DelayedData(data)); |
+ } |
} |
- /** |
- * Sends an error event directly to each subscriber. |
- */ |
- void _sendError(error) { |
- assert(!_isPaused); |
- assert(!_isComplete); |
- if (!_hasListener) return; |
- _forEachSubscriber((subscriber) { |
- try { |
- subscriber._sendError(error); |
- } catch (e, s) { |
- _throwDelayed(e, s); |
- } |
- }); |
+ void _addError(Object error) { |
+ if (_cancelled) return; |
+ if (_cancelOnError) { |
+ /// TODO: handle here? |
+ } |
+ if (_canFire) { |
+ _sendError(error); // Reports cancel after sending. |
+ } else { |
+ _addPending(new _DelayedError(error)); |
+ } |
} |
- /** |
- * Sends the "done" message directly to each subscriber. |
- * This automatically stops further subscription and |
- * unsubscribes all subscribers. |
- */ |
- void _sendDone() { |
- assert(!_isPaused); |
- assert(_isClosed); |
- _setComplete(); |
- if (!_hasListener) return; |
- _forEachSubscriber((subscriber) { |
- _cancel(subscriber); |
- try { |
- subscriber._sendDone(); |
- } catch (e, s) { |
- _throwDelayed(e, s); |
- } |
- }); |
- assert(!_hasListener); |
+ void _close() { |
+ assert(!_isClosed); |
+ if (_cancelled) return; |
+ _state |= _STATE_CLOSED; |
+ if (_canFire) { |
+ _sendDone(); |
+ } else { |
+ _addPending(const _DelayedDone()); |
+ } |
} |
-} |
- |
-// ------------------------------------------------------------------- |
-// Default implementation of a stream with a single subscriber. |
-// ------------------------------------------------------------------- |
-/** |
- * Default implementation of stream capable of sending events to one subscriber. |
- * |
- * Any class needing to implement [Stream] can either directly extend this |
- * class, or extend [Stream] and delegate the subscribe method to an instance |
- * of this class. |
- * |
- * The only public methods are those of [Stream], so instances of |
- * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing |
- * internal functionality. |
- * |
- * The [StreamController] is a public facing version of this class, with |
- * some methods made public. |
- * |
- * The user interface of [_SingleStreamImpl] are the following methods: |
- * * [_add]: Add a data event to the stream. |
- * * [_addError]: Add an error event to the stream. |
- * * [_close]: Request to close the stream. |
- * * [_onSubscriberStateChange]: Called when receiving the first subscriber or |
- * when losing the last subscriber. |
- * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
- * * [_hasListener]: Test whether there are currently any subscribers. |
- * * [_isInputPaused]: Test whether the stream is currently paused. |
- * The user should not add new events while the stream is paused, but if it |
- * happens anyway, the stream will enqueue the events just as when new events |
- * arrive while still firing an old event. |
- */ |
-class _SingleStreamImpl<T> extends _StreamImpl<T> { |
- _StreamListener _subscriber = null; |
- |
- /** Whether there is currently a subscriber on this [Stream]. */ |
- bool get _hasListener => _subscriber != null; |
- |
- // ------------------------------------------------------------------- |
- // Internal implementation. |
- _SingleStreamImpl() { |
- // Start out paused. |
- _updatePauseCount(1); |
+ // Hooks called when the input is paused, unpaused or cancelled. |
+ // These must not throw. If overwritten to call user code, include suitable |
+ // try/catch wrapping and send any errors to [_throwDelayed]. |
+ void _onPause() { |
+ assert(_isInputPaused); |
} |
- /** |
- * Create the new subscription object. |
- */ |
- _StreamSubscriptionImpl<T> _createSubscription( |
- void onData(T data), |
- void onError(error), |
- void onDone(), |
- bool cancelOnError) { |
- return new _StreamSubscriptionImpl<T>( |
- this, onData, onError, onDone, cancelOnError); |
+ void _onResume() { |
+ assert(!_isInputPaused); |
} |
- void _addListener(_StreamListener subscription) { |
- assert(!_isComplete); |
- if (_hasListener) { |
- throw new StateError("Stream already has subscriber."); |
- } |
- assert(_pauseCount == 1); |
- _updatePauseCount(-1); |
- _subscriber = subscription; |
- subscription._setSubscribed(0); |
- if (_isInactive) { |
- _checkCallbacks(false, true); |
- if (!_isPaused && _hasPendingEvent) { |
- _schedulePendingEvents(); |
- } |
- } |
+ void _onCancel() { |
+ assert(_cancelled); |
} |
+ // Handle pending events. |
+ |
/** |
- * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
+ * Add a pending event. |
* |
- * This method is called from [_StreamSubscriptionImpl.cancel]. |
+ * If the subscription is not paused, this also schedules a firing |
+ * of pending events later (if necessary). |
*/ |
- void _cancel(_StreamListener subscriber) { |
- assert(identical(subscriber._source, this)); |
- // We allow unsubscribing the currently firing subscription during |
- // the event firing, because it is indistinguishable from delaying it since |
- // that event has already received the event. |
- if (!identical(_subscriber, subscriber)) { |
- // You may unsubscribe more than once, only the first one counts. |
- return; |
- } |
- _subscriber = null; |
- // Unsubscribing a paused subscription also cancels its pauses. |
- int resumeCount = subscriber._setUnsubscribed(); |
- // Keep being paused while there is no subscriber and the stream is not |
- // complete. |
- _updatePauseCount(_isComplete ? -resumeCount : -resumeCount + 1); |
- if (_isInactive) { |
- _checkCallbacks(true, resumeCount > 0); |
- if (!_isPaused && _hasPendingEvent) { |
- _schedulePendingEvents(); |
+ void _addPending(_DelayedEvent event) { |
+ _StreamImplEvents pending = _pending; |
+ if (_pending == null) pending = _pending = new _StreamImplEvents(); |
+ pending.add(event); |
+ if (!_hasPending) { |
+ _state |= _STATE_HAS_PENDING; |
+ if (!_isPaused) { |
+ _pending.schedule(this); |
} |
} |
} |
- void _forEachSubscriber( |
- void action(_StreamListener<T> subscription)) { |
+ /* _EventDispatch interface. */ |
+ |
+ void _sendData(T data) { |
+ assert(!_cancelled); |
assert(!_isPaused); |
+ assert(!_inCallback); |
bool wasInputPaused = _isInputPaused; |
- _StreamListener subscription = _subscriber; |
- assert(subscription != null); |
- _startFiring(); |
- action(subscription); |
- _endFiring(wasInputPaused); |
+ _state |= _STATE_IN_CALLBACK; |
+ try { |
floitsch
2013/05/22 16:26:29
move the `try`s into a separate function?
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Rather not. It would only be used for _send{Data,E
|
+ _onData(data); |
+ } catch (e, s) { |
+ _throwDelayed(e, s); |
+ } |
+ _state &= ~_STATE_IN_CALLBACK; |
+ _checkState(wasInputPaused); |
} |
-} |
-// ------------------------------------------------------------------- |
-// Default implementation of a stream with subscribers. |
-// ------------------------------------------------------------------- |
- |
-/** |
- * Default implementation of stream capable of sending events to subscribers. |
- * |
- * Any class needing to implement [Stream] can either directly extend this |
- * class, or extend [Stream] and delegate the subscribe method to an instance |
- * of this class. |
- * |
- * The only public methods are those of [Stream], so instances of |
- * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing |
- * internal functionality. |
- * |
- * The [StreamController] is a public facing version of this class, with |
- * some methods made public. |
- * |
- * The user interface of [_MultiStreamImpl] are the following methods: |
- * * [_add]: Add a data event to the stream. |
- * * [_addError]: Add an error event to the stream. |
- * * [_close]: Request to close the stream. |
- * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or |
- * when losing the last subscriber. |
- * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
- * * [_hasListener]: Test whether there are currently any subscribers. |
- * * [_isPaused]: Test whether the stream is currently paused. |
- * The user should not add new events while the stream is paused, but if it |
- * happens anyway, the stream will enqueue the events just as when new events |
- * arrive while still firing an old event. |
- */ |
-class _MultiStreamImpl<T> extends _StreamImpl<T> |
- implements _InternalLinkList { |
- // Link list implementation (mixin when possible). |
- _InternalLink _nextLink; |
- _InternalLink _previousLink; |
- |
- _MultiStreamImpl() { |
- _nextLink = _previousLink = this; |
+ void _sendError(var error) { |
+ assert(!_cancelled); |
+ assert(!_isPaused); |
+ assert(!_inCallback); |
+ bool wasInputPaused = _isInputPaused; |
+ _state |= _STATE_IN_CALLBACK; |
+ try { |
+ _onError(error); |
+ } catch (e, s) { |
+ _throwDelayed(e, s); |
+ } |
+ _state &= ~_STATE_IN_CALLBACK; |
+ if (_cancelOnError) { |
+ _cancel(); |
+ } |
+ _checkState(wasInputPaused); |
} |
- bool get isBroadcast => true; |
- |
- Stream<T> asBroadcastStream() => this; |
- |
- // ------------------------------------------------------------------ |
- // Helper functions that can be overridden in subclasses. |
- |
- /** Whether there are currently any subscribers on this [Stream]. */ |
- bool get _hasListener => !_InternalLinkList.isEmpty(this); |
- |
- /** |
- * Create the new subscription object. |
- */ |
- _StreamListener<T> _createSubscription( |
- void onData(T data), |
- void onError(error), |
- void onDone(), |
- bool cancelOnError) { |
- return new _StreamSubscriptionImpl<T>( |
- this, onData, onError, onDone, cancelOnError); |
+ void _sendDone() { |
+ assert(!_cancelled); |
+ assert(!_isPaused); |
+ assert(!_inCallback); |
+ _state |= (_STATE_CANCELLED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
+ try { |
+ _onDone(); |
+ } catch (e, s) { |
+ _throwDelayed(e, s); |
+ } |
+ try { |
+ _onCancel(); // No checkState after cancel, it is always the last event. |
+ } catch (e, s) { |
+ print("BAD THROW 5: \n$s"); |
+ } |
+ _state &= ~_STATE_IN_CALLBACK; |
} |
- // ------------------------------------------------------------------- |
- // Internal implementation. |
- |
- /** |
- * Iterate over all current subscribers and perform an action on each. |
- * |
- * The set of subscribers cannot be modified during this iteration. |
- * All attempts to add or unsubscribe subscribers will be delayed until |
- * after the iteration is complete. |
- * |
- * The [action] must not throw, or the controller will be left in an |
- * invalid state. |
+ /** |
+ * Call a hook function. |
* |
- * This method must not be called while [isFiring] is true. |
+ * The call is properly wrapped in code to avoid other callbacks |
+ * during the call, and it checks for state changes after the call |
+ * that should cause further callbacks. |
*/ |
- void _forEachSubscriber( |
- void action(_StreamListener<T> subscription)) { |
- assert(!_isFiring); |
- if (!_hasListener) return; |
+ void _guardCallback(callback) { |
+ assert(!_inCallback); |
bool wasInputPaused = _isInputPaused; |
- _startFiring(); |
- _InternalLink cursor = this._nextLink; |
- while (!identical(cursor, this)) { |
- _StreamListener<T> current = cursor; |
- if (current._needsEvent(_currentEventIdBit)) { |
- action(current); |
- // Marks as having received the event. |
- current._toggleEventReceived(); |
- } |
- cursor = current._nextLink; |
- if (current._isPendingUnsubscribe) { |
- _removeListener(current); |
- } |
- } |
- _endFiring(wasInputPaused); |
+ _state |= _STATE_IN_CALLBACK; |
+ try { |
+ callback(); |
+ } catch (e, s) { |
+ print("BAD THROW 2: \n$s"); |
} |
- |
- void _addListener(_StreamListener listener) { |
- listener._setSubscribed(_currentEventIdBit); |
- bool hadListener = _hasListener; |
- _InternalLinkList.add(this, listener); |
- if (!hadListener && _isInactive) { |
- _checkCallbacks(false, false); |
- if (!_isPaused && _hasPendingEvent) { |
- _schedulePendingEvents(); |
- } |
- } |
+ _state &= ~_STATE_IN_CALLBACK; |
+ _checkState(wasInputPaused); |
} |
/** |
- * Handle a cancel requested from a [_StreamListener]. |
+ * Check if the input needs to be informed of state changes. |
* |
- * This method is called from [_StreamListener.cancel]. |
- * |
- * If an event is currently firing, the cancel is delayed |
- * until after the subscribers have received the event. |
+ * State changes are pausing, resuming and cancelling. |
+ * After cancelling, no further callbacks will happen. |
floitsch
2013/05/22 16:26:29
separate with more new lines.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
+ * The cancel callback is called after a user cancel, or after |
+ * the final done event is sent. |
*/ |
- void _cancel(_StreamListener listener) { |
- assert(identical(listener._source, this)); |
- if (_InternalLink.isUnlinked(listener)) { |
- // You may unsubscribe more than once, only the first one counts. |
- return; |
+ void _checkState(bool wasInputPaused) { |
+ assert(!_inCallback); |
+ if (_hasPending && _pending.isEmpty) { |
+ _state &= ~_STATE_HAS_PENDING; |
+ if (_isInputPaused && _mayResumeInput) { |
+ _state &= ~_STATE_INPUT_PAUSED; |
+ } |
} |
- if (_isFiring) { |
- if (listener._needsEvent(_currentEventIdBit)) { |
- assert(listener._isSubscribed); |
- listener._setPendingUnsubscribe(_currentEventIdBit); |
- } else { |
- // The listener has been notified of the event (or don't need to, |
- // if it's still pending subscription) so it's safe to remove it. |
- _removeListener(listener); |
+ while (true) { |
floitsch
2013/05/22 16:26:29
Add comment why we need a loop.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Ok.
|
+ if (_cancelled) { |
+ try { |
+ _onCancel(); |
+ } catch (e, s) { |
+ print("BAD THROW 3: \n$s"); |
+ } |
+ return; |
} |
- // Pause and subscription state changes are reported when we end |
- // firing. |
- } else { |
- bool wasInputPaused = _isInputPaused; |
- _removeListener(listener); |
- if (_isInactive) { |
- _checkCallbacks(true, wasInputPaused); |
- if (!_isPaused && _hasPendingEvent) { |
- _schedulePendingEvents(); |
- } |
+ bool isInputPaused = _isInputPaused; |
+ if (wasInputPaused == isInputPaused) break; |
+ _state ^= _STATE_IN_CALLBACK; |
+ try { |
+ if (isInputPaused) { |
+ _onPause(); |
+ } else { |
+ _onResume(); |
} |
+ } catch (e, s) { |
+ print("BAD THROW 4: \n$s"); |
+ } |
+ _state &= ~_STATE_IN_CALLBACK; |
+ wasInputPaused = isInputPaused; |
+ } |
+ if (_hasPending && !_isPaused) { |
+ _pending.schedule(this); |
} |
} |
+} |
- /** |
- * Removes a listener from this stream and cancels its pauses. |
- * |
- * This is a low-level action that doesn't call [_onSubscriptionStateChange]. |
- * or [_callOnPauseStateChange]. |
- */ |
- void _removeListener(_StreamListener listener) { |
- int pauseCount = listener._setUnsubscribed(); |
- _InternalLinkList.remove(listener); |
- if (pauseCount > 0) { |
- _updatePauseCount(-pauseCount); |
- if (!_isPaused && _hasPendingEvent) { |
- _state |= _STREAM_PENDING_RESUME; |
- } |
- } |
+// ------------------------------------------------------------------- |
+// Common base class for single and multi-subscription streams. |
+// ------------------------------------------------------------------- |
+abstract class _StreamImpl<T> extends Stream<T> { |
+ // ------------------------------------------------------------------ |
+ // Stream interface. |
+ |
+ StreamSubscription<T> listen(void onData(T data), |
+ { void onError(error), |
+ void onDone(), |
+ bool cancelOnError }) { |
+ if (onData == null) onData = _nullDataHandler; |
+ if (onError == null) onError = _nullErrorHandler; |
+ if (onDone == null) onDone = _nullDoneHandler; |
+ cancelOnError = identical(true, cancelOnError); |
+ StreamSubscription subscription = |
+ _createSubscription(onData, onError, onDone, cancelOnError); |
+ _onListen(subscription); |
+ return subscription; |
+ } |
+ |
+ // ------------------------------------------------------------------- |
+ /** Create a subscription object. Called by [subcribe]. */ |
+ _BufferingStreamSubscription<T> _createSubscription( |
+ void onData(T data), |
+ void onError(error), |
+ void onDone(), |
+ bool cancelOnError) { |
+ return new _BufferingStreamSubscription<T>( |
+ onData, onError, onDone, cancelOnError, null); |
} |
+ |
+ /** Hook called when the subscription has been created. */ |
+ void _onListen(StreamSubscription subscription) {} |
} |
+typedef _PendingEvents _EventGenerator(); |
/** Stream that generates its own events. */ |
-class _GeneratedSingleStreamImpl<T> extends _SingleStreamImpl<T> { |
+class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
+ final _EventGenerator _pending; |
/** |
- * Initializes the stream to have only the events provided by [events]. |
+ * Initializes the stream to have only the events provided by a |
+ * [_PendingEvents]. |
* |
- * A [_PendingEvents] implementation provides events that are handled |
- * by calling [_PendingEvents.handleNext] with the [_StreamImpl]. |
+ * A new [_PendingEvents] must be generated for each listen. |
*/ |
- _GeneratedSingleStreamImpl(_PendingEvents events) { |
- _pendingEvents = events; |
- _setClosed(); // Closed for input since all events are already pending. |
- } |
- |
- void _add(T value) { |
- throw new UnsupportedError("Cannot inject events into generated stream"); |
- } |
- |
- void _addError(value) { |
- throw new UnsupportedError("Cannot inject events into generated stream"); |
- } |
+ _GeneratedStreamImpl(this._pending); |
- void _close() { |
- throw new UnsupportedError("Cannot inject events into generated stream"); |
+ StreamSubscription _createSubscription(void onData(T data), |
+ void onError(Object error), |
+ void onDone(), |
+ bool cancelOnError) { |
+ return new _BufferingStreamSubscription( |
+ onData, onError, onDone, cancelOnError, _pending()); |
} |
} |
/** Pending events object that gets its events from an [Iterable]. */ |
class _IterablePendingEvents<T> extends _PendingEvents { |
+ // The stream has been cancelled by an error, but hasn't sent a final |
+ // "Done" event yet. |
+ static const int _STATE_CANCELLED = 1; |
+ // The stream is completely done. |
+ static const int _STATE_CLOSED = 3; |
+ |
final Iterator<T> _iterator; |
+ int _iterationState = 0; |
+ |
+ _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; |
+ |
/** |
* Whether there are no more events to be sent. |
* |
* This starts out as [:false:] since there is always at least |
* a 'done' event to be sent. |
*/ |
- bool _isDone = false; |
- |
- _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; |
+ bool get _isDone => _iterationState == _STATE_CLOSED; |
+ bool get _isCancelled => _iterationState == _STATE_CANCELLED; |
bool get isEmpty => _isDone; |
- void handleNext(_StreamImpl<T> stream) { |
- if (_isDone) throw new StateError("No events pending."); |
+ void handleNext(_EventDispatch dispatch) { |
+ if (_isCancelled) { |
+ _iterationState = _STATE_CLOSED; |
+ dispatch._sendDone(); |
+ return; |
+ } |
+ if (_isDone) { |
+ throw new StateError("No events pending."); |
+ } |
+ bool isDone; |
try { |
- _isDone = !_iterator.moveNext(); |
- if (!_isDone) { |
- stream._sendData(_iterator.current); |
- } else { |
- stream._sendDone(); |
- } |
+ isDone = !_iterator.moveNext(); |
} catch (e, s) { |
- stream._sendError(_asyncError(e, s)); |
- stream._sendDone(); |
- _isDone = true; |
+ _iterationState = _STATE_CANCELLED; // Will send a single done after this. |
floitsch
2013/05/22 16:26:29
long line.
I don't think it makes sense to send a
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Good point. Some listeners might be left hanging,
|
+ dispatch._sendError(_asyncError(e, s)); |
+ return; |
+ } |
+ if (!isDone) { |
+ dispatch._sendData(_iterator.current); |
+ } else { |
+ _iterationState = _STATE_CLOSED; |
+ dispatch._sendDone(); |
} |
} |
-} |
- |
- |
-/** |
- * The subscription class that the [StreamController] uses. |
- * |
- * The [_StreamImpl.createSubscription] method should |
- * create an object of this type, or another subclass of [_StreamListener]. |
- * A subclass of [_StreamImpl] can specify which subclass |
- * of [_StreamSubscriptionImpl] it uses by overriding |
- * [_StreamImpl.createSubscription]. |
- * |
- * The subscription is in one of three states: |
- * * Subscribed. |
- * * Paused-and-subscribed. |
- * * Unsubscribed. |
- * Unsubscribing also resumes any pauses started by the subscription. |
- */ |
-class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
- implements StreamSubscription<T> { |
- final bool _cancelOnError; |
- // TODO(ahe): Restore type when feature is implemented in dart2js |
- // checked mode. http://dartbug.com/7733 |
- var /* _DataHandler<T> */ _onData; |
- _ErrorHandler _onError; |
- _DoneHandler _onDone; |
- _StreamSubscriptionImpl(_StreamImpl source, |
- this._onData, |
- this._onError, |
- this._onDone, |
- this._cancelOnError) : super(source); |
- |
- void onData(void handleData(T event)) { |
- if (handleData == null) handleData = _nullDataHandler; |
- _onData = handleData; |
- } |
- |
- void onError(void handleError(error)) { |
- if (handleError == null) handleError = _nullErrorHandler; |
- _onError = handleError; |
- } |
- |
- void onDone(void handleDone()) { |
- if (handleDone == null) handleDone = _nullDoneHandler; |
- _onDone = handleDone; |
- } |
- |
- void _sendData(T data) { |
- _onData(data); |
- } |
- |
- void _sendError(error) { |
- _onError(error); |
- if (_cancelOnError) _source._cancel(this); |
- } |
- |
- void _sendDone() { |
- _onDone(); |
- } |
- |
- void cancel() { |
- if (!_isSubscribed) return; |
- _source._cancel(this); |
- } |
- |
- void pause([Future resumeSignal]) { |
- if (!_isSubscribed) return; |
- _source._pause(this, resumeSignal); |
- } |
- |
- void resume() { |
- if (!_isSubscribed || !isPaused) return; |
- _source._resume(this, false); |
- } |
- |
- Future asFuture([var futureValue]) { |
- _FutureImpl<T> result = new _FutureImpl<T>(); |
- // Overwrite the onDone and onError handlers. |
- onDone(() { result._setValue(futureValue); }); |
- onError((error) { |
- cancel(); |
- result._setError(error); |
- }); |
- |
- return result; |
+ void clear() { |
+ if (isScheduled) cancelSchedule(); |
+ _state = _STATE_CLOSED; |
} |
} |
+ |
// Internal helpers. |
// Types of the different handlers on a stream. Types used to type fields. |
@@ -998,20 +559,20 @@ void _nullErrorHandler(error) { |
void _nullDoneHandler() {} |
-/** A delayed event on a stream implementation. */ |
+/** A delayed event on a buffering stream subscription. */ |
abstract class _DelayedEvent { |
/** Added as a linked list on the [StreamController]. */ |
_DelayedEvent next; |
/** Execute the delayed event on the [StreamController]. */ |
- void perform(_StreamImpl stream); |
+ void perform(_EventDispatch dispatch); |
} |
/** A delayed data event. */ |
class _DelayedData<T> extends _DelayedEvent{ |
final T value; |
_DelayedData(this.value); |
- void perform(_StreamImpl<T> stream) { |
- stream._sendData(value); |
+ void perform(_EventDispatch<T> dispatch) { |
+ dispatch._sendData(value); |
} |
} |
@@ -1019,16 +580,16 @@ class _DelayedData<T> extends _DelayedEvent{ |
class _DelayedError extends _DelayedEvent { |
final error; |
_DelayedError(this.error); |
- void perform(_StreamImpl stream) { |
- stream._sendError(error); |
+ void perform(_EventDispatch dispatch) { |
+ dispatch._sendError(error); |
} |
} |
/** A delayed done event. */ |
class _DelayedDone implements _DelayedEvent { |
const _DelayedDone(); |
- void perform(_StreamImpl stream) { |
- stream._sendDone(); |
+ void perform(_EventDispatch dispatch) { |
+ dispatch._sendDone(); |
} |
_DelayedEvent get next => null; |
@@ -1118,90 +679,16 @@ abstract class _InternalLinkList extends _InternalLink { |
} |
} |
-/** Abstract type for an internal interface for sending events. */ |
-abstract class _EventOutputSink<T> { |
- _sendData(T data); |
- _sendError(error); |
- _sendDone(); |
-} |
- |
-abstract class _StreamListener<T> extends _InternalLink |
- implements _EventOutputSink<T> { |
- final _StreamImpl _source; |
- int _state = _LISTENER_UNSUBSCRIBED; |
- |
- _StreamListener(this._source); |
- |
- bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT); |
- |
- bool get _isPendingUnsubscribe => |
- (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0; |
- |
- bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0; |
- |
- /** |
- * Whether the listener still needs to receive the currently firing event. |
- * |
- * The currently firing event is identified by a single bit, which alternates |
- * between events. The [_state] contains the previously sent event's bit in |
- * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener |
- * still need the current event. |
- */ |
- bool _needsEvent(int currentEventIdBit) { |
- int lastEventIdBit = |
- (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT; |
- return lastEventIdBit != currentEventIdBit; |
- } |
- |
- /// If a subscriber's "firing bit" doesn't match the stream's firing bit, |
- /// we are currently firing an event and the subscriber still need to receive |
- /// the event. |
- void _toggleEventReceived() { |
- _state ^= _LISTENER_EVENT_ID; |
- } |
- |
- void _setSubscribed(int eventIdBit) { |
- assert(eventIdBit == 0 || eventIdBit == 1); |
- _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT); |
- } |
- |
- void _setPendingUnsubscribe(int currentEventIdBit) { |
- assert(_isSubscribed); |
- // Sets the pending unsubscribe, and ensures that the listener |
- // won't get the current event. |
- _state |= _LISTENER_PENDING_UNSUBSCRIBE | _LISTENER_EVENT_ID; |
- _state ^= (1 ^ currentEventIdBit) << _LISTENER_EVENT_ID_SHIFT; |
- assert(!_needsEvent(currentEventIdBit)); |
- } |
- |
- /** |
- * Marks the listener as unsubscibed. |
- * |
- * Returns the number of unresumed pauses for the listener. |
- */ |
- int _setUnsubscribed() { |
- assert(_isSubscribed); |
- int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT; |
- _state = _LISTENER_UNSUBSCRIBED; |
- return timesPaused; |
- } |
- |
- void _incrementPauseCount() { |
- _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
- } |
- |
- void _decrementPauseCount() { |
- assert(isPaused); |
- _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
- } |
- |
- _sendData(T data); |
- _sendError(error); |
- _sendDone(); |
-} |
- |
/** Superclass for provider of pending events. */ |
abstract class _PendingEvents { |
+ // No async event has been scheduled. |
+ static const int _STATE_UNSCHEDULED = 0; |
+ // An async event has been scheduled to run a function. |
+ static const int _STATE_SCHEDULED = 1; |
+ // An async event has been scheduled, but it will do nothing when it runs. |
+ // Async events can't be preempted. |
+ static const int _STATE_CANCELLED = 3; |
+ |
/** |
* Timer set when pending events are scheduled for execution. |
floitsch
2013/05/22 16:26:29
Update comments.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
* |
@@ -1209,27 +696,44 @@ abstract class _PendingEvents { |
* is stored here. If pending events are executed earlier than that, e.g., |
* due to a second event in the current cycle, the timer is canceled again. |
*/ |
- Timer scheduleTimer = null; |
+ int _state = _STATE_UNSCHEDULED; |
bool get isEmpty; |
- bool get isScheduled => scheduleTimer != null; |
+ bool get isScheduled => _state == _STATE_SCHEDULED; |
+ bool get _eventScheduled => _state >= _STATE_SCHEDULED; |
- void schedule(_StreamImpl stream) { |
+ /** |
+ * Schedule an event to run later. |
+ * |
+ * If called more than once, it should be called with the same dispatch as |
+ * argument each time. It may reuse an earlier argument in some cases. |
+ */ |
+ void schedule(_EventDispatch dispatch) { |
if (isScheduled) return; |
- scheduleTimer = new Timer(Duration.ZERO, () { |
- scheduleTimer = null; |
- stream._handlePendingEvents(); |
+ assert(!isEmpty); |
+ if (_eventScheduled) { |
+ assert(_state == _STATE_CANCELLED); |
+ _state = _STATE_SCHEDULED; |
+ return; |
+ } |
+ runAsync(() { |
+ int oldState = _state; |
+ _state = _STATE_UNSCHEDULED; |
+ if (oldState == _STATE_CANCELLED) return; |
+ handleNext(dispatch); |
}); |
+ _state = _STATE_SCHEDULED; |
} |
void cancelSchedule() { |
- assert(isScheduled); |
- scheduleTimer.cancel(); |
- scheduleTimer = null; |
+ if (isScheduled) _state = _STATE_CANCELLED; |
} |
- void handleNext(_StreamImpl stream); |
+ void handleNext(_EventDispatch dispatch); |
+ |
+ /** Throw away any pending events and cancel scheduled events. */ |
+ void clear(); |
} |
@@ -1242,8 +746,6 @@ class _StreamImplEvents extends _PendingEvents { |
bool get isEmpty => lastPendingEvent == null; |
- bool get isScheduled => scheduleTimer != null; |
- |
void add(_DelayedEvent event) { |
if (lastPendingEvent == null) { |
firstPendingEvent = lastPendingEvent = event; |
@@ -1252,122 +754,353 @@ class _StreamImplEvents extends _PendingEvents { |
} |
} |
- void handleNext(_StreamImpl stream) { |
+ void handleNext(_EventDispatch dispatch) { |
assert(!isScheduled); |
_DelayedEvent event = firstPendingEvent; |
firstPendingEvent = event.next; |
if (firstPendingEvent == null) { |
lastPendingEvent = null; |
} |
- event.perform(stream); |
+ event.perform(dispatch); |
+ } |
+ |
+ void clear() { |
+ if (isScheduled) cancelSchedule(); |
+ firstPendingEvent = lastPendingEvent = null; |
} |
} |
+class _MultiplexerLinkedList { |
+ _MultiplexerLinkedList _next; |
+ _MultiplexerLinkedList _previous; |
-class _DoneSubscription<T> implements StreamSubscription<T> { |
- _DoneHandler _handler; |
- Timer _timer; |
- int _pauseCount = 0; |
+ void _unlink() { |
+ _previous._next = _next; |
+ _next._previous = _previous; |
+ _next = _previous = this; |
+ } |
- _DoneSubscription(this._handler) { |
- _delayDone(); |
+ void _insertBefore(_MultiplexerLinkedList newNext) { |
+ _MultiplexerLinkedList newPrevious = newNext._previous; |
+ newPrevious._next = this; |
+ newNext._previous = _previous; |
+ _previous._next = newNext; |
+ _previous = newPrevious; |
} |
+} |
- void _delayDone() { |
- assert(_timer == null && _pauseCount == 0); |
- _timer = new Timer(Duration.ZERO, () { |
- if (_handler != null) _handler(); |
- }); |
+// TODO(lrn): Change "implements" to "with" when automatic mixin constructors |
+// are implemented. |
+class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T> |
floitsch
2013/05/22 16:26:29
Add comment what this class does.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
+ implements _MultiplexerLinkedList { |
+ static const int _STATE_NOT_LISTENING = 0; |
+ // Bit that alternates between event firings. If bit matches the one currently |
+ // firing, the subscription will not be notified. |
+ static const int _STATE_EVENT_ID_BIT = 1; |
+ static const int _STATE_LISTENING = 2; |
+ static const int _STATE_IS_FIRING = 4; |
+ static const int _STATE_REMOVE_AFTER_FIRING = 8; |
+ |
+ // Firing state. |
+ int _multiplexState; |
+ |
+ _SingleStreamMultiplexer _source; |
+ |
+ _MultiplexerSubscription(this._source, |
+ void onData(T data), |
+ void onError(Object error), |
+ void onDone(), |
+ bool cancelOnError, |
+ int nextEventId) |
+ : _multiplexState = _STATE_LISTENING | nextEventId, |
+ super(onData, onError, onDone, cancelOnError, null) { |
+ _next = _previous = this; |
+ } |
+ |
+ // Mixin workaround. |
+ _MultiplexerLinkedList _next; |
+ _MultiplexerLinkedList _previous; |
+ |
+ void _unlink() { |
+ _previous._next = _next; |
+ _next._previous = _previous; |
+ _next = _previous = this; |
+ } |
+ |
+ void _insertBefore(_MultiplexerLinkedList newNext) { |
+ _MultiplexerLinkedList newPrevious = newNext._previous; |
+ newPrevious._next = this; |
+ newNext._previous = _previous; |
+ _previous._next = newNext; |
+ _previous = newPrevious; |
+ } |
+ // End mixin. |
+ |
+ bool get _isListening => _multiplexState >= _STATE_LISTENING; |
+ bool get _isFiring => _multiplexState >= _STATE_IS_FIRING; |
+ bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING; |
+ int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT; |
+ |
+ void _setRemoveAfterFiring() { |
+ assert(_isFiring); |
+ _multiplexState |= _STATE_REMOVE_AFTER_FIRING; |
+ } |
+ |
+ void _startFiring() { |
+ assert(!_isFiring); |
+ _multiplexState |= _STATE_IS_FIRING; |
} |
- bool get _isComplete => _timer == null && _pauseCount == 0; |
+ /// Marks listener as no longer firing, and toggles its event id. |
+ void _endFiring() { |
+ assert(_isFiring); |
+ _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT); |
+ } |
- void onData(void handleAction(T value)) {} |
+ void _setNotListening() { |
+ assert(_isListening); |
+ _multiplexState = _STATE_NOT_LISTENING; |
+ } |
- void onError(void handleError(error)) {} |
+ void _onCancel() { |
+ assert(_isListening); |
+ _source._removeListener(this); |
+ } |
+} |
- void onDone(void handleDone()) { |
- _handler = handleDone; |
+// TODO(lrn): change "implements" to "with" when the VM supports it. |
+class _SingleStreamMultiplexer<T> extends Stream<T> |
floitsch
2013/05/22 16:26:29
Add comment.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
+ implements _MultiplexerLinkedList, |
+ _EventDispatch<T> { |
+ final Stream<T> _source; |
+ StreamSubscription<T> _subscription; |
+ // Alternates between zero and one for each event fired. |
+ // Listeners are initialized with the next event's id, and will |
+ // only be notified if they match the event being fired. |
+ // That way listeners added during event firing will not receive |
+ // the current event. |
+ int _eventId = 0; |
+ |
+ bool _isFiring = false; |
+ |
+ // Remember events added while firing. |
+ _StreamImplEvents _pending; |
+ |
+ _SingleStreamMultiplexer(this._source) { |
+ _next = _previous = this; |
+ } |
+ |
+ bool get _hasPending => _pending != null && !_pending.isEmpty; |
+ |
+ // Should be mixin. |
+ _MultiplexerLinkedList _next; |
+ _MultiplexerLinkedList _previous; |
+ |
+ void _unlink() { |
+ _previous._next = _next; |
+ _next._previous = _previous; |
+ _next = _previous = this; |
+ } |
+ |
+ void _insertBefore(_MultiplexerLinkedList newNext) { |
+ _MultiplexerLinkedList newPrevious = newNext._previous; |
+ newPrevious._next = this; |
+ newNext._previous = _previous; |
+ _previous._next = newNext; |
+ _previous = newPrevious; |
} |
+ // End of mixin. |
- void pause([Future signal]) { |
- if (_isComplete) return; |
- if (_timer != null) { |
- _timer.cancel(); |
- _timer = null; |
+ StreamSubscription<T> listen(void onData(T data), |
+ { void onError(Object error), |
+ void onDone(), |
+ bool cancelOnError }) { |
+ if (onData == null) onData = _nullDataHandler; |
+ if (onError == null) onError = _nullErrorHandler; |
+ if (onDone == null) onDone = _nullDoneHandler; |
+ cancelOnError = identical(true, cancelOnError); |
+ _MultiplexerSubscription subscription = |
+ new _MultiplexerSubscription(this, onData, onError, onDone, |
+ cancelOnError, _eventId); |
+ if (_subscription == null) { |
+ _subscription = _source.listen(_add, onError: _addError, onDone: _close); |
} |
- _pauseCount++; |
- if (signal != null) signal.whenComplete(resume); |
+ subscription._insertBefore(this); |
+ return subscription; |
} |
- void resume() { |
- if (_isComplete) return; |
- if (_pauseCount == 0) return; |
- _pauseCount--; |
- if (_pauseCount == 0) { |
- _delayDone(); |
+ /** Called by [_MultiplexerSubscription.remove]. */ |
+ void _removeListener(_MultiplexerSubscription listener) { |
+ if (listener._isFiring) { |
+ listener._setRemoveAfterFiring(); |
+ } else { |
+ _unlinkListener(listener); |
} |
} |
- bool get isPaused => _pauseCount > 0; |
+ /** Remove a listener and close the subscription after the last one. */ |
+ void _unlinkListener(_MultiplexerSubscription listener) { |
+ listener._setNotListening(); |
+ listener._unlink(); |
+ if (identical(_next, this)) { |
+ // Last listener removed. |
+ _cancel(); |
+ } |
+ } |
- void cancel() { |
- if (_isComplete) return; |
- if (_timer != null) { |
- _timer.cancel(); |
- _timer = null; |
+ void _cancel() { |
+ StreamSubscription subscription = _subscription; |
+ _subscription = null; |
+ subscription.cancel(); |
+ if (_pending != null) _pending.cancelSchedule(); |
+ } |
+ |
+ void _forEachListener(void action(_MultiplexerSubscription listener)) { |
+ int eventId = _eventId; |
+ _eventId ^= 1; |
+ _isFiring = true; |
+ _MultiplexerLinkedList entry = _next; |
+ // Call each listener in order. A listener can be removed during any |
+ // other listener's event. During its own event it will only be marked |
+ // as "to be removed", and it will be handled after the event is done. |
+ while (!identical(entry, this)) { |
+ _MultiplexerSubscription listener = entry; |
+ if (listener._eventId == eventId) { |
+ listener._startFiring(); |
+ action(listener); |
+ listener._endFiring(); // Also toggles the event id. |
+ } |
+ entry = listener._next; |
+ if (listener._removeAfterFiring) { |
+ _unlinkListener(listener); |
+ } |
} |
- _pauseCount = 0; |
+ _isFiring = false; |
} |
- Future asFuture([var futureValue]) { |
- // TODO(floitsch): share more code. |
- _FutureImpl<T> result = new _FutureImpl<T>(); |
+ void _add(T data) { |
floitsch
2013/05/22 16:26:29
Why is this not handled by the bufferingStreamSubs
Lasse Reichstein Nielsen
2013/05/24 06:02:49
This isn't a StreamSubscription - it's a Stream, m
|
+ if (_isFiring || _hasPending) { |
+ _StreamImplEvents pending = _pending; |
+ if (pending == null) pending = _pending = new _StreamImplEvents(); |
+ pending.add(new _DelayedData(data)); |
+ } else { |
+ _sendData(data); |
+ } |
+ } |
- // Overwrite the onDone and onError handlers. |
- onDone(() { result._setValue(futureValue); }); |
- onError((error) { |
- cancel(); |
- result._setError(error); |
+ void _addError(Object error) { |
+ if (_isFiring || _hasPending) { |
floitsch
2013/05/22 16:26:29
ditto.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
ditto too.
|
+ _StreamImplEvents pending = _pending; |
+ if (pending == null) pending = _pending = new _StreamImplEvents(); |
+ pending.add(new _DelayedError(error)); |
+ } else { |
+ _sendError(error); |
+ } |
+ } |
+ |
+ void _close() { |
+ if (_isFiring || _hasPending) { |
floitsch
2013/05/22 16:26:29
ditto.
|
+ _StreamImplEvents pending = _pending; |
+ if (pending == null) pending = _pending = new _StreamImplEvents(); |
+ pending.add(const _DelayedDone()); |
+ } else { |
+ _sendDone(); |
+ } |
+ } |
+ |
+ void _sendData(T data) { |
+ _forEachListener((_MultiplexerSubscription listener) { |
+ listener._add(data); |
}); |
+ if (_hasPending) { |
+ _pending.schedule(this); |
+ } |
+ } |
- return result; |
+ void _sendError(Object error) { |
+ _forEachListener((_MultiplexerSubscription listener) { |
+ listener._addError(error); |
+ }); |
+ if (_hasPending) { |
+ _pending.schedule(this); |
+ } |
} |
-} |
-class _SingleStreamMultiplexer<T> extends _MultiStreamImpl<T> { |
- final Stream<T> _source; |
- StreamSubscription<T> _subscription; |
+ void _sendDone() { |
+ _forEachListener((_MultiplexerSubscription listener) { |
+ listener._setRemoveAfterFiring(); |
+ listener._close(); |
+ }); |
+ } |
+} |
- _SingleStreamMultiplexer(this._source); |
- void _callOnPauseStateChange() { |
- if (_isPaused) { |
- if (_subscription != null) { |
- _subscription.pause(); |
- } |
- } else { |
- if (_subscription != null) { |
- _subscription.resume(); |
- } |
+/** |
+ * Simple implementation of [StreamIterator]. |
+ */ |
+class _StreamIteratorImpl<T> implements StreamIterator<T> { |
+ // TODO(lrn): Keep a one (or two) element buffer to avoid pausing when not |
floitsch
2013/05/22 16:26:29
I guess the ideal solution would be if the constru
Lasse Reichstein Nielsen
2013/05/24 06:02:49
I don't want this to be about buffering, it should
|
+ // necessary. |
+ StreamSubscription _subscription; |
+ _FutureImpl<bool> _hasNext; |
+ T _current; |
+ |
+ _StreamIteratorImpl(final Stream<T> stream) { |
+ _subscription = stream.listen(_onData, |
+ onError: _onError, |
+ onDone: _onDone, |
+ cancelOnError: true); |
+ _subscription.pause(); |
+ } |
+ |
+ Future<bool> moveNext() { |
+ if (_hasNext != null) throw new StateError("Already waiting for next."); |
+ if (_subscription == null) { |
+ return new _FutureImpl<bool>.immediate(false); |
} |
+ _current = null; |
+ _subscription.resume(); |
+ _hasNext = new _FutureImpl<bool>(); |
+ return _hasNext; |
} |
- /** |
- * Subscribe or unsubscribe on [_source] depending on whether |
- * [_stream] has subscribers. |
- */ |
- void _onSubscriptionStateChange() { |
- if (_hasListener) { |
- assert(_subscription == null); |
- _subscription = _source.listen(this._add, |
- onError: this._addError, |
- onDone: this._close); |
- } else { |
- // TODO(lrn): Check why this can happen. |
- if (_subscription == null) return; |
- _subscription.cancel(); |
- _subscription = null; |
+ T get current => _current; |
+ |
+ void cancel() { |
+ StreamSubscription subscription = _subscription; |
+ _subscription = null; |
+ _current = null; |
+ subscription.cancel(); |
+ if (_hasNext != null) { |
+ _FutureImpl<bool> hasNext = _hasNext; |
+ _hasNext = null; |
+ hasNext._setValue(false); |
} |
} |
+ |
+ void _onData(T data) { |
+ assert(_hasNext != null); |
+ _FutureImpl<bool> hasNext = _hasNext; |
+ _hasNext = null; |
+ _current = data; |
+ _subscription.pause(); |
+ hasNext._setValue(true); |
+ } |
+ |
+ void _onError(Object error) { |
+ assert(_hasNext != null); |
+ _FutureImpl<bool> hasNext = _hasNext; |
+ _hasNext = null; |
+ _subscription = null; |
+ hasNext._setError(error); |
+ // We have cancelOnError: true, so the subscription is cancelled. |
floitsch
2013/05/22 16:26:29
canceled.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
+ } |
+ |
+ void _onDone() { |
+ assert(_hasNext != null); |
+ _FutureImpl<bool> hasNext = _hasNext; |
+ _hasNext = null; |
+ _subscription = null; |
+ hasNext._setValue(false); |
+ } |
} |