Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(38)

Unified Diff: sdk/lib/async/stream_impl.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_impl.dart
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
new file mode 100644
index 0000000000000000000000000000000000000000..f99dc83ba23218b7b30b32dec07ba1024bbfbfd0
--- /dev/null
+++ b/sdk/lib/async/stream_impl.dart
@@ -0,0 +1,1017 @@
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// part of dart.async;
+
+// States shared by single/multi stream implementations.
+
+/// Initial and default state where the stream can receive and send events.
+const int _STREAM_OPEN = 0;
+/// The stream has received a request to complete, but hasn't done so yet.
+/// No further events can be aded to the stream.
+const int _STREAM_CLOSED = 1;
+/// The stream has completed and will no longer receive or send events.
+/// Also counts as closed. The stream must not be paused when it's completed.
+/// Always used in conjunction with [_STREAM_CLOSED].
+const int _STREAM_COMPLETE = 2;
+/// Bit that alternates between events, and listeners are updated to the
+/// current value when they are notified of the event.
+const int _STREAM_EVENT_ID = 4;
+const int _STREAM_EVENT_ID_SHIFT = 2;
+/// Bit set while firing and clear while not.
+const int _STREAM_FIRING = 8;
+/// The count of times a stream has paused is stored in the
+/// state, shifted by this amount.
+const int _STREAM_PAUSE_COUNT_SHIFT = 4;
+
+// States for listeners.
+
+/// The listener is currently not subscribed to its source stream.
+const int _LISTENER_UNSUBSCRIBED = 0;
+/// The listener is actively subscribed to its source stream.
+const int _LISTENER_SUBSCRIBED = 1;
+/// The listener is subscribed until it has been notified of the current event.
+/// This flag bit is always used in conjuction with [_LISTENER_SUBSCRIBED].
+const int _LISTENER_PENDING_UNSUBSCRIBE = 2;
+/// Bit that contains the last sent event's "id bit".
+const int _LISTENER_EVENT_ID = 4;
+const int _LISTENER_EVENT_ID_SHIFT = 2;
+/// The count of times a listener has paused is stored in the
+/// state, shifted by this amount.
+const int _LISTENER_PAUSE_COUNT_SHIFT = 3;
+
+
+// -------------------------------------------------------------------
+// Common base class for single and multi-subscription streams.
+// -------------------------------------------------------------------
+abstract class _StreamImpl<T> extends Stream<T> {
+ /** Current state of the stream. */
+ int _state = _STREAM_OPEN;
+
+ /**
+ * List of pending events.
+ *
+ * If events are added to the stream (using [_add], [_signalError] or [_done])
+ * while the stream is paused, or while another event is firing, events will
+ * stored here.
+ * Also supports scheduling the events for later execution.
+ */
+ _StreamImplEvents _pendingEvents;
+
+ // ------------------------------------------------------------------
+ // Stream interface.
+
+ StreamSubscription listen(void onData(T data),
+ { void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError }) {
+ if (_isComplete) {
+ return new _DoneSubscription(onDone);
+ }
+ if (onData == null) onData = _nullDataHandler;
+ if (onError == null) onError = _nullErrorHandler;
+ if (onDone == null) onDone = _nullDoneHandler;
+ unsubscribeOnError = identical(true, unsubscribeOnError);
+ _StreamListener subscription =
+ _createSubscription(onData, onError, onDone, unsubscribeOnError);
+ _addListener(subscription);
+ return subscription;
+ }
+
+ // ------------------------------------------------------------------
+ // StreamSink interface-like methods for sending events into the stream.
+ // It's the responsibility of the caller to ensure that the stream is not
+ // paused when adding events. If the stream is paused, the events will be
+ // queued, but it's better to not send events at all.
+
+ /**
+ * Send or queue a data event.
+ */
+ void _add(T value) {
+ if (_isClosed) throw new StateError("Sending on closed stream");
+ if (!_canFireEvent) {
+ _addPendingEvent(new _DelayedData<T>(value));
+ return;
+ }
+ _sendData(value);
+ _handlePendingEvents();
+ }
+
+ /**
+ * Send or enqueue an error event.
+ *
+ * If a subscription has requested to be unsubscribed on errors,
+ * it will be unsubscribed after receiving this event.
+ */
+ void _signalError(AsyncError error) {
+ if (_isClosed) throw new StateError("Sending on closed stream");
+ if (!_canFireEvent) {
+ _addPendingEvent(new _DelayedError(error));
+ return;
+ }
+ _sendError(error);
+ _handlePendingEvents();
+ }
+
+ /**
+ * Send or enqueue a "done" message.
+ *
+ * The "done" message should be sent at most once by a stream, and it
+ * should be the last message sent.
+ */
+ void _close() {
+ if (_isClosed) throw new StateError("Sending on closed stream");
+ _state |= _STREAM_CLOSED;
+ if (!_canFireEvent) {
+ // You can't enqueue an event after the Done, so make it const.
+ _addPendingEvent(const _DelayedDone());
+ return;
+ }
+ _sendDone();
+ assert(!_hasPendingEvent);
+ }
+
+ // -------------------------------------------------------------------
+ // Internal implementation.
+
+ // State prediates.
+
+ /** Whether the stream has been closed (a done event requested). */
+ bool get _isClosed => (_state & _STREAM_CLOSED) != 0;
+
+ /** Whether the stream is completed. */
+ bool get _isComplete => (_state & _STREAM_COMPLETE) != 0;
+
+ /** Whether one or more active subscribers have requested a pause. */
+ bool get _isPaused => _state >= (1 << _STREAM_PAUSE_COUNT_SHIFT);
+
+ /** Check whether the pending event queue is non-empty */
+ bool get _hasPendingEvent =>
+ _pendingEvents != null && !_pendingEvents.isEmpty;
+
+ /** Whether we are currently firing an event. */
+ bool get _isFiring => (_state & _STREAM_FIRING) != 0;
+
+ int get _currentEventIdBit =>
+ (_state & _STREAM_EVENT_ID ) >> _STREAM_EVENT_ID_SHIFT;
+
+ /** Whether there is currently a subscriber on this [Stream]. */
+ bool get _hasSubscribers;
+
+ /** Whether the stream can fire a new event. */
+ bool get _canFireEvent => !_isFiring && !_isPaused && !_hasPendingEvent;
+
+ // State modification.
+
+ /** Record an increases in the number of times the listener has paused. */
+ void _incrementPauseCount(_StreamListener<T> listener) {
+ listener._incrementPauseCount();
+ _updatePauseCount(1);
+ }
+
+ /** Record a decrease in the number of times the listener has paused. */
+ void _decrementPauseCount(_StreamListener<T> listener) {
+ assert(_isPaused);
+ listener._decrementPauseCount();
+ _updatePauseCount(-1);
+ }
+
+ /** Update the stream's own pause count only. */
+ void _updatePauseCount(int by) {
+ _state += by << _STREAM_PAUSE_COUNT_SHIFT;
+ assert(_state >= 0);
+ }
+
+ void _setClosed() {
+ assert(!_isClosed);
+ _state |= _STREAM_CLOSED;
+ }
+
+ void _setComplete() {
+ assert(_isClosed);
+ _state = _state |_STREAM_COMPLETE;
+ }
+
+ void _startFiring() {
+ assert(!_isFiring);
+ // This sets the _STREAM_FIRING bit and toggles the _STREAM_EVENT_ID
+ // bit. All current subscribers will now have a _LISTENER_EVENT_ID
+ // that doesn't match _STREAM_EVENT_ID, and they will receive the
+ // event being fired.
+ _state ^= _STREAM_FIRING | _STREAM_EVENT_ID;
+ }
+
+ void _endFiring() {
+ assert(_isFiring);
+ _state ^= _STREAM_FIRING;
+ }
+
+ /**
+ * Record that a listener wants a pause from events.
+ *
+ * This methods is called from [_StreamListener.pause()].
+ * Subclasses can override this method, along with [isPaused] and
+ * [createSubscription], if they want to do a different handling of paused
+ * subscriptions, e.g., a filtering stream pausing its own source if all its
+ * subscribers are paused.
+ */
+ void _pause(_StreamListener<T> listener, Signal resumeSignal) {
+ assert(identical(listener._source, this));
+ if (!listener._isSubscribed) {
+ throw new StateError("Subscription has been canceled.");
+ }
+ assert(!_isComplete); // There can be no subscribers when complete.
+ bool wasPaused = _isPaused;
+ _incrementPauseCount(listener);
+ if (resumeSignal != null) {
+ resumeSignal.then(() { this._resume(listener, true); });
+ }
+ if (!wasPaused) {
+ _onPauseStateChange();
+ }
+ }
+
+ /** Stops pausing due to one request from the given listener. */
+ void _resume(_StreamListener<T> listener, bool fromEvent) {
+ if (!listener.isPaused) return;
+ assert(listener._isSubscribed);
+ assert(_isPaused);
+ _decrementPauseCount(listener);
+ if (!_isPaused) {
+ _onPauseStateChange();
+ if (_hasPendingEvent) {
+ // If we can fire events now, fire any pending events right away.
+ if (fromEvent && !_isFiring) {
+ _handlePendingEvents();
+ } else {
+ _pendingEvents.schedule(this);
+ }
+ }
+ }
+ }
+
+ /** Create a subscription object. Called by [subcribe]. */
+ _StreamSubscriptionImpl<T> _createSubscription(
+ void onData(T data),
+ void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError);
+
+ /**
+ * Adds a listener to this stream.
+ */
+ void _addListener(_StreamSubscriptionImpl subscription);
+
+ /**
+ * Handle a cancel requested from a [_StreamSubscriptionImpl].
+ *
+ * This method is called from [_StreamSubscriptionImpl.cancel].
+ *
+ * If an event is currently firing, the cancel is delayed
+ * until after the subscribers have received the event.
+ */
+ void _cancel(_StreamSubscriptionImpl subscriber);
+
+ /**
+ * Iterate over all current subscribers and perform an action on each.
+ *
+ * Subscribers added during the iteration will not be visited.
+ * Subscribers unsubscribed during the iteration will only be removed
+ * after they have been acted on.
+ *
+ * Any change in the pause state is only reported after all subscribers have
+ * received the event.
+ *
+ * The [action] must not throw, or the controller will be left in an
+ * invalid state.
+ *
+ * This method must not be called while [isFiring] is true.
+ */
+ void _forEachSubscriber(void action(_StreamSubscriptionImpl<T> subscription));
+
+ /**
+ * Called when the first subscriber requests a pause or the last a resume.
+ *
+ * Read [isPaused] to see the new state.
+ */
+ void _onPauseStateChange() {}
+
+ /**
+ * Called when the first listener subscribes or the last unsubscribes.
+ *
+ * Read [hasSubscribers] to see what the new state is.
+ */
+ void _onSubscriptionStateChange() {}
+
+ /** Add a pending event at the end of the pending event queue. */
+ void _addPendingEvent(_DelayedEvent event) {
+ if (_pendingEvents == null) _pendingEvents = new _StreamImplEvents();
+ _pendingEvents.add(event);
+ }
+
+ /** Fire any pending events until the pending event queue. */
+ void _handlePendingEvents() {
+ _StreamImplEvents events = _pendingEvents;
+ if (events == null) return;
+ while (!events.isEmpty && !_isPaused) {
+ events.removeFirst().perform(this);
+ }
+ }
+
+ /**
+ * Send a data event directly to each subscriber.
+ */
+ _sendData(T value) {
+ assert(!_isPaused);
+ assert(!_isComplete);
+ _forEachSubscriber((subscriber) {
+ try {
+ subscriber._sendData(value);
+ } catch (e, s) {
+ new AsyncError(e, s).throwDelayed();
+ }
+ });
+ }
+
+ /**
+ * Sends an error event directly to each subscriber.
+ */
+ void _sendError(AsyncError error) {
+ assert(!_isPaused);
+ assert(!_isComplete);
+ _forEachSubscriber((subscriber) {
+ try {
+ subscriber._sendError(error);
+ } catch (e, s) {
+ new AsyncError.withCause(e, s, error).throwDelayed();
+ }
+ });
+ }
+
+ /**
+ * Sends the "done" message directly to each subscriber.
+ * This automatically stops further subscription and
+ * unsubscribes all subscribers.
+ */
+ void _sendDone() {
+ assert(!_isPaused);
+ assert(_isClosed);
+ _setComplete();
+ if (!_hasSubscribers) return;
+ _forEachSubscriber((subscriber) {
+ _cancel(subscriber);
+ try {
+ subscriber._sendDone();
+ } catch (e, s) {
+ new AsyncError(e, s).throwDelayed();
+ }
+ });
+ assert(!_hasSubscribers);
+ _onSubscriptionStateChange();
+ }
+}
+
+// -------------------------------------------------------------------
+// Default implementation of a stream with a single subscriber.
+// -------------------------------------------------------------------
+/**
+ * Default implementation of stream capable of sending events to one subscriber.
+ *
+ * Any class needing to implement [Stream] can either directly extend this
+ * class, or extend [Stream] and delegate the subscribe method to an instance
+ * of this class.
+ *
+ * The only public methods are those of [Stream], so instances of
+ * [_SingleStreamImpl] can be returned directly as a [Stream] without exposing
+ * internal functionality.
+ *
+ * The [StreamController] is a public facing version of this class, with
+ * some methods made public.
+ *
+ * The user interface of [_SingleStreamImpl] are the following methods:
+ * * [_add]: Add a data event to the stream.
+ * * [_signalError]: Add an error event to the stream.
+ * * [_close]: Request to close the stream.
+ * * [_onSubscriberStateChange]: Called when receiving the first subscriber or
+ * when losing the last subscriber.
+ * * [_onPauseStateChange]: Called when entering or leaving paused mode.
+ * * [_hasSubscribers]: Test whether there are currently any subscribers.
+ * * [_isPaused]: Test whether the stream is currently paused.
+ * The user should not add new events while the stream is paused, but if it
+ * happens anyway, the stream will enqueue the events just as when new events
+ * arrive while still firing an old event.
+ */
+class _SingleStreamImpl<T> extends _StreamImpl<T> {
+ _StreamSubscriptionImpl _subscriber = null;
+
+ /** Whether one or more active subscribers have requested a pause. */
+ bool get _isPaused => !_hasSubscribers || super._isPaused;
+
+ /** Whether there is currently a subscriber on this [Stream]. */
+ bool get _hasSubscribers => _subscriber != null;
+
+ // -------------------------------------------------------------------
+ // Internal implementation.
+
+ /**
+ * Create the new subscription object.
+ */
+ _StreamSubscriptionImpl<T> _createSubscription(
+ void onData(T data),
+ void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError) {
+ return new _StreamSubscriptionImpl<T>(
+ this, onData, onError, onDone, unsubscribeOnError);
+ }
+
+ void _addListener(_StreamSubscriptionImpl subscription) {
+ if (_hasSubscribers) {
+ throw new StateError("Stream has already subscriber.");
+ }
+ _subscriber = subscription;
+ subscription._setSubscribed(0);
+ _onSubscriptionStateChange();
+ // TODO(floitsch): Should this be delayed?
+ _handlePendingEvents();
+ }
+
+ /**
+ * Handle a cancel requested from a [_StreamSubscriptionImpl].
+ *
+ * This method is called from [_StreamSubscriptionImpl.cancel].
+ *
+ * If an event is currently firing, the cancel is delayed
+ * until after the subscriber has received the event.
+ */
+ void _cancel(_StreamSubscriptionImpl subscriber) {
+ assert(identical(subscriber._source, this));
+ // We allow unsubscribing the currently firing subscription during
+ // the event firing, because it is indistinguishable from delaying it since
+ // that event has already received the event.
+ if (!identical(_subscriber, subscriber)) {
+ // You may unsubscribe more than once, only the first one counts.
+ return;
+ }
+ _subscriber = null;
+ int timesPaused = subscriber._setUnsubscribed();
+ _updatePauseCount(-timesPaused);
+ if (timesPaused > 0) {
+ _onPauseStateChange();
+ }
+ _onSubscriptionStateChange();
+ }
+
+ void _forEachSubscriber(
+ void action(_StreamSubscriptionImpl<T> subscription)) {
+ _StreamSubscriptionImpl subscription = _subscriber;
+ assert(subscription != null);
+ _startFiring();
+ action(subscription);
+ _endFiring();
+ }
+}
+
+// -------------------------------------------------------------------
+// Default implementation of a stream with subscribers.
+// -------------------------------------------------------------------
+
+/**
+ * Default implementation of stream capable of sending events to subscribers.
+ *
+ * Any class needing to implement [Stream] can either directly extend this
+ * class, or extend [Stream] and delegate the subscribe method to an instance
+ * of this class.
+ *
+ * The only public methods are those of [Stream], so instances of
+ * [_MultiStreamImpl] can be returned directly as a [Stream] without exposing
+ * internal functionality.
+ *
+ * The [StreamController] is a public facing version of this class, with
+ * some methods made public.
+ *
+ * The user interface of [_MultiStreamImpl] are the following methods:
+ * * [_add]: Add a data event to the stream.
+ * * [_signalError]: Add an error event to the stream.
+ * * [_close]: Request to close the stream.
+ * * [_onSubscriptionStateChange]: Called when receiving the first subscriber or
+ * when losing the last subscriber.
+ * * [_onPauseStateChange]: Called when entering or leaving paused mode.
+ * * [_hasSubscribers]: Test whether there are currently any subscribers.
+ * * [_isPaused]: Test whether the stream is currently paused.
+ * The user should not add new events while the stream is paused, but if it
+ * happens anyway, the stream will enqueue the events just as when new events
+ * arrive while still firing an old event.
+ */
+class _MultiStreamImpl<T> extends _StreamImpl<T>
+ implements _InternalLinkList {
+ // Link list implementation (mixin when possible).
+ _InternalLink _nextLink;
+ _InternalLink _previousLink;
+
+ _MultiStreamImpl() {
+ _nextLink = _previousLink = this;
+ }
+
+ // ------------------------------------------------------------------
+ // Helper functions that can be overridden in subclasses.
+
+ /** Whether there are currently any subscribers on this [Stream]. */
+ bool get _hasSubscribers => !_InternalLinkList.isEmpty(this);
+
+ /**
+ * Create the new subscription object.
+ */
+ _StreamListener<T> _createSubscription(
+ void onData(T data),
+ void onError(AsyncError error),
+ void onDone(),
+ bool unsubscribeOnError) {
+ return new _StreamSubscriptionImpl<T>(
+ this, onData, onError, onDone, unsubscribeOnError);
+ }
+
+ // -------------------------------------------------------------------
+ // Internal implementation.
+
+ /**
+ * Iterate over all current subscribers and perform an action on each.
+ *
+ * The set of subscribers cannot be modified during this iteration.
+ * All attempts to add or unsubscribe subscribers will be delayed until
+ * after the iteration is complete.
+ *
+ * The [action] must not throw, or the controller will be left in an
+ * invalid state.
+ *
+ * This method must not be called while [isFiring] is true.
+ */
+ void _forEachSubscriber(
+ void action(_StreamListener<T> subscription)) {
+ assert(!_isFiring);
+ if (!_hasSubscribers) return;
+ _startFiring();
+ _InternalLink cursor = this._nextLink;
+ while (!identical(cursor, this)) {
+ _StreamListener<T> current = cursor;
+ if (current._needsEvent(_currentEventIdBit)) {
+ action(current);
+ // Marks as having received the event.
+ current._toggleEventReceived();
+ }
+ cursor = current._nextLink;
+ if (current._isPendingUnsubscribe) {
+ _removeListener(current);
+ }
+ }
+ _endFiring();
+ if (_isPaused) _onPauseStateChange();
+ if (!_hasSubscribers) _onSubscriptionStateChange();
+ }
+
+ void _addListener(_StreamListener listener) {
+ listener._setSubscribed(_currentEventIdBit);
+ bool firstSubscriber = !_hasSubscribers;
+ _InternalLinkList.add(this, listener);
+ if (firstSubscriber) {
+ _onSubscriptionStateChange();
+ }
+ }
+
+ /**
+ * Handle a cancel requested from a [_StreamListener].
+ *
+ * This method is called from [_StreamListener.cancel].
+ *
+ * If an event is currently firing, the cancel is delayed
+ * until after the subscribers have received the event.
+ */
+ void _cancel(_StreamListener listener) {
+ assert(identical(listener._source, this));
+ if (_InternalLink.isUnlinked(listener)) {
+ // You may unsubscribe more than once, only the first one counts.
+ return;
+ }
+ if (_isFiring) {
+ if (listener._needsEvent(_currentEventIdBit)) {
+ assert(listener._isSubscribed);
+ listener._setPendingUnsubscribe();
+ } else {
+ // The listener has been notified of the event (or don't need to,
+ // if it's still pending subscription) so it's safe to remove it.
+ _removeListener(listener);
+ }
+ // Pause and subscription state changes are reported when we end
+ // firing.
+ } else {
+ bool wasPaused = _isPaused;
+ _removeListener(listener);
+ if (wasPaused != _isPaused) _onPauseStateChange();
+ if (!_hasSubscribers) _onSubscriptionStateChange();
+ }
+ }
+
+ /**
+ * Removes a listener from this stream and cancels its pauses.
+ *
+ * This is a low-level action that doesn't call [_onSubscriptionStateChange].
+ * or [_onPauseStateChange].
+ */
+ void _removeListener(_StreamListener listener) {
+ int pauseCount = listener._setUnsubscribed();
+ _updatePauseCount(-pauseCount);
+ _InternalLinkList.remove(listener);
+ }
+}
+
+/**
+ * The subscription class that the [StreamController] uses.
+ *
+ * The [StreamController.createSubscription] method should
+ * create an object of this type, or another subclass of [_StreamListener].
+ * A subclass of [StreamController] can specify which subclass
+ * of [_StreamSubscriptionImpl] it uses by overriding
+ * [StreamController.createSubscription].
+ *
+ * The subscription is in one of three states:
+ * * Subscribed.
+ * * Paused-and-subscribed.
+ * * Unsubscribed.
+ * Unsubscribing also unpauses.
+ */
+class _StreamSubscriptionImpl<T> extends _StreamListener<T>
+ implements StreamSubscription<T> {
+ final bool _unsubscribeOnError;
+ _DataHandler _onData;
+ _ErrorHandler _onError;
+ _DoneHandler _onDone;
+ _StreamSubscriptionImpl(_StreamImpl source,
+ this._onData,
+ this._onError,
+ this._onDone,
+ this._unsubscribeOnError) : super(source);
+
+ void onData(void handleData(T event)) {
+ if (handleData == null) handleData = _nullDataHandler;
+ _onData = handleData;
+ }
+
+ void onError(void handleError(AsyncError error)) {
+ if (handleError == null) handleError = _nullErrorHandler;
+ _onError = handleError;
+ }
+
+ void onDone(void handleDone()) {
+ if (handleDone == null) handleDone = _nullDoneHandler;
+ _onDone = handleDone;
+ }
+
+ void _sendData(T data) {
+ _onData(data);
+ }
+
+ void _sendError(AsyncError error) {
+ _onError(error);
+ if (_unsubscribeOnError) _source._cancel(this);
+ }
+
+ void _sendDone() {
+ _onDone();
+ }
+
+ void cancel() {
+ _source._cancel(this);
+ }
+
+ void pause([Signal resumeSignal]) {
+ _source._pause(this, resumeSignal);
+ }
+
+ void resume() {
+ if (!isPaused) {
+ throw new StateError("Resuming unpaused subscription");
+ }
+ _source._resume(this, false);
+ }
+}
+
+// Internal helpers.
+
+// Types of the different handlers on a stream. Types used to type fields.
+typedef void _DataHandler<T>(T value);
+typedef void _ErrorHandler(AsyncError error);
+typedef void _DoneHandler();
+
+
+/** Default data handler, does nothing. */
+void _nullDataHandler(var value) {}
+
+/** Default error handler, reports the error to the global handler. */
+void _nullErrorHandler(AsyncError error) {
+ error.throwDelayed();
+}
+
+/** Default done handler, does nothing. */
+void _nullDoneHandler() {}
+
+
+/** A delayed event on a stream implementation. */
+abstract class _DelayedEvent {
+ /** Added as a linked list on the [StreamController]. */
+ _DelayedEvent next;
+ /** Execute the delayed event on the [StreamController]. */
+ void perform(_StreamImpl stream);
+}
+
+/** A delayed data event. */
+class _DelayedData<T> extends _DelayedEvent{
+ T value;
+ _DelayedData(this.value);
+ void perform(_StreamImpl<T> stream) {
+ stream._sendData(value);
+ }
+}
+
+/** A delayed error event. */
+class _DelayedError extends _DelayedEvent {
+ AsyncError error;
+ _DelayedError(this.error);
+ void perform(_StreamImpl stream) {
+ stream._sendError(error);
+ }
+}
+
+/** A delayed done event. */
+class _DelayedDone implements _DelayedEvent {
+ const _DelayedDone();
+ void perform(_StreamImpl stream) {
+ stream._sendDone();
+ }
+
+ _DelayedEvent get next => null;
+
+ void set next(_DelayedEvent _) {
+ throw new StateError("No events after a done.");
+ }
+}
+
+/**
+ * Simple internal doubly-linked list implementation.
+ *
+ * In an internal linked list, the links are in the data objects themselves,
+ * instead of in a separate object. That means each element can be in at most
+ * one list at a time.
+ *
+ * All links are always members of an element cycle. At creation it's a
+ * singleton cycle.
+ */
+abstract class _InternalLink {
+ _InternalLink _nextLink;
+ _InternalLink _previousLink;
+
+ _InternalLink() {
+ this._previousLink = this._nextLink = this;
+ }
+
+ /* Removes a link from any list it may be part of, and links it to itself. */
+ static void unlink(_InternalLink element) {
+ _InternalLink next = element._nextLink;
+ _InternalLink previous = element._previousLink;
+ next._previousLink = previous;
+ previous._nextLink = next;
+ element._nextLink = element._previousLink = element;
+ }
+
+ /** Check whether an element is unattached to other elements. */
+ static bool isUnlinked(_InternalLink element) {
+ return identical(element, element._nextLink);
+ }
+}
+
+/**
+ * Marker interface for "list" links.
+ *
+ * An "InternalLinkList" is an abstraction on top of a link cycle, where the
+ * "list" object itself is not considered an element (it's just a header link
+ * created to avoid edge cases).
+ * An element is considered part of a list if it is in the list's cycle.
+ * There should never be more than one "list" object in a cycle.
+ */
+abstract class _InternalLinkList extends _InternalLink {
+ /**
+ * Adds an element to a list, just before the header link.
+ *
+ * This effectively adds it at the end of the list.
+ */
+ static void add(_InternalLinkList list, _InternalLink element) {
+ if (!_InternalLink.isUnlinked(element)) _InternalLink.unlink(element);
+ _InternalLink listEnd = list._previousLink;
+ listEnd._nextLink = element;
+ list._previousLink = element;
+ element._previousLink = listEnd;
+ element._nextLink = list;
+ }
+
+ /** Removes an element from its list. */
+ static void remove(_InternalLink element) {
+ _InternalLink.unlink(element);
+ }
+
+ /** Check whether a list contains no elements, only the header link. */
+ static bool isEmpty(_InternalLinkList list) => _InternalLink.isUnlinked(list);
+
+ /** Moves all elements from the list [other] to [list]. */
+ static void addAll(_InternalLinkList list, _InternalLinkList other) {
+ if (isEmpty(other)) return;
+ _InternalLink listLast = list._previousLink;
+ _InternalLink otherNext = other._nextLink;
+ listLast._nextLink = otherNext;
+ otherNext._previousLink = listLast;
+ _InternalLink otherLast = other._previousLink;
+ list._previousLink = otherLast;
+ otherLast._nextLink = list;
+ // Clean up [other].
+ other._nextLink = other._previousLink = other;
+ }
+}
+
+abstract class _StreamListener<T> extends _InternalLink {
+ final _StreamImpl _source;
+ int _state = _LISTENER_UNSUBSCRIBED;
+
+ _StreamListener(this._source);
+
+ bool get isPaused => _state >= (1 << _LISTENER_PAUSE_COUNT_SHIFT);
+
+ bool get _isPendingUnsubscribe =>
+ (_state & _LISTENER_PENDING_UNSUBSCRIBE) != 0;
+
+ bool get _isSubscribed => (_state & _LISTENER_SUBSCRIBED) != 0;
+
+ /**
+ * Whether the listener still needs to receive the currently firing event.
+ *
+ * The currently firing event is identified by a single bit, which alternates
+ * between events. The [_state] contains the previously sent event's bit in
+ * the [_LISTENER_EVENT_ID] bit. If the two don't match, this listener
+ * still need the current event.
+ */
+ bool _needsEvent(int currentEventIdBit) {
+ int lastEventIdBit =
+ (_state & _LISTENER_EVENT_ID) >> _LISTENER_EVENT_ID_SHIFT;
+ return lastEventIdBit != currentEventIdBit;
+ }
+
+ /// If a subscriber's "firing bit" doesn't match the stream's firing bit,
+ /// we are currently firing an event and the subscriber still need to receive
+ /// the event.
+ void _toggleEventReceived() {
+ _state ^= _LISTENER_EVENT_ID;
+ }
+
+ void _setSubscribed(int eventIdBit) {
+ assert(eventIdBit == 0 || eventIdBit == 1);
+ _state = _LISTENER_SUBSCRIBED | (eventIdBit << _LISTENER_EVENT_ID_SHIFT);
+ }
+
+ void _setPendingUnsubscribe() {
+ assert(_isSubscribed);
+ _state |= _LISTENER_PENDING_UNSUBSCRIBE;
+ }
+
+ /**
+ * Marks the listener as unsubscibed.
+ *
+ * Returns the number of unresumed pauses for the listener.
+ */
+ int _setUnsubscribed() {
+ assert(_isSubscribed);
+ int timesPaused = _state >> _LISTENER_PAUSE_COUNT_SHIFT;
+ _state = _LISTENER_UNSUBSCRIBED;
+ return timesPaused;
+ }
+
+ void _incrementPauseCount() {
+ _state += 1 << _LISTENER_PAUSE_COUNT_SHIFT;
+ }
+
+ void _decrementPauseCount() {
+ assert(isPaused);
+ _state -= 1 << _LISTENER_PAUSE_COUNT_SHIFT;
+ }
+
+ _sendData(T data);
+ _sendError(AsyncError error);
+ _sendDone();
+}
+
+/** Class holding pending events for a [_StreamImpl]. */
+class _StreamImplEvents {
+ /// Single linked list of [_DelayedEvent] objects.
+ _DelayedEvent firstPendingEvent = null;
+ /// Last element in the list of pending events. New events are added after it.
+ _DelayedEvent lastPendingEvent = null;
+ /**
+ * Timer set when pending events are scheduled for execution.
+ *
+ * When scheduling pending events for execution in a later cycle, the timer
+ * is stored here. If pending events are executed earlier than that, e.g.,
+ * due to a second event in the current cycle, the timer is canceled again.
+ */
+ Timer scheduleTimer = null;
+
+ bool get isEmpty => lastPendingEvent == null;
+
+ bool get isScheduled => scheduleTimer != null;
+
+ void schedule(_StreamImpl stream) {
+ if (isScheduled) return;
+ scheduleTimer = new Timer(0, (_) {
+ scheduleTimer = null;
+ stream._handlePendingEvents();
+ });
+ }
+
+ void cancelSchedule() {
+ assert(isScheduled);
+ scheduleTimer.cancel();
+ scheduleTimer = null;
+ }
+
+ void add(_DelayedEvent event) {
+ if (lastPendingEvent == null) {
+ firstPendingEvent = lastPendingEvent = event;
+ } else {
+ lastPendingEvent = lastPendingEvent.next = event;
+ }
+ }
+
+ _DelayedEvent removeFirst() {
+ if (isScheduled) cancelSchedule();
+ _DelayedEvent event = firstPendingEvent;
+ firstPendingEvent = event.next;
+ if (firstPendingEvent == null) {
+ lastPendingEvent = null;
+ }
+ return event;
+ }
+}
+
+
+class _DoneSubscription<T> implements StreamSubscription<T> {
+ _DoneHandler _handler;
+ Timer _timer;
+ int _pauseCount = 0;
+
+ _DoneSubscription(this._handler) {
+ _delayDone();
+ }
+
+ void _delayDone() {
+ assert(_timer == null && _pauseCount == 0);
+ _timer = new Timer(0, (_) {
+ if (_handler != null) _handler();
+ });
+ }
+
+ bool get _isComplete => _timer == null && _pauseCount == 0;
+
+ void onData(void handleAction(T value)) {}
+ void onError(void handleError(StateError error)) {}
+ void onDone(void handleDone(T value)) {
+ _handler = handleDone;
+ }
+
+ void pause([Signal signal]) {
+ if (_isComplete) {
+ throw new StateError("Subscription has been canceled.");
+ }
+ if (_timer != null) _timer.cancel();
+ _pauseCount++;
+ }
+
+ void resume() {
+ if (_isComplete) {
+ throw new StateError("Subscription has been canceled.");
+ }
+ if (_pauseCount == 0) return;
+ _pauseCount--;
+ if (_pauseCount == 0) {
+ _delayDone();
+ }
+ }
+
+ bool get isPaused => _pauseCount > 0;
+
+ void cancel() {
+ if (_isComplete) {
+ throw new StateError("Subscription has been canceled.");
+ }
+ if (_timer != null) {
+ _timer.cancel();
+ _timer = null;
+ }
+ _pauseCount = 0;
+ }
+}
« no previous file with comments | « sdk/lib/async/stream_controller.dart ('k') | sdk/lib/async/stream_pipe.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698