Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..f99dc83ba23218b7b30b32dec07ba1024bbfbfd0 |
--- /dev/null |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -0,0 +1,1017 @@ |
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+// part of dart.async; |
+ |
+// States shared by single/multi stream implementations. |
+ |
+/// Initial and default state where the stream can receive and send events. |
+const int _STREAM_OPEN = 0; |
+/// The stream has received a request to complete, but hasn't done so yet. |
+/// No further events can be aded to the stream. |
+const int _STREAM_CLOSED = 1; |
+/// The stream has completed and will no longer receive or send events. |
+/// Also counts as closed. The stream must not be paused when it's completed. |
+/// Always used in conjunction with [_STREAM_CLOSED]. |
+const int _STREAM_COMPLETE = 2; |
+/// Bit that alternates between events, and listeners are updated to the |
+/// current value when they are notified of the event. |
+const int _STREAM_EVENT_ID = 4; |
+const int _STREAM_EVENT_ID_SHIFT = 2; |
+/// Bit set while firing and clear while not. |
+const int _STREAM_FIRING = 8; |
+/// The count of times a stream has paused is stored in the |
+/// state, shifted by this amount. |
+const int _STREAM_PAUSE_COUNT_SHIFT = 4; |
+ |
+// States for listeners. |
+ |
+/// The listener is currently not subscribed to its source stream. |
+const int _LISTENER_UNSUBSCRIBED = 0; |
+/// The listener is actively subscribed to its source stream. |
+const int _LISTENER_SUBSCRIBED = 1; |
+/// The listener is subscribed until it has been notified of the current event. |
+/// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED]. |
+const int _LISTENER_PENDING_UNSUBSCRIBE = 2; |
+/// Bit that contains the last sent event's "id bit". |
+const int _LISTENER_EVENT_ID = 4; |
+const int _LISTENER_EVENT_ID_SHIFT = 2; |
+/// The count of times a listener has paused is stored in the |
+/// state, shifted by this amount. |
+const int _LISTENER_PAUSE_COUNT_SHIFT = 3; |
+ |
+ |
+// ------------------------------------------------------------------- |
+// Common base class for single and multi-subscription streams. |
+// ------------------------------------------------------------------- |
+abstract class _StreamImpl<T> extends Stream<T> { |
+ /** Current state of the stream. */ |
+ int _state = _STREAM_OPEN; |
+ |
+ /** |
+ * List of pending events. |
+ * |
+ * If events are added to the stream (using [_add], [_signalError] or [_done]) |
+ * while the stream is paused, or while another event is firing, events will |
+ * stored here. |
+ * Also supports scheduling the events for later execution. |
+ */ |
+ _StreamImplEvents _pendingEvents; |
+ |
+ // ------------------------------------------------------------------ |
+ // Stream interface. |
+ |
+ StreamSubscription listen(void onData(T data), |
+ { void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError }) { |
+ if (_isComplete) { |
+ return new _DoneSubscription(onDone); |
+ } |
+ if (onData == null) onData = _nullDataHandler; |
+ if (onError == null) onError = _nullErrorHandler; |
+ if (onDone == null) onDone = _nullDoneHandler; |
+ unsubscribeOnError = identical(true, unsubscribeOnError); |
+ _StreamListener subscription = |
+ _createSubscription(onData, onError, onDone, unsubscribeOnError); |
+ _addListener(subscription); |
+ return subscription; |
+ } |
+ |
+ // ------------------------------------------------------------------ |
+ // StreamSink interface-like methods for sending events into the stream. |
+ // It's the responsibility of the caller to ensure that the stream is not |
+ // paused when adding events. If the stream is paused, the events will be |
+ // queued, but it's better to not send events at all. |
+ |
+ /** |
+ * Send or queue a data event. |
+ */ |
+ void _add(T value) { |
+ if (_isClosed) throw new StateError("Sending on closed stream"); |
+ if (!_canFireEvent) { |
+ _addPendingEvent(new _DelayedData<T>(value)); |
+ return; |
+ } |
+ _sendData(value); |
+ _handlePendingEvents(); |
+ } |
+ |
+ /** |
+ * Send or enqueue an error event. |
+ * |
+ * If a subscription has requested to be unsubscribed on errors, |
+ * it will be unsubscribed after receiving this event. |
+ */ |
+ void _signalError(AsyncError error) { |
+ if (_isClosed) throw new StateError("Sending on closed stream"); |
+ if (!_canFireEvent) { |
+ _addPendingEvent(new _DelayedError(error)); |
+ return; |
+ } |
+ _sendError(error); |
+ _handlePendingEvents(); |
+ } |
+ |
+ /** |
+ * Send or enqueue a "done" message. |
+ * |
+ * The "done" message should be sent at most once by a stream, and it |
+ * should be the last message sent. |
+ */ |
+ void _close() { |
+ if (_isClosed) throw new StateError("Sending on closed stream"); |
+ _state |= _STREAM_CLOSED; |
+ if (!_canFireEvent) { |
+ // You can't enqueue an event after the Done, so make it const. |
+ _addPendingEvent(const _DelayedDone()); |
+ return; |
+ } |
+ _sendDone(); |
+ assert(!_hasPendingEvent); |
+ } |
+ |
+ // ------------------------------------------------------------------- |
+ // Internal implementation. |
+ |
+ // State prediates. |
+ |
+ /** Whether the stream has been closed (a done event requested). */ |
+ bool get _isClosed => (_state & _STREAM_CLOSED) != 0; |
+ |
+ /** Whether the stream is completed. */ |
+ bool get _isComplete => (_state & _STREAM_COMPLETE) != 0; |
+ |
+ /** Whether one or more active subscribers have requested a pause. */ |
+ bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT); |
+ |
+ /** Check whether the pending event queue is non-empty */ |
+ bool get _hasPendingEvent => |
+ _pendingEvents != null && !_pendingEvents.isEmpty; |
+ |
+ /** Whether we are currently firing an event. */ |
+ bool get _isFiring => (_state & _STREAM_FIRING) != 0; |
+ |
+ int get _currentEventIdBit => |
+ (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT; |
+ |
+ /** Whether there is currently a subscriber on this [Stream]. */ |
+ bool get _hasSubscribers; |
+ |
+ /** Whether the stream can fire a new event. */ |
+ bool get _canFireEvent => !_isFiring && !_isPaused && !_hasPendingEvent; |
+ |
+ // State modification. |
+ |
+ /** Record an increases in the number of times the listener has paused. */ |
+ void _incrementPauseCount(_StreamListener<T> listener) { |
+ listener._incrementPauseCount(); |
+ _updatePauseCount(1); |
+ } |
+ |
+ /** Record a decrease in the number of times the listener has paused. */ |
+ void _decrementPauseCount(_StreamListener<T> listener) { |
+ assert(_isPaused); |
+ listener._decrementPauseCount(); |
+ _updatePauseCount(-1); |
+ } |
+ |
+ /** Update the stream's own pause count only. */ |
+ void _updatePauseCount(int by) { |
+ _state += by << _STREAM_PAUSE_COUNT_SHIFT; |
+ assert(_state >= 0); |
+ } |
+ |
+ void _setClosed() { |
+ assert(!_isClosed); |
+ _state |= _STREAM_CLOSED; |
+ } |
+ |
+ void _setComplete() { |
+ assert(_isClosed); |
+ _state = _state |_STREAM_COMPLETE; |
+ } |
+ |
+ void _startFiring() { |
+ assert(!_isFiring); |
+ // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID |
+ // bit. All current subscribers will now have a _LISTENER_EVENT_ID |
+ // that doesn't match _STREAM_EVENT_ID, and they will receive the |
+ // event being fired. |
+ _state ^= _STREAM_FIRING | _STREAM_EVENT_ID; |
+ } |
+ |
+ void _endFiring() { |
+ assert(_isFiring); |
+ _state ^= _STREAM_FIRING; |
+ } |
+ |
+ /** |
+ * Record that a listener wants a pause from events. |
+ * |
+ * This methods is called from [_StreamListener.pause()]. |
+ * Subclasses can override this method, along with [isPaused] and |
+ * [createSubscription], if they want to do a different handling of paused |
+ * subscriptions, e.g., a filtering stream pausing its own source if all its |
+ * subscribers are paused. |
+ */ |
+ void _pause(_StreamListener<T> listener, Signal resumeSignal) { |
+ assert(identical(listener._source, this)); |
+ if (!listener._isSubscribed) { |
+ throw new StateError("Subscription has been canceled."); |
+ } |
+ assert(!_isComplete); // There can be no subscribers when complete. |
+ bool wasPaused = _isPaused; |
+ _incrementPauseCount(listener); |
+ if (resumeSignal != null) { |
+ resumeSignal.then(() { this._resume(listener, true); }); |
+ } |
+ if (!wasPaused) { |
+ _onPauseStateChange(); |
+ } |
+ } |
+ |
+ /** Stops pausing due to one request from the given listener. */ |
+ void _resume(_StreamListener<T> listener, bool fromEvent) { |
+ if (!listener.isPaused) return; |
+ assert(listener._isSubscribed); |
+ assert(_isPaused); |
+ _decrementPauseCount(listener); |
+ if (!_isPaused) { |
+ _onPauseStateChange(); |
+ if (_hasPendingEvent) { |
+ // If we can fire events now, fire any pending events right away. |
+ if (fromEvent && !_isFiring) { |
+ _handlePendingEvents(); |
+ } else { |
+ _pendingEvents.schedule(this); |
+ } |
+ } |
+ } |
+ } |
+ |
+ /** Create a subscription object. Called by [subcribe]. */ |
+ _StreamSubscriptionImpl<T> _createSubscription( |
+ void onData(T data), |
+ void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError); |
+ |
+ /** |
+ * Adds a listener to this stream. |
+ */ |
+ void _addListener(_StreamSubscriptionImpl subscription); |
+ |
+ /** |
+ * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
+ * |
+ * This method is called from [_StreamSubscriptionImpl.cancel]. |
+ * |
+ * If an event is currently firing, the cancel is delayed |
+ * until after the subscribers have received the event. |
+ */ |
+ void _cancel(_StreamSubscriptionImpl subscriber); |
+ |
+ /** |
+ * Iterate over all current subscribers and perform an action on each. |
+ * |
+ * Subscribers added during the iteration will not be visited. |
+ * Subscribers unsubscribed during the iteration will only be removed |
+ * after they have been acted on. |
+ * |
+ * Any change in the pause state is only reported after all subscribers have |
+ * received the event. |
+ * |
+ * The [action] must not throw, or the controller will be left in an |
+ * invalid state. |
+ * |
+ * This method must not be called while [isFiring] is true. |
+ */ |
+ void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription)); |
+ |
+ /** |
+ * Called when the first subscriber requests a pause or the last a resume. |
+ * |
+ * Read [isPaused] to see the new state. |
+ */ |
+ void _onPauseStateChange() {} |
+ |
+ /** |
+ * Called when the first listener subscribes or the last unsubscribes. |
+ * |
+ * Read [hasSubscribers] to see what the new state is. |
+ */ |
+ void _onSubscriptionStateChange() {} |
+ |
+ /** Add a pending event at the end of the pending event queue. */ |
+ void _addPendingEvent(_DelayedEvent event) { |
+ if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents(); |
+ _pendingEvents.add(event); |
+ } |
+ |
+ /** Fire any pending events until the pending event queue. */ |
+ void _handlePendingEvents() { |
+ _StreamImplEvents events = _pendingEvents; |
+ if (events == null) return; |
+ while (!events.isEmpty && !_isPaused) { |
+ events.removeFirst().perform(this); |
+ } |
+ } |
+ |
+ /** |
+ * Send a data event directly to each subscriber. |
+ */ |
+ _sendData(T value) { |
+ assert(!_isPaused); |
+ assert(!_isComplete); |
+ _forEachSubscriber((subscriber) { |
+ try { |
+ subscriber._sendData(value); |
+ } catch (e, s) { |
+ new AsyncError(e, s).throwDelayed(); |
+ } |
+ }); |
+ } |
+ |
+ /** |
+ * Sends an error event directly to each subscriber. |
+ */ |
+ void _sendError(AsyncError error) { |
+ assert(!_isPaused); |
+ assert(!_isComplete); |
+ _forEachSubscriber((subscriber) { |
+ try { |
+ subscriber._sendError(error); |
+ } catch (e, s) { |
+ new AsyncError.withCause(e, s, error).throwDelayed(); |
+ } |
+ }); |
+ } |
+ |
+ /** |
+ * Sends the "done" message directly to each subscriber. |
+ * This automatically stops further subscription and |
+ * unsubscribes all subscribers. |
+ */ |
+ void _sendDone() { |
+ assert(!_isPaused); |
+ assert(_isClosed); |
+ _setComplete(); |
+ if (!_hasSubscribers) return; |
+ _forEachSubscriber((subscriber) { |
+ _cancel(subscriber); |
+ try { |
+ subscriber._sendDone(); |
+ } catch (e, s) { |
+ new AsyncError(e, s).throwDelayed(); |
+ } |
+ }); |
+ assert(!_hasSubscribers); |
+ _onSubscriptionStateChange(); |
+ } |
+} |
+ |
+// ------------------------------------------------------------------- |
+// Default implementation of a stream with a single subscriber. |
+// ------------------------------------------------------------------- |
+/** |
+ * Default implementation of stream capable of sending events to one subscriber. |
+ * |
+ * Any class needing to implement [Stream] can either directly extend this |
+ * class, or extend [Stream] and delegate the subscribe method to an instance |
+ * of this class. |
+ * |
+ * The only public methods are those of [Stream], so instances of |
+ * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing |
+ * internal functionality. |
+ * |
+ * The [StreamController] is a public facing version of this class, with |
+ * some methods made public. |
+ * |
+ * The user interface of [_SingleStreamImpl] are the following methods: |
+ * * [_add]: Add a data event to the stream. |
+ * * [_signalError]: Add an error event to the stream. |
+ * * [_close]: Request to close the stream. |
+ * * [_onSubscriberStateChange]: Called when receiving the first subscriber or |
+ * when losing the last subscriber. |
+ * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
+ * * [_hasSubscribers]: Test whether there are currently any subscribers. |
+ * * [_isPaused]: Test whether the stream is currently paused. |
+ * The user should not add new events while the stream is paused, but if it |
+ * happens anyway, the stream will enqueue the events just as when new events |
+ * arrive while still firing an old event. |
+ */ |
+class _SingleStreamImpl<T> extends _StreamImpl<T> { |
+ _StreamSubscriptionImpl _subscriber = null; |
+ |
+ /** Whether one or more active subscribers have requested a pause. */ |
+ bool get _isPaused => !_hasSubscribers || super._isPaused; |
+ |
+ /** Whether there is currently a subscriber on this [Stream]. */ |
+ bool get _hasSubscribers => _subscriber != null; |
+ |
+ // ------------------------------------------------------------------- |
+ // Internal implementation. |
+ |
+ /** |
+ * Create the new subscription object. |
+ */ |
+ _StreamSubscriptionImpl<T> _createSubscription( |
+ void onData(T data), |
+ void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError) { |
+ return new _StreamSubscriptionImpl<T>( |
+ this, onData, onError, onDone, unsubscribeOnError); |
+ } |
+ |
+ void _addListener(_StreamSubscriptionImpl subscription) { |
+ if (_hasSubscribers) { |
+ throw new StateError("Stream has already subscriber."); |
+ } |
+ _subscriber = subscription; |
+ subscription._setSubscribed(0); |
+ _onSubscriptionStateChange(); |
+ // TODO(floitsch): Should this be delayed? |
+ _handlePendingEvents(); |
+ } |
+ |
+ /** |
+ * Handle a cancel requested from a [_StreamSubscriptionImpl]. |
+ * |
+ * This method is called from [_StreamSubscriptionImpl.cancel]. |
+ * |
+ * If an event is currently firing, the cancel is delayed |
+ * until after the subscriber has received the event. |
+ */ |
+ void _cancel(_StreamSubscriptionImpl subscriber) { |
+ assert(identical(subscriber._source, this)); |
+ // We allow unsubscribing the currently firing subscription during |
+ // the event firing, because it is indistinguishable from delaying it since |
+ // that event has already received the event. |
+ if (!identical(_subscriber, subscriber)) { |
+ // You may unsubscribe more than once, only the first one counts. |
+ return; |
+ } |
+ _subscriber = null; |
+ int timesPaused = subscriber._setUnsubscribed(); |
+ _updatePauseCount(-timesPaused); |
+ if (timesPaused > 0) { |
+ _onPauseStateChange(); |
+ } |
+ _onSubscriptionStateChange(); |
+ } |
+ |
+ void _forEachSubscriber( |
+ void action(_StreamSubscriptionImpl<T> subscription)) { |
+ _StreamSubscriptionImpl subscription = _subscriber; |
+ assert(subscription != null); |
+ _startFiring(); |
+ action(subscription); |
+ _endFiring(); |
+ } |
+} |
+ |
+// ------------------------------------------------------------------- |
+// Default implementation of a stream with subscribers. |
+// ------------------------------------------------------------------- |
+ |
+/** |
+ * Default implementation of stream capable of sending events to subscribers. |
+ * |
+ * Any class needing to implement [Stream] can either directly extend this |
+ * class, or extend [Stream] and delegate the subscribe method to an instance |
+ * of this class. |
+ * |
+ * The only public methods are those of [Stream], so instances of |
+ * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing |
+ * internal functionality. |
+ * |
+ * The [StreamController] is a public facing version of this class, with |
+ * some methods made public. |
+ * |
+ * The user interface of [_MultiStreamImpl] are the following methods: |
+ * * [_add]: Add a data event to the stream. |
+ * * [_signalError]: Add an error event to the stream. |
+ * * [_close]: Request to close the stream. |
+ * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or |
+ * when losing the last subscriber. |
+ * * [_onPauseStateChange]: Called when entering or leaving paused mode. |
+ * * [_hasSubscribers]: Test whether there are currently any subscribers. |
+ * * [_isPaused]: Test whether the stream is currently paused. |
+ * The user should not add new events while the stream is paused, but if it |
+ * happens anyway, the stream will enqueue the events just as when new events |
+ * arrive while still firing an old event. |
+ */ |
+class _MultiStreamImpl<T> extends _StreamImpl<T> |
+ implements _InternalLinkList { |
+ // Link list implementation (mixin when possible). |
+ _InternalLink _nextLink; |
+ _InternalLink _previousLink; |
+ |
+ _MultiStreamImpl() { |
+ _nextLink = _previousLink = this; |
+ } |
+ |
+ // ------------------------------------------------------------------ |
+ // Helper functions that can be overridden in subclasses. |
+ |
+ /** Whether there are currently any subscribers on this [Stream]. */ |
+ bool get _hasSubscribers => !_InternalLinkList.isEmpty(this); |
+ |
+ /** |
+ * Create the new subscription object. |
+ */ |
+ _StreamListener<T> _createSubscription( |
+ void onData(T data), |
+ void onError(AsyncError error), |
+ void onDone(), |
+ bool unsubscribeOnError) { |
+ return new _StreamSubscriptionImpl<T>( |
+ this, onData, onError, onDone, unsubscribeOnError); |
+ } |
+ |
+ // ------------------------------------------------------------------- |
+ // Internal implementation. |
+ |
+ /** |
+ * Iterate over all current subscribers and perform an action on each. |
+ * |
+ * The set of subscribers cannot be modified during this iteration. |
+ * All attempts to add or unsubscribe subscribers will be delayed until |
+ * after the iteration is complete. |
+ * |
+ * The [action] must not throw, or the controller will be left in an |
+ * invalid state. |
+ * |
+ * This method must not be called while [isFiring] is true. |
+ */ |
+ void _forEachSubscriber( |
+ void action(_StreamListener<T> subscription)) { |
+ assert(!_isFiring); |
+ if (!_hasSubscribers) return; |
+ _startFiring(); |
+ _InternalLink cursor = this._nextLink; |
+ while (!identical(cursor, this)) { |
+ _StreamListener<T> current = cursor; |
+ if (current._needsEvent(_currentEventIdBit)) { |
+ action(current); |
+ // Marks as having received the event. |
+ current._toggleEventReceived(); |
+ } |
+ cursor = current._nextLink; |
+ if (current._isPendingUnsubscribe) { |
+ _removeListener(current); |
+ } |
+ } |
+ _endFiring(); |
+ if (_isPaused) _onPauseStateChange(); |
+ if (!_hasSubscribers) _onSubscriptionStateChange(); |
+ } |
+ |
+ void _addListener(_StreamListener listener) { |
+ listener._setSubscribed(_currentEventIdBit); |
+ bool firstSubscriber = !_hasSubscribers; |
+ _InternalLinkList.add(this, listener); |
+ if (firstSubscriber) { |
+ _onSubscriptionStateChange(); |
+ } |
+ } |
+ |
+ /** |
+ * Handle a cancel requested from a [_StreamListener]. |
+ * |
+ * This method is called from [_StreamListener.cancel]. |
+ * |
+ * If an event is currently firing, the cancel is delayed |
+ * until after the subscribers have received the event. |
+ */ |
+ void _cancel(_StreamListener listener) { |
+ assert(identical(listener._source, this)); |
+ if (_InternalLink.isUnlinked(listener)) { |
+ // You may unsubscribe more than once, only the first one counts. |
+ return; |
+ } |
+ if (_isFiring) { |
+ if (listener._needsEvent(_currentEventIdBit)) { |
+ assert(listener._isSubscribed); |
+ listener._setPendingUnsubscribe(); |
+ } else { |
+ // The listener has been notified of the event (or don't need to, |
+ // if it's still pending subscription) so it's safe to remove it. |
+ _removeListener(listener); |
+ } |
+ // Pause and subscription state changes are reported when we end |
+ // firing. |
+ } else { |
+ bool wasPaused = _isPaused; |
+ _removeListener(listener); |
+ if (wasPaused != _isPaused) _onPauseStateChange(); |
+ if (!_hasSubscribers) _onSubscriptionStateChange(); |
+ } |
+ } |
+ |
+ /** |
+ * Removes a listener from this stream and cancels its pauses. |
+ * |
+ * This is a low-level action that doesn't call [_onSubscriptionStateChange]. |
+ * or [_onPauseStateChange]. |
+ */ |
+ void _removeListener(_StreamListener listener) { |
+ int pauseCount = listener._setUnsubscribed(); |
+ _updatePauseCount(-pauseCount); |
+ _InternalLinkList.remove(listener); |
+ } |
+} |
+ |
+/** |
+ * The subscription class that the [StreamController] uses. |
+ * |
+ * The [StreamController.createSubscription] method should |
+ * create an object of this type, or another subclass of [_StreamListener]. |
+ * A subclass of [StreamController] can specify which subclass |
+ * of [_StreamSubscriptionImpl] it uses by overriding |
+ * [StreamController.createSubscription]. |
+ * |
+ * The subscription is in one of three states: |
+ * * Subscribed. |
+ * * Paused-and-subscribed. |
+ * * Unsubscribed. |
+ * Unsubscribing also unpauses. |
+ */ |
+class _StreamSubscriptionImpl<T> extends _StreamListener<T> |
+ implements StreamSubscription<T> { |
+ final bool _unsubscribeOnError; |
+ _DataHandler _onData; |
+ _ErrorHandler _onError; |
+ _DoneHandler _onDone; |
+ _StreamSubscriptionImpl(_StreamImpl source, |
+ this._onData, |
+ this._onError, |
+ this._onDone, |
+ this._unsubscribeOnError) : super(source); |
+ |
+ void onData(void handleData(T event)) { |
+ if (handleData == null) handleData = _nullDataHandler; |
+ _onData = handleData; |
+ } |
+ |
+ void onError(void handleError(AsyncError error)) { |
+ if (handleError == null) handleError = _nullErrorHandler; |
+ _onError = handleError; |
+ } |
+ |
+ void onDone(void handleDone()) { |
+ if (handleDone == null) handleDone = _nullDoneHandler; |
+ _onDone = handleDone; |
+ } |
+ |
+ void _sendData(T data) { |
+ _onData(data); |
+ } |
+ |
+ void _sendError(AsyncError error) { |
+ _onError(error); |
+ if (_unsubscribeOnError) _source._cancel(this); |
+ } |
+ |
+ void _sendDone() { |
+ _onDone(); |
+ } |
+ |
+ void cancel() { |
+ _source._cancel(this); |
+ } |
+ |
+ void pause([Signal resumeSignal]) { |
+ _source._pause(this, resumeSignal); |
+ } |
+ |
+ void resume() { |
+ if (!isPaused) { |
+ throw new StateError("Resuming unpaused subscription"); |
+ } |
+ _source._resume(this, false); |
+ } |
+} |
+ |
+// Internal helpers. |
+ |
+// Types of the different handlers on a stream. Types used to type fields. |
+typedef void _DataHandler<T>(T value); |
+typedef void _ErrorHandler(AsyncError error); |
+typedef void _DoneHandler(); |
+ |
+ |
+/** Default data handler, does nothing. */ |
+void _nullDataHandler(var value) {} |
+ |
+/** Default error handler, reports the error to the global handler. */ |
+void _nullErrorHandler(AsyncError error) { |
+ error.throwDelayed(); |
+} |
+ |
+/** Default done handler, does nothing. */ |
+void _nullDoneHandler() {} |
+ |
+ |
+/** A delayed event on a stream implementation. */ |
+abstract class _DelayedEvent { |
+ /** Added as a linked list on the [StreamController]. */ |
+ _DelayedEvent next; |
+ /** Execute the delayed event on the [StreamController]. */ |
+ void perform(_StreamImpl stream); |
+} |
+ |
+/** A delayed data event. */ |
+class _DelayedData<T> extends _DelayedEvent{ |
+ T value; |
+ _DelayedData(this.value); |
+ void perform(_StreamImpl<T> stream) { |
+ stream._sendData(value); |
+ } |
+} |
+ |
+/** A delayed error event. */ |
+class _DelayedError extends _DelayedEvent { |
+ AsyncError error; |
+ _DelayedError(this.error); |
+ void perform(_StreamImpl stream) { |
+ stream._sendError(error); |
+ } |
+} |
+ |
+/** A delayed done event. */ |
+class _DelayedDone implements _DelayedEvent { |
+ const _DelayedDone(); |
+ void perform(_StreamImpl stream) { |
+ stream._sendDone(); |
+ } |
+ |
+ _DelayedEvent get next => null; |
+ |
+ void set next(_DelayedEvent _) { |
+ throw new StateError("No events after a done."); |
+ } |
+} |
+ |
+/** |
+ * Simple internal doubly-linked list implementation. |
+ * |
+ * In an internal linked list, the links are in the data objects themselves, |
+ * instead of in a separate object. That means each element can be in at most |
+ * one list at a time. |
+ * |
+ * All links are always members of an element cycle. At creation it's a |
+ * singleton cycle. |
+ */ |
+abstract class _InternalLink { |
+ _InternalLink _nextLink; |
+ _InternalLink _previousLink; |
+ |
+ _InternalLink() { |
+ this._previousLink = this._nextLink = this; |
+ } |
+ |
+ /* Removes a link from any list it may be part of, and links it to itself. */ |
+ static void unlink(_InternalLink element) { |
+ _InternalLink next = element._nextLink; |
+ _InternalLink previous = element._previousLink; |
+ next._previousLink = previous; |
+ previous._nextLink = next; |
+ element._nextLink = element._previousLink = element; |
+ } |
+ |
+ /** Check whether an element is unattached to other elements. */ |
+ static bool isUnlinked(_InternalLink element) { |
+ return identical(element, element._nextLink); |
+ } |
+} |
+ |
+/** |
+ * Marker interface for "list" links. |
+ * |
+ * An "InternalLinkList" is an abstraction on top of a link cycle, where the |
+ * "list" object itself is not considered an element (it's just a header link |
+ * created to avoid edge cases). |
+ * An element is considered part of a list if it is in the list's cycle. |
+ * There should never be more than one "list" object in a cycle. |
+ */ |
+abstract class _InternalLinkList extends _InternalLink { |
+ /** |
+ * Adds an element to a list, just before the header link. |
+ * |
+ * This effectively adds it at the end of the list. |
+ */ |
+ static void add(_InternalLinkList list, _InternalLink element) { |
+ if (!_InternalLink.isUnlinked(element)) _InternalLink.unlink(element); |
+ _InternalLink listEnd = list._previousLink; |
+ listEnd._nextLink = element; |
+ list._previousLink = element; |
+ element._previousLink = listEnd; |
+ element._nextLink = list; |
+ } |
+ |
+ /** Removes an element from its list. */ |
+ static void remove(_InternalLink element) { |
+ _InternalLink.unlink(element); |
+ } |
+ |
+ /** Check whether a list contains no elements, only the header link. */ |
+ static bool isEmpty(_InternalLinkList list) => _InternalLink.isUnlinked(list); |
+ |
+ /** Moves all elements from the list [other] to [list]. */ |
+ static void addAll(_InternalLinkList list, _InternalLinkList other) { |
+ if (isEmpty(other)) return; |
+ _InternalLink listLast = list._previousLink; |
+ _InternalLink otherNext = other._nextLink; |
+ listLast._nextLink = otherNext; |
+ otherNext._previousLink = listLast; |
+ _InternalLink otherLast = other._previousLink; |
+ list._previousLink = otherLast; |
+ otherLast._nextLink = list; |
+ // Clean up [other]. |
+ other._nextLink = other._previousLink = other; |
+ } |
+} |
+ |
+abstract class _StreamListener<T> extends _InternalLink { |
+ final _StreamImpl _source; |
+ int _state = _LISTENER_UNSUBSCRIBED; |
+ |
+ _StreamListener(this._source); |
+ |
+ bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT); |
+ |
+ bool get _isPendingUnsubscribe => |
+ (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0; |
+ |
+ bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0; |
+ |
+ /** |
+ * Whether the listener still needs to receive the currently firing event. |
+ * |
+ * The currently firing event is identified by a single bit, which alternates |
+ * between events. The [_state] contains the previously sent event's bit in |
+ * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener |
+ * still need the current event. |
+ */ |
+ bool _needsEvent(int currentEventIdBit) { |
+ int lastEventIdBit = |
+ (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT; |
+ return lastEventIdBit != currentEventIdBit; |
+ } |
+ |
+ /// If a subscriber's "firing bit" doesn't match the stream's firing bit, |
+ /// we are currently firing an event and the subscriber still need to receive |
+ /// the event. |
+ void _toggleEventReceived() { |
+ _state ^= _LISTENER_EVENT_ID; |
+ } |
+ |
+ void _setSubscribed(int eventIdBit) { |
+ assert(eventIdBit == 0 || eventIdBit == 1); |
+ _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT); |
+ } |
+ |
+ void _setPendingUnsubscribe() { |
+ assert(_isSubscribed); |
+ _state |= _LISTENER_PENDING_UNSUBSCRIBE; |
+ } |
+ |
+ /** |
+ * Marks the listener as unsubscibed. |
+ * |
+ * Returns the number of unresumed pauses for the listener. |
+ */ |
+ int _setUnsubscribed() { |
+ assert(_isSubscribed); |
+ int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT; |
+ _state = _LISTENER_UNSUBSCRIBED; |
+ return timesPaused; |
+ } |
+ |
+ void _incrementPauseCount() { |
+ _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
+ } |
+ |
+ void _decrementPauseCount() { |
+ assert(isPaused); |
+ _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT; |
+ } |
+ |
+ _sendData(T data); |
+ _sendError(AsyncError error); |
+ _sendDone(); |
+} |
+ |
+/** Class holding pending events for a [_StreamImpl]. */ |
+class _StreamImplEvents { |
+ /// Single linked list of [_DelayedEvent] objects. |
+ _DelayedEvent firstPendingEvent = null; |
+ /// Last element in the list of pending events. New events are added after it. |
+ _DelayedEvent lastPendingEvent = null; |
+ /** |
+ * Timer set when pending events are scheduled for execution. |
+ * |
+ * When scheduling pending events for execution in a later cycle, the timer |
+ * is stored here. If pending events are executed earlier than that, e.g., |
+ * due to a second event in the current cycle, the timer is canceled again. |
+ */ |
+ Timer scheduleTimer = null; |
+ |
+ bool get isEmpty => lastPendingEvent == null; |
+ |
+ bool get isScheduled => scheduleTimer != null; |
+ |
+ void schedule(_StreamImpl stream) { |
+ if (isScheduled) return; |
+ scheduleTimer = new Timer(0, (_) { |
+ scheduleTimer = null; |
+ stream._handlePendingEvents(); |
+ }); |
+ } |
+ |
+ void cancelSchedule() { |
+ assert(isScheduled); |
+ scheduleTimer.cancel(); |
+ scheduleTimer = null; |
+ } |
+ |
+ void add(_DelayedEvent event) { |
+ if (lastPendingEvent == null) { |
+ firstPendingEvent = lastPendingEvent = event; |
+ } else { |
+ lastPendingEvent = lastPendingEvent.next = event; |
+ } |
+ } |
+ |
+ _DelayedEvent removeFirst() { |
+ if (isScheduled) cancelSchedule(); |
+ _DelayedEvent event = firstPendingEvent; |
+ firstPendingEvent = event.next; |
+ if (firstPendingEvent == null) { |
+ lastPendingEvent = null; |
+ } |
+ return event; |
+ } |
+} |
+ |
+ |
+class _DoneSubscription<T> implements StreamSubscription<T> { |
+ _DoneHandler _handler; |
+ Timer _timer; |
+ int _pauseCount = 0; |
+ |
+ _DoneSubscription(this._handler) { |
+ _delayDone(); |
+ } |
+ |
+ void _delayDone() { |
+ assert(_timer == null && _pauseCount == 0); |
+ _timer = new Timer(0, (_) { |
+ if (_handler != null) _handler(); |
+ }); |
+ } |
+ |
+ bool get _isComplete => _timer == null && _pauseCount == 0; |
+ |
+ void onData(void handleAction(T value)) {} |
+ void onError(void handleError(StateError error)) {} |
+ void onDone(void handleDone(T value)) { |
+ _handler = handleDone; |
+ } |
+ |
+ void pause([Signal signal]) { |
+ if (_isComplete) { |
+ throw new StateError("Subscription has been canceled."); |
+ } |
+ if (_timer != null) _timer.cancel(); |
+ _pauseCount++; |
+ } |
+ |
+ void resume() { |
+ if (_isComplete) { |
+ throw new StateError("Subscription has been canceled."); |
+ } |
+ if (_pauseCount == 0) return; |
+ _pauseCount--; |
+ if (_pauseCount == 0) { |
+ _delayDone(); |
+ } |
+ } |
+ |
+ bool get isPaused => _pauseCount > 0; |
+ |
+ void cancel() { |
+ if (_isComplete) { |
+ throw new StateError("Subscription has been canceled."); |
+ } |
+ if (_timer != null) { |
+ _timer.cancel(); |
+ _timer = null; |
+ } |
+ _pauseCount = 0; |
+ } |
+} |