| Index: sdk/lib/async/stream_impl.dart
|
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
|
| index 5987050a586da99de0d00b8f31a99d06c08531ad..375844cdcdf535ce2d3a502714e3fc4363e56a59 100644
|
| --- a/sdk/lib/async/stream_impl.dart
|
| +++ b/sdk/lib/async/stream_impl.dart
|
| @@ -598,86 +598,6 @@ class _DelayedDone implements _DelayedEvent {
|
| }
|
| }
|
|
|
| -/**
|
| - * Simple internal doubly-linked list implementation.
|
| - *
|
| - * In an internal linked list, the links are in the data objects themselves,
|
| - * instead of in a separate object. That means each element can be in at most
|
| - * one list at a time.
|
| - *
|
| - * All links are always members of an element cycle. At creation it's a
|
| - * singleton cycle.
|
| - */
|
| -abstract class _InternalLink {
|
| - _InternalLink _nextLink;
|
| - _InternalLink _previousLink;
|
| -
|
| - _InternalLink() {
|
| - this._previousLink = this._nextLink = this;
|
| - }
|
| -
|
| - /* Removes a link from any list it may be part of, and links it to itself. */
|
| - static void unlink(_InternalLink element) {
|
| - _InternalLink next = element._nextLink;
|
| - _InternalLink previous = element._previousLink;
|
| - next._previousLink = previous;
|
| - previous._nextLink = next;
|
| - element._nextLink = element._previousLink = element;
|
| - }
|
| -
|
| - /** Check whether an element is unattached to other elements. */
|
| - static bool isUnlinked(_InternalLink element) {
|
| - return identical(element, element._nextLink);
|
| - }
|
| -}
|
| -
|
| -/**
|
| - * Marker interface for "list" links.
|
| - *
|
| - * An "InternalLinkList" is an abstraction on top of a link cycle, where the
|
| - * "list" object itself is not considered an element (it's just a header link
|
| - * created to avoid edge cases).
|
| - * An element is considered part of a list if it is in the list's cycle.
|
| - * There should never be more than one "list" object in a cycle.
|
| - */
|
| -abstract class _InternalLinkList extends _InternalLink {
|
| - /**
|
| - * Adds an element to a list, just before the header link.
|
| - *
|
| - * This effectively adds it at the end of the list.
|
| - */
|
| - static void add(_InternalLinkList list, _InternalLink element) {
|
| - if (!_InternalLink.isUnlinked(element)) _InternalLink.unlink(element);
|
| - _InternalLink listEnd = list._previousLink;
|
| - listEnd._nextLink = element;
|
| - list._previousLink = element;
|
| - element._previousLink = listEnd;
|
| - element._nextLink = list;
|
| - }
|
| -
|
| - /** Removes an element from its list. */
|
| - static void remove(_InternalLink element) {
|
| - _InternalLink.unlink(element);
|
| - }
|
| -
|
| - /** Check whether a list contains no elements, only the header link. */
|
| - static bool isEmpty(_InternalLinkList list) => _InternalLink.isUnlinked(list);
|
| -
|
| - /** Moves all elements from the list [other] to [list]. */
|
| - static void addAll(_InternalLinkList list, _InternalLinkList other) {
|
| - if (isEmpty(other)) return;
|
| - _InternalLink listLast = list._previousLink;
|
| - _InternalLink otherNext = other._nextLink;
|
| - listLast._nextLink = otherNext;
|
| - otherNext._previousLink = listLast;
|
| - _InternalLink otherLast = other._previousLink;
|
| - list._previousLink = otherLast;
|
| - otherLast._nextLink = list;
|
| - // Clean up [other].
|
| - other._nextLink = other._previousLink = other;
|
| - }
|
| -}
|
| -
|
| /** Superclass for provider of pending events. */
|
| abstract class _PendingEvents {
|
| // No async event has been scheduled.
|
| @@ -793,283 +713,42 @@ class _MultiplexerLinkedList {
|
| }
|
| }
|
|
|
| -/**
|
| - * A subscription used by [_SingleStreamMultiplexer].
|
| - *
|
| - * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple
|
| - * listeners at a time. It is used to implement [Stream.asBroadcastStream].
|
| - *
|
| - * It is itself listening to another stream for events, and it forwards all
|
| - * events to all of its simultanous listeners.
|
| - *
|
| - * The listeners are [_MultiplexerSubscription]s and are kept as a linked list.
|
| - */
|
| -// TODO(lrn): Change "implements" to "with" when automatic mixin constructors
|
| -// are implemented.
|
| -class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T>
|
| - implements _MultiplexerLinkedList {
|
| - static const int _STATE_NOT_LISTENING = 0;
|
| - // Bit that alternates between event firings. If bit matches the one currently
|
| - // firing, the subscription will not be notified.
|
| - static const int _STATE_EVENT_ID_BIT = 1;
|
| - // Whether the subscription is listening at all. This should be set while
|
| - // it is part of the linked list of listeners of a multiplexer stream.
|
| - static const int _STATE_LISTENING = 2;
|
| - // State bit set while firing an event.
|
| - static const int _STATE_IS_FIRING = 4;
|
| - // Bit set if a subscription is canceled while it's firing (the
|
| - // [_STATE_IS_FIRING] bit is set).
|
| - // If the subscription is canceled while firing, it is not removed from the
|
| - // linked list immediately (to avoid breaking iteration), but is instead
|
| - // removed after it is done firing.
|
| - static const int _STATE_REMOVE_AFTER_FIRING = 8;
|
| -
|
| - // Firing state.
|
| - int _multiplexState;
|
| -
|
| - _SingleStreamMultiplexer _source;
|
| -
|
| - _MultiplexerSubscription(this._source,
|
| - void onData(T data),
|
| - void onError(Object error),
|
| - void onDone(),
|
| - bool cancelOnError,
|
| - int nextEventId)
|
| - : _multiplexState = _STATE_LISTENING | nextEventId,
|
| - super(onData, onError, onDone, cancelOnError) {
|
| - _next = _previous = this;
|
| - }
|
| -
|
| - // Mixin workaround.
|
| - _MultiplexerLinkedList _next;
|
| - _MultiplexerLinkedList _previous;
|
| -
|
| - void _unlink() {
|
| - _previous._next = _next;
|
| - _next._previous = _previous;
|
| - _next = _previous = this;
|
| - }
|
| -
|
| - void _insertBefore(_MultiplexerLinkedList newNext) {
|
| - _MultiplexerLinkedList newPrevious = newNext._previous;
|
| - newPrevious._next = this;
|
| - newNext._previous = _previous;
|
| - _previous._next = newNext;
|
| - _previous = newPrevious;
|
| - }
|
| - // End mixin.
|
| -
|
| - bool get _isListening => _multiplexState >= _STATE_LISTENING;
|
| - bool get _isFiring => _multiplexState >= _STATE_IS_FIRING;
|
| - bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING;
|
| - int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT;
|
| -
|
| - void _setRemoveAfterFiring() {
|
| - assert(_isFiring);
|
| - _multiplexState |= _STATE_REMOVE_AFTER_FIRING;
|
| - }
|
| -
|
| - void _startFiring() {
|
| - assert(!_isFiring);
|
| - _multiplexState |= _STATE_IS_FIRING;
|
| - }
|
| -
|
| - /// Marks listener as no longer firing, and toggles its event id.
|
| - void _endFiring() {
|
| - assert(_isFiring);
|
| - _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT);
|
| - }
|
| -
|
| - void _setNotListening() {
|
| - assert(_isListening);
|
| - _multiplexState = _STATE_NOT_LISTENING;
|
| - }
|
| -
|
| - void _onCancel() {
|
| - assert(_isListening);
|
| - _source._removeListener(this);
|
| - }
|
| -}
|
| -
|
| -/**
|
| - * A stream that sends events from another stream to multiple listeners.
|
| - *
|
| - * This is used to implement [Stream.asBroadcastStream].
|
| - *
|
| - * This stream allows listening more than once.
|
| - * When the first listener is added, it starts listening on its source
|
| - * stream for events. All events from the source stream are sent to all
|
| - * active listeners. The listeners handle their own buffering.
|
| - * When the last listener cancels, the source stream subscription is also
|
| - * canceled, and no further listening is possible.
|
| - */
|
| -// TODO(lrn): change "implements" to "with" when the VM supports it.
|
| -class _SingleStreamMultiplexer<T> extends Stream<T>
|
| - implements _MultiplexerLinkedList,
|
| - _EventDispatch<T> {
|
| +class _AsBroadcastStream<T> extends Stream<T> {
|
| final Stream<T> _source;
|
| + _BufferingMultiplexStreamController<T> _controller;
|
| StreamSubscription<T> _subscription;
|
| - // Alternates between zero and one for each event fired.
|
| - // Listeners are initialized with the next event's id, and will
|
| - // only be notified if they match the event being fired.
|
| - // That way listeners added during event firing will not receive
|
| - // the current event.
|
| - int _eventId = 0;
|
| -
|
| - bool _isFiring = false;
|
| -
|
| - // Remember events added while firing.
|
| - _StreamImplEvents _pending;
|
| -
|
| - _SingleStreamMultiplexer(this._source) {
|
| - _next = _previous = this;
|
| - }
|
|
|
| - bool get _hasPending => _pending != null && !_pending.isEmpty;
|
| -
|
| - // Should be mixin.
|
| - _MultiplexerLinkedList _next;
|
| - _MultiplexerLinkedList _previous;
|
| -
|
| - void _unlink() {
|
| - _previous._next = _next;
|
| - _next._previous = _previous;
|
| - _next = _previous = this;
|
| + _AsBroadcastStream(this._source) {
|
| + _controller = new _BufferingMultiplexStreamController<T>(null, _onCancel);
|
| }
|
|
|
| - void _insertBefore(_MultiplexerLinkedList newNext) {
|
| - _MultiplexerLinkedList newPrevious = newNext._previous;
|
| - newPrevious._next = this;
|
| - newNext._previous = _previous;
|
| - _previous._next = newNext;
|
| - _previous = newPrevious;
|
| - }
|
| - // End of mixin.
|
| + bool get isBroadcast => true;
|
|
|
| StreamSubscription<T> listen(void onData(T data),
|
| { void onError(Object error),
|
| void onDone(),
|
| - bool cancelOnError }) {
|
| - if (onData == null) onData = _nullDataHandler;
|
| - if (onError == null) onError = _nullErrorHandler;
|
| - if (onDone == null) onDone = _nullDoneHandler;
|
| - cancelOnError = identical(true, cancelOnError);
|
| - _MultiplexerSubscription subscription =
|
| - new _MultiplexerSubscription(this, onData, onError, onDone,
|
| - cancelOnError, _eventId);
|
| - if (_subscription == null) {
|
| - _subscription = _source.listen(_add, onError: _addError, onDone: _close);
|
| + bool cancelOnError}) {
|
| + if (_controller == null) {
|
| + throw new StateError("Source stream has been closed.");
|
| }
|
| - subscription._insertBefore(this);
|
| - return subscription;
|
| - }
|
| -
|
| - /** Called by [_MultiplexerSubscription.remove]. */
|
| - void _removeListener(_MultiplexerSubscription listener) {
|
| - if (listener._isFiring) {
|
| - listener._setRemoveAfterFiring();
|
| - } else {
|
| - _unlinkListener(listener);
|
| - }
|
| - }
|
| -
|
| - /** Remove a listener and close the subscription after the last one. */
|
| - void _unlinkListener(_MultiplexerSubscription listener) {
|
| - listener._setNotListening();
|
| - listener._unlink();
|
| - if (identical(_next, this)) {
|
| - // Last listener removed.
|
| - _cancel();
|
| + if (_subscription == null) {
|
| + _subscription = _source.listen(_controller.add,
|
| + onError: _controller.addError,
|
| + onDone: _controller.close);
|
| }
|
| + return _controller.stream.listen(onData, onError: onError, onDone: onDone,
|
| + cancelOnError: cancelOnError);
|
| }
|
|
|
| - void _cancel() {
|
| + void _onCancel() {
|
| + // Called by [_controller] when it has no subscribers left.
|
| StreamSubscription subscription = _subscription;
|
| _subscription = null;
|
| + _controller = null; // Marks the stream as no longer listenable.
|
| subscription.cancel();
|
| - if (_pending != null) _pending.cancelSchedule();
|
| - }
|
| -
|
| - void _forEachListener(void action(_MultiplexerSubscription listener)) {
|
| - int eventId = _eventId;
|
| - _eventId ^= 1;
|
| - _isFiring = true;
|
| - _MultiplexerLinkedList entry = _next;
|
| - // Call each listener in order. A listener can be removed during any
|
| - // other listener's event. During its own event it will only be marked
|
| - // as "to be removed", and it will be handled after the event is done.
|
| - while (!identical(entry, this)) {
|
| - _MultiplexerSubscription listener = entry;
|
| - if (listener._eventId == eventId) {
|
| - listener._startFiring();
|
| - action(listener);
|
| - listener._endFiring(); // Also toggles the event id.
|
| - }
|
| - entry = listener._next;
|
| - if (listener._removeAfterFiring) {
|
| - _unlinkListener(listener);
|
| - }
|
| - }
|
| - _isFiring = false;
|
| - }
|
| -
|
| - void _add(T data) {
|
| - if (_isFiring || _hasPending) {
|
| - _StreamImplEvents pending = _pending;
|
| - if (pending == null) pending = _pending = new _StreamImplEvents();
|
| - pending.add(new _DelayedData(data));
|
| - } else {
|
| - _sendData(data);
|
| - }
|
| - }
|
| -
|
| - void _addError(Object error) {
|
| - if (_isFiring || _hasPending) {
|
| - _StreamImplEvents pending = _pending;
|
| - if (pending == null) pending = _pending = new _StreamImplEvents();
|
| - pending.add(new _DelayedError(error));
|
| - } else {
|
| - _sendError(error);
|
| - }
|
| - }
|
| -
|
| - void _close() {
|
| - if (_isFiring || _hasPending) {
|
| - _StreamImplEvents pending = _pending;
|
| - if (pending == null) pending = _pending = new _StreamImplEvents();
|
| - pending.add(const _DelayedDone());
|
| - } else {
|
| - _sendDone();
|
| - }
|
| - }
|
| -
|
| - void _sendData(T data) {
|
| - _forEachListener((_MultiplexerSubscription listener) {
|
| - listener._add(data);
|
| - });
|
| - if (_hasPending) {
|
| - _pending.schedule(this);
|
| - }
|
| - }
|
| -
|
| - void _sendError(Object error) {
|
| - _forEachListener((_MultiplexerSubscription listener) {
|
| - listener._addError(error);
|
| - });
|
| - if (_hasPending) {
|
| - _pending.schedule(this);
|
| - }
|
| - }
|
| -
|
| - void _sendDone() {
|
| - _forEachListener((_MultiplexerSubscription listener) {
|
| - listener._setRemoveAfterFiring();
|
| - listener._close();
|
| - });
|
| }
|
| }
|
|
|
| -
|
| /**
|
| * Simple implementation of [StreamIterator].
|
| */
|
|
|