Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index 5987050a586da99de0d00b8f31a99d06c08531ad..5990ca52ce6061a0791315722bb24dadea33c329 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -598,86 +598,6 @@ class _DelayedDone implements _DelayedEvent { |
} |
} |
-/** |
- * Simple internal doubly-linked list implementation. |
- * |
- * In an internal linked list, the links are in the data objects themselves, |
- * instead of in a separate object. That means each element can be in at most |
- * one list at a time. |
- * |
- * All links are always members of an element cycle. At creation it's a |
- * singleton cycle. |
- */ |
-abstract class _InternalLink { |
- _InternalLink _nextLink; |
- _InternalLink _previousLink; |
- |
- _InternalLink() { |
- this._previousLink = this._nextLink = this; |
- } |
- |
- /* Removes a link from any list it may be part of, and links it to itself. */ |
- static void unlink(_InternalLink element) { |
- _InternalLink next = element._nextLink; |
- _InternalLink previous = element._previousLink; |
- next._previousLink = previous; |
- previous._nextLink = next; |
- element._nextLink = element._previousLink = element; |
- } |
- |
- /** Check whether an element is unattached to other elements. */ |
- static bool isUnlinked(_InternalLink element) { |
- return identical(element, element._nextLink); |
- } |
-} |
- |
-/** |
- * Marker interface for "list" links. |
- * |
- * An "InternalLinkList" is an abstraction on top of a link cycle, where the |
- * "list" object itself is not considered an element (it's just a header link |
- * created to avoid edge cases). |
- * An element is considered part of a list if it is in the list's cycle. |
- * There should never be more than one "list" object in a cycle. |
- */ |
-abstract class _InternalLinkList extends _InternalLink { |
- /** |
- * Adds an element to a list, just before the header link. |
- * |
- * This effectively adds it at the end of the list. |
- */ |
- static void add(_InternalLinkList list, _InternalLink element) { |
- if (!_InternalLink.isUnlinked(element)) _InternalLink.unlink(element); |
- _InternalLink listEnd = list._previousLink; |
- listEnd._nextLink = element; |
- list._previousLink = element; |
- element._previousLink = listEnd; |
- element._nextLink = list; |
- } |
- |
- /** Removes an element from its list. */ |
- static void remove(_InternalLink element) { |
- _InternalLink.unlink(element); |
- } |
- |
- /** Check whether a list contains no elements, only the header link. */ |
- static bool isEmpty(_InternalLinkList list) => _InternalLink.isUnlinked(list); |
- |
- /** Moves all elements from the list [other] to [list]. */ |
- static void addAll(_InternalLinkList list, _InternalLinkList other) { |
- if (isEmpty(other)) return; |
- _InternalLink listLast = list._previousLink; |
- _InternalLink otherNext = other._nextLink; |
- listLast._nextLink = otherNext; |
- otherNext._previousLink = listLast; |
- _InternalLink otherLast = other._previousLink; |
- list._previousLink = otherLast; |
- otherLast._nextLink = list; |
- // Clean up [other]. |
- other._nextLink = other._previousLink = other; |
- } |
-} |
- |
/** Superclass for provider of pending events. */ |
abstract class _PendingEvents { |
// No async event has been scheduled. |
@@ -793,283 +713,41 @@ class _MultiplexerLinkedList { |
} |
} |
-/** |
- * A subscription used by [_SingleStreamMultiplexer]. |
- * |
- * The [_SingleStreamMultiplexer] is a [Stream] which allows multiple |
- * listeners at a time. It is used to implement [Stream.asBroadcastStream]. |
- * |
- * It is itself listening to another stream for events, and it forwards all |
- * events to all of its simultanous listeners. |
- * |
- * The listeners are [_MultiplexerSubscription]s and are kept as a linked list. |
- */ |
-// TODO(lrn): Change "implements" to "with" when automatic mixin constructors |
-// are implemented. |
-class _MultiplexerSubscription<T> extends _BufferingStreamSubscription<T> |
- implements _MultiplexerLinkedList { |
- static const int _STATE_NOT_LISTENING = 0; |
- // Bit that alternates between event firings. If bit matches the one currently |
- // firing, the subscription will not be notified. |
- static const int _STATE_EVENT_ID_BIT = 1; |
- // Whether the subscription is listening at all. This should be set while |
- // it is part of the linked list of listeners of a multiplexer stream. |
- static const int _STATE_LISTENING = 2; |
- // State bit set while firing an event. |
- static const int _STATE_IS_FIRING = 4; |
- // Bit set if a subscription is canceled while it's firing (the |
- // [_STATE_IS_FIRING] bit is set). |
- // If the subscription is canceled while firing, it is not removed from the |
- // linked list immediately (to avoid breaking iteration), but is instead |
- // removed after it is done firing. |
- static const int _STATE_REMOVE_AFTER_FIRING = 8; |
- |
- // Firing state. |
- int _multiplexState; |
- |
- _SingleStreamMultiplexer _source; |
- |
- _MultiplexerSubscription(this._source, |
- void onData(T data), |
- void onError(Object error), |
- void onDone(), |
- bool cancelOnError, |
- int nextEventId) |
- : _multiplexState = _STATE_LISTENING | nextEventId, |
- super(onData, onError, onDone, cancelOnError) { |
- _next = _previous = this; |
- } |
- |
- // Mixin workaround. |
- _MultiplexerLinkedList _next; |
- _MultiplexerLinkedList _previous; |
- |
- void _unlink() { |
- _previous._next = _next; |
- _next._previous = _previous; |
- _next = _previous = this; |
- } |
- |
- void _insertBefore(_MultiplexerLinkedList newNext) { |
- _MultiplexerLinkedList newPrevious = newNext._previous; |
- newPrevious._next = this; |
- newNext._previous = _previous; |
- _previous._next = newNext; |
- _previous = newPrevious; |
- } |
- // End mixin. |
- |
- bool get _isListening => _multiplexState >= _STATE_LISTENING; |
- bool get _isFiring => _multiplexState >= _STATE_IS_FIRING; |
- bool get _removeAfterFiring => _multiplexState >= _STATE_REMOVE_AFTER_FIRING; |
- int get _eventId => _multiplexState & _STATE_EVENT_ID_BIT; |
- |
- void _setRemoveAfterFiring() { |
- assert(_isFiring); |
- _multiplexState |= _STATE_REMOVE_AFTER_FIRING; |
- } |
- |
- void _startFiring() { |
- assert(!_isFiring); |
- _multiplexState |= _STATE_IS_FIRING; |
- } |
- |
- /// Marks listener as no longer firing, and toggles its event id. |
- void _endFiring() { |
- assert(_isFiring); |
- _multiplexState ^= (_STATE_IS_FIRING | _STATE_EVENT_ID_BIT); |
- } |
- |
- void _setNotListening() { |
- assert(_isListening); |
- _multiplexState = _STATE_NOT_LISTENING; |
- } |
- |
- void _onCancel() { |
- assert(_isListening); |
- _source._removeListener(this); |
- } |
-} |
- |
-/** |
- * A stream that sends events from another stream to multiple listeners. |
- * |
- * This is used to implement [Stream.asBroadcastStream]. |
- * |
- * This stream allows listening more than once. |
- * When the first listener is added, it starts listening on its source |
- * stream for events. All events from the source stream are sent to all |
- * active listeners. The listeners handle their own buffering. |
- * When the last listener cancels, the source stream subscription is also |
- * canceled, and no further listening is possible. |
- */ |
-// TODO(lrn): change "implements" to "with" when the VM supports it. |
-class _SingleStreamMultiplexer<T> extends Stream<T> |
- implements _MultiplexerLinkedList, |
- _EventDispatch<T> { |
+class _AsBroadcastStream<T> extends Stream<T> { |
final Stream<T> _source; |
+ _BufferingMultiplexStreamController<T> _controller; |
StreamSubscription<T> _subscription; |
- // Alternates between zero and one for each event fired. |
- // Listeners are initialized with the next event's id, and will |
- // only be notified if they match the event being fired. |
- // That way listeners added during event firing will not receive |
- // the current event. |
- int _eventId = 0; |
- |
- bool _isFiring = false; |
- |
- // Remember events added while firing. |
- _StreamImplEvents _pending; |
- |
- _SingleStreamMultiplexer(this._source) { |
- _next = _previous = this; |
- } |
- bool get _hasPending => _pending != null && !_pending.isEmpty; |
- |
- // Should be mixin. |
- _MultiplexerLinkedList _next; |
- _MultiplexerLinkedList _previous; |
- |
- void _unlink() { |
- _previous._next = _next; |
- _next._previous = _previous; |
- _next = _previous = this; |
+ _AsBroadcastStream(this._source) { |
+ _controller = new _BufferingMultiplexStreamController<T>(null, _close); |
} |
- void _insertBefore(_MultiplexerLinkedList newNext) { |
- _MultiplexerLinkedList newPrevious = newNext._previous; |
- newPrevious._next = this; |
- newNext._previous = _previous; |
- _previous._next = newNext; |
- _previous = newPrevious; |
- } |
- // End of mixin. |
+ bool get isBroadcast => true; |
StreamSubscription<T> listen(void onData(T data), |
{ void onError(Object error), |
void onDone(), |
- bool cancelOnError }) { |
- if (onData == null) onData = _nullDataHandler; |
- if (onError == null) onError = _nullErrorHandler; |
- if (onDone == null) onDone = _nullDoneHandler; |
- cancelOnError = identical(true, cancelOnError); |
- _MultiplexerSubscription subscription = |
- new _MultiplexerSubscription(this, onData, onError, onDone, |
- cancelOnError, _eventId); |
- if (_subscription == null) { |
- _subscription = _source.listen(_add, onError: _addError, onDone: _close); |
+ bool cancelOnError}) { |
+ if (_controller == null) { |
+ throw new StateError("Source stream has been closed."); |
} |
- subscription._insertBefore(this); |
- return subscription; |
- } |
- |
- /** Called by [_MultiplexerSubscription.remove]. */ |
- void _removeListener(_MultiplexerSubscription listener) { |
- if (listener._isFiring) { |
- listener._setRemoveAfterFiring(); |
- } else { |
- _unlinkListener(listener); |
- } |
- } |
- |
- /** Remove a listener and close the subscription after the last one. */ |
- void _unlinkListener(_MultiplexerSubscription listener) { |
- listener._setNotListening(); |
- listener._unlink(); |
- if (identical(_next, this)) { |
- // Last listener removed. |
- _cancel(); |
+ if (_subscription == null) { |
+ _subscription = _source.listen(_controller.add, |
+ onError: _controller.addError, |
+ onDone: _controller.close); |
} |
+ return _controller.stream.listen(onData, onError: onError, onDone: onDone, |
+ cancelOnError: cancelOnError); |
} |
- void _cancel() { |
+ void _close() { |
floitsch
2013/05/29 09:39:58
call it _onCancel.
Lasse Reichstein Nielsen
2013/05/29 10:39:12
Done.
|
StreamSubscription subscription = _subscription; |
_subscription = null; |
+ _controller = null; |
subscription.cancel(); |
- if (_pending != null) _pending.cancelSchedule(); |
- } |
- |
- void _forEachListener(void action(_MultiplexerSubscription listener)) { |
- int eventId = _eventId; |
- _eventId ^= 1; |
- _isFiring = true; |
- _MultiplexerLinkedList entry = _next; |
- // Call each listener in order. A listener can be removed during any |
- // other listener's event. During its own event it will only be marked |
- // as "to be removed", and it will be handled after the event is done. |
- while (!identical(entry, this)) { |
- _MultiplexerSubscription listener = entry; |
- if (listener._eventId == eventId) { |
- listener._startFiring(); |
- action(listener); |
- listener._endFiring(); // Also toggles the event id. |
- } |
- entry = listener._next; |
- if (listener._removeAfterFiring) { |
- _unlinkListener(listener); |
- } |
- } |
- _isFiring = false; |
- } |
- |
- void _add(T data) { |
- if (_isFiring || _hasPending) { |
- _StreamImplEvents pending = _pending; |
- if (pending == null) pending = _pending = new _StreamImplEvents(); |
- pending.add(new _DelayedData(data)); |
- } else { |
- _sendData(data); |
- } |
- } |
- |
- void _addError(Object error) { |
- if (_isFiring || _hasPending) { |
- _StreamImplEvents pending = _pending; |
- if (pending == null) pending = _pending = new _StreamImplEvents(); |
- pending.add(new _DelayedError(error)); |
- } else { |
- _sendError(error); |
- } |
- } |
- |
- void _close() { |
- if (_isFiring || _hasPending) { |
- _StreamImplEvents pending = _pending; |
- if (pending == null) pending = _pending = new _StreamImplEvents(); |
- pending.add(const _DelayedDone()); |
- } else { |
- _sendDone(); |
- } |
- } |
- |
- void _sendData(T data) { |
- _forEachListener((_MultiplexerSubscription listener) { |
- listener._add(data); |
- }); |
- if (_hasPending) { |
- _pending.schedule(this); |
- } |
- } |
- |
- void _sendError(Object error) { |
- _forEachListener((_MultiplexerSubscription listener) { |
- listener._addError(error); |
- }); |
- if (_hasPending) { |
- _pending.schedule(this); |
- } |
- } |
- |
- void _sendDone() { |
- _forEachListener((_MultiplexerSubscription listener) { |
- listener._setRemoveAfterFiring(); |
- listener._close(); |
- }); |
} |
} |
- |
/** |
* Simple implementation of [StreamIterator]. |
*/ |