Chromium Code Reviews| Index: sdk/lib/async/stream_impl.dart |
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
| index 5987050a586da99de0d00b8f31a99d06c08531ad..5990ca52ce6061a0791315722bb24dadea33c329 100644 |
| --- a/sdk/lib/async/stream_impl.dart |
| +++ b/sdk/lib/async/stream_impl.dart |
| @@ -598,86 +598,6 @@ class _DelayedDone implements _DelayedEvent { |
| } |
| } |
| -/** |
| - * Simple internal doubly-linked list implementation. |
| - * |
| - * In an internal linked list, the links are in the data objects themselves, |
| - * instead of in a separate object. That means each element can be in at most |
| - * one list at a time. |
| - * |
| - * All links are always members of an element cycle. At creation it's a |
| - * singleton cycle. |
| - */ |
| -abstract class _InternalLink { |
| - _InternalLink _nextLink; |
| - _InternalLink _previousLink; |
| - |
| - _InternalLink() { |
| - this._previousLink = this._nextLink = this; |
| - } |
| - |
| - /* Removes a link from any list it may be part of, and links it to itself. */ |
| - static void unlink(_InternalLink element) { |
| - _InternalLink next = element._nextLink; |
| - _InternalLink previous = element._previousLink; |
| - next._previousLink = previous; |
| - previous._nextLink = next; |
| - element._nextLink = element._previousLink = element; |
| - } |
| - |
| - /** Check whether an element is unattached to other elements. */ |
| - static bool isUnlinked(_InternalLink element) { |
| - return identical(element, element._nextLink); |
| - } |
| -} |
| - |
| -/** |
| - * Marker interface for "list" links. |
| - * |
| - * An "InternalLinkList" is an abstraction on top of a link cycle, where the |
| - * "list" object itself is not considered an element (it's just a header link |
| - * created to avoid edge cases). |
| - * An element is considered part of a list if it is in the list's cycle. |
| - * There should never be more than one "list" object in a cycle. |
| - */ |
| -abstract class _InternalLinkList extends _InternalLink { |
| - /** |
| - * Adds an element to a list, just before the header link. |
| - * |
| - * This effectively adds it at the end of the list. |
| - */ |
| - static void add(_InternalLinkList list, _InternalLink element) { |
| - if (!_InternalLink.isUnlinked(element)) _InternalLink.unlink(element); |
| - _InternalLink listEnd = list._previousLink; |
| - listEnd._nextLink = element; |
| - list._previousLink = element; |
| - element._previousLink = listEnd; |
| - element._nextLink = list; |
| - } |
| - |
| - /** Removes an element from its list. */ |
| - static void remove(_InternalLink element) { |
| - _InternalLink.unlink(element); |
| - } |
| - |
| - /** Check whether a list contains no elements, only the header link. */ |
| - static bool isEmpty(_InternalLinkList list) => _InternalLink.isUnlinked(list); |
| - |
| - /** Moves all elements from the list [other] to [list]. */ |
| - static void addAll(_InternalLinkList list, _InternalLinkList other) { |
| - if (isEmpty(other)) return; |
| - _InternalLink listLast = list._previousLink; |
| - _InternalLink otherNext = other._nextLink; |
| - listLast._nextLink = otherNext; |
| - otherNext._previousLink = listLast; |
| - _InternalLink otherLast = other._previousLink; |
| - list._previousLink = otherLast; |
| - otherLast._nextLink = list; |
| - // Clean up [other]. |
| - other._nextLink = other._previousLink = other; |
| - } |
| -} |
| - |
| /** Superclass for provider of pending events. */ |
| abstract class _PendingEvents { |
| // No async event has been scheduled. |
| @@ -793,283 +713,41 @@ class _MultiplexerLinkedList { |
| } |
| } |
| -/** |
| - * A subscription used by [_SingleStreamMultiplexer]. |
| - * |
| - * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple |
| - * listeners at a time. It is used to implement [Stream.asBroadcastStream]. |
| - * |
| - * It is itself listening to another stream for events, and it forwards all |
| - * events to all of its simultanous listeners. |
| - * |
| - * The listeners are [_MultiplexerSubscription]s and are kept as a linked list. |
| - */ |
| -// TODO(lrn): Change "implements" to "with" when automatic mixin constructors |
| -// are implemented. |
| -class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T> |
| - implements _MultiplexerLinkedList { |
| - static const int _STATE_NOT_LISTENING = 0; |
| - // Bit that alternates between event firings. If bit matches the one currently |
| - // firing, the subscription will not be notified. |
| - static const int _STATE_EVENT_ID_BIT = 1; |
| - // Whether the subscription is listening at all. This should be set while |
| - // it is part of the linked list of listeners of a multiplexer stream. |
| - static const int _STATE_LISTENING = 2; |
| - // State bit set while firing an event. |
| - static const int _STATE_IS_FIRING = 4; |
| - // Bit set if a subscription is canceled while it's firing (the |
| - // [_STATE_IS_FIRING] bit is set). |
| - // If the subscription is canceled while firing, it is not removed from the |
| - // linked list immediately (to avoid breaking iteration), but is instead |
| - // removed after it is done firing. |
| - static const int _STATE_REMOVE_AFTER_FIRING = 8; |
| - |
| - // Firing state. |
| - int _multiplexState; |
| - |
| - _SingleStreamMultiplexer _source; |
| - |
| - _MultiplexerSubscription(this._source, |
| - void onData(T data), |
| - void onError(Object error), |
| - void onDone(), |
| - bool cancelOnError, |
| - int nextEventId) |
| - : _multiplexState = _STATE_LISTENING | nextEventId, |
| - super(onData, onError, onDone, cancelOnError) { |
| - _next = _previous = this; |
| - } |
| - |
| - // Mixin workaround. |
| - _MultiplexerLinkedList _next; |
| - _MultiplexerLinkedList _previous; |
| - |
| - void _unlink() { |
| - _previous._next = _next; |
| - _next._previous = _previous; |
| - _next = _previous = this; |
| - } |
| - |
| - void _insertBefore(_MultiplexerLinkedList newNext) { |
| - _MultiplexerLinkedList newPrevious = newNext._previous; |
| - newPrevious._next = this; |
| - newNext._previous = _previous; |
| - _previous._next = newNext; |
| - _previous = newPrevious; |
| - } |
| - // End mixin. |
| - |
| - bool get _isListening => _multiplexState >= _STATE_LISTENING; |
| - bool get _isFiring => _multiplexState >= _STATE_IS_FIRING; |
| - bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING; |
| - int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT; |
| - |
| - void _setRemoveAfterFiring() { |
| - assert(_isFiring); |
| - _multiplexState |= _STATE_REMOVE_AFTER_FIRING; |
| - } |
| - |
| - void _startFiring() { |
| - assert(!_isFiring); |
| - _multiplexState |= _STATE_IS_FIRING; |
| - } |
| - |
| - /// Marks listener as no longer firing, and toggles its event id. |
| - void _endFiring() { |
| - assert(_isFiring); |
| - _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT); |
| - } |
| - |
| - void _setNotListening() { |
| - assert(_isListening); |
| - _multiplexState = _STATE_NOT_LISTENING; |
| - } |
| - |
| - void _onCancel() { |
| - assert(_isListening); |
| - _source._removeListener(this); |
| - } |
| -} |
| - |
| -/** |
| - * A stream that sends events from another stream to multiple listeners. |
| - * |
| - * This is used to implement [Stream.asBroadcastStream]. |
| - * |
| - * This stream allows listening more than once. |
| - * When the first listener is added, it starts listening on its source |
| - * stream for events. All events from the source stream are sent to all |
| - * active listeners. The listeners handle their own buffering. |
| - * When the last listener cancels, the source stream subscription is also |
| - * canceled, and no further listening is possible. |
| - */ |
| -// TODO(lrn): change "implements" to "with" when the VM supports it. |
| -class _SingleStreamMultiplexer<T> extends Stream<T> |
| - implements _MultiplexerLinkedList, |
| - _EventDispatch<T> { |
| +class _AsBroadcastStream<T> extends Stream<T> { |
| final Stream<T> _source; |
| + _BufferingMultiplexStreamController<T> _controller; |
| StreamSubscription<T> _subscription; |
| - // Alternates between zero and one for each event fired. |
| - // Listeners are initialized with the next event's id, and will |
| - // only be notified if they match the event being fired. |
| - // That way listeners added during event firing will not receive |
| - // the current event. |
| - int _eventId = 0; |
| - |
| - bool _isFiring = false; |
| - |
| - // Remember events added while firing. |
| - _StreamImplEvents _pending; |
| - |
| - _SingleStreamMultiplexer(this._source) { |
| - _next = _previous = this; |
| - } |
| - bool get _hasPending => _pending != null && !_pending.isEmpty; |
| - |
| - // Should be mixin. |
| - _MultiplexerLinkedList _next; |
| - _MultiplexerLinkedList _previous; |
| - |
| - void _unlink() { |
| - _previous._next = _next; |
| - _next._previous = _previous; |
| - _next = _previous = this; |
| + _AsBroadcastStream(this._source) { |
| + _controller = new _BufferingMultiplexStreamController<T>(null, _close); |
| } |
| - void _insertBefore(_MultiplexerLinkedList newNext) { |
| - _MultiplexerLinkedList newPrevious = newNext._previous; |
| - newPrevious._next = this; |
| - newNext._previous = _previous; |
| - _previous._next = newNext; |
| - _previous = newPrevious; |
| - } |
| - // End of mixin. |
| + bool get isBroadcast => true; |
| StreamSubscription<T> listen(void onData(T data), |
| { void onError(Object error), |
| void onDone(), |
| - bool cancelOnError }) { |
| - if (onData == null) onData = _nullDataHandler; |
| - if (onError == null) onError = _nullErrorHandler; |
| - if (onDone == null) onDone = _nullDoneHandler; |
| - cancelOnError = identical(true, cancelOnError); |
| - _MultiplexerSubscription subscription = |
| - new _MultiplexerSubscription(this, onData, onError, onDone, |
| - cancelOnError, _eventId); |
| - if (_subscription == null) { |
| - _subscription = _source.listen(_add, onError: _addError, onDone: _close); |
| + bool cancelOnError}) { |
| + if (_controller == null) { |
| + throw new StateError("Source stream has been closed."); |
| } |
| - subscription._insertBefore(this); |
| - return subscription; |
| - } |
| - |
| - /** Called by [_MultiplexerSubscription.remove]. */ |
| - void _removeListener(_MultiplexerSubscription listener) { |
| - if (listener._isFiring) { |
| - listener._setRemoveAfterFiring(); |
| - } else { |
| - _unlinkListener(listener); |
| - } |
| - } |
| - |
| - /** Remove a listener and close the subscription after the last one. */ |
| - void _unlinkListener(_MultiplexerSubscription listener) { |
| - listener._setNotListening(); |
| - listener._unlink(); |
| - if (identical(_next, this)) { |
| - // Last listener removed. |
| - _cancel(); |
| + if (_subscription == null) { |
| + _subscription = _source.listen(_controller.add, |
| + onError: _controller.addError, |
| + onDone: _controller.close); |
| } |
| + return _controller.stream.listen(onData, onError: onError, onDone: onDone, |
| + cancelOnError: cancelOnError); |
| } |
| - void _cancel() { |
| + void _close() { |
|
floitsch
2013/05/29 09:39:58
call it _onCancel.
Lasse Reichstein Nielsen
2013/05/29 10:39:12
Done.
|
| StreamSubscription subscription = _subscription; |
| _subscription = null; |
| + _controller = null; |
| subscription.cancel(); |
| - if (_pending != null) _pending.cancelSchedule(); |
| - } |
| - |
| - void _forEachListener(void action(_MultiplexerSubscription listener)) { |
| - int eventId = _eventId; |
| - _eventId ^= 1; |
| - _isFiring = true; |
| - _MultiplexerLinkedList entry = _next; |
| - // Call each listener in order. A listener can be removed during any |
| - // other listener's event. During its own event it will only be marked |
| - // as "to be removed", and it will be handled after the event is done. |
| - while (!identical(entry, this)) { |
| - _MultiplexerSubscription listener = entry; |
| - if (listener._eventId == eventId) { |
| - listener._startFiring(); |
| - action(listener); |
| - listener._endFiring(); // Also toggles the event id. |
| - } |
| - entry = listener._next; |
| - if (listener._removeAfterFiring) { |
| - _unlinkListener(listener); |
| - } |
| - } |
| - _isFiring = false; |
| - } |
| - |
| - void _add(T data) { |
| - if (_isFiring || _hasPending) { |
| - _StreamImplEvents pending = _pending; |
| - if (pending == null) pending = _pending = new _StreamImplEvents(); |
| - pending.add(new _DelayedData(data)); |
| - } else { |
| - _sendData(data); |
| - } |
| - } |
| - |
| - void _addError(Object error) { |
| - if (_isFiring || _hasPending) { |
| - _StreamImplEvents pending = _pending; |
| - if (pending == null) pending = _pending = new _StreamImplEvents(); |
| - pending.add(new _DelayedError(error)); |
| - } else { |
| - _sendError(error); |
| - } |
| - } |
| - |
| - void _close() { |
| - if (_isFiring || _hasPending) { |
| - _StreamImplEvents pending = _pending; |
| - if (pending == null) pending = _pending = new _StreamImplEvents(); |
| - pending.add(const _DelayedDone()); |
| - } else { |
| - _sendDone(); |
| - } |
| - } |
| - |
| - void _sendData(T data) { |
| - _forEachListener((_MultiplexerSubscription listener) { |
| - listener._add(data); |
| - }); |
| - if (_hasPending) { |
| - _pending.schedule(this); |
| - } |
| - } |
| - |
| - void _sendError(Object error) { |
| - _forEachListener((_MultiplexerSubscription listener) { |
| - listener._addError(error); |
| - }); |
| - if (_hasPending) { |
| - _pending.schedule(this); |
| - } |
| - } |
| - |
| - void _sendDone() { |
| - _forEachListener((_MultiplexerSubscription listener) { |
| - listener._setRemoveAfterFiring(); |
| - listener._close(); |
| - }); |
| } |
| } |
| - |
| /** |
| * Simple implementation of [StreamIterator]. |
| */ |