| Index: sdk/lib/async/stream_impl.dart
|
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
|
| index cb2e20d0f1373c8ec59700d85e76e81e2dc91f5a..7f894644bb5e881614f24bb53e3dce97eb69d22f 100644
|
| --- a/sdk/lib/async/stream_impl.dart
|
| +++ b/sdk/lib/async/stream_impl.dart
|
| @@ -86,7 +86,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| static const int _STATE_IN_CALLBACK = 32;
|
| static const int _STATE_HAS_PENDING = 64;
|
| static const int _STATE_PAUSE_COUNT = 128;
|
| - static const int _STATE_PAUSE_COUNT_SHIFT = 7;
|
|
|
| /* Event handlers provided in constructor. */
|
| _DataHandler<T> _onData;
|
| @@ -239,15 +238,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
|
| }
|
|
|
| /**
|
| - * Increment the pause count.
|
| - *
|
| - * Also marks input as paused.
|
| - */
|
| - void _incrementPauseCount() {
|
| - _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
|
| - }
|
| -
|
| - /**
|
| * Decrements the pause count.
|
| *
|
| * Does not automatically unpause the input (call [_onResume]) when
|
| @@ -722,25 +712,6 @@ class _StreamImplEvents<T> extends _PendingEvents<T> {
|
| }
|
| }
|
|
|
| -class _BroadcastLinkedList {
|
| - _BroadcastLinkedList _next;
|
| - _BroadcastLinkedList _previous;
|
| -
|
| - void _unlink() {
|
| - _previous._next = _next;
|
| - _next._previous = _previous;
|
| - _next = _previous = this;
|
| - }
|
| -
|
| - void _insertBefore(_BroadcastLinkedList newNext) {
|
| - _BroadcastLinkedList newPrevious = newNext._previous;
|
| - newPrevious._next = this;
|
| - newNext._previous = _previous;
|
| - _previous._next = newNext;
|
| - _previous = newPrevious;
|
| - }
|
| -}
|
| -
|
| typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription);
|
|
|
| /**
|
| @@ -941,156 +912,139 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
|
|
|
| /**
|
| * Simple implementation of [StreamIterator].
|
| + *
|
| + * Pauses the stream between calls to [moveNext].
|
| */
|
| -class _StreamIteratorImpl<T> implements StreamIterator<T> {
|
| - // Internal state of the stream iterator.
|
| - // At any time, it is in one of these states.
|
| - // The interpretation of the [_futureOrPrefecth] field depends on the state.
|
| - // In _STATE_MOVING, the _data field holds the most recently returned
|
| - // future.
|
| - // When in one of the _STATE_EXTRA_* states, the it may hold the
|
| - // next data/error object, and the subscription is paused.
|
| -
|
| - /// The simple state where [_data] holds the data to return, and [moveNext]
|
| - /// is allowed. The subscription is actively listening.
|
| - static const int _STATE_FOUND = 0;
|
| - /// State set after [moveNext] has returned false or an error,
|
| - /// or after calling [cancel]. The subscription is always canceled.
|
| - static const int _STATE_DONE = 1;
|
| - /// State set after calling [moveNext], but before its returned future has
|
| - /// completed. Calling [moveNext] again is not allowed in this state.
|
| - /// The subscription is actively listening.
|
| - static const int _STATE_MOVING = 2;
|
| - /// States set when another event occurs while in _STATE_FOUND.
|
| - /// This extra overflow event is cached until the next call to [moveNext],
|
| - /// which will complete as if it received the event normally.
|
| - /// The subscription is paused in these states, so we only ever get one
|
| - /// event too many.
|
| - static const int _STATE_EXTRA_DATA = 3;
|
| - static const int _STATE_EXTRA_ERROR = 4;
|
| - static const int _STATE_EXTRA_DONE = 5;
|
| +class _StreamIterator<T> implements StreamIterator<T> {
|
| + // The stream iterator is always in one of four states.
|
| + // The value of the [_stateData] field depends on the state.
|
| + //
|
| + // When `_subscription == null` and `_stateData != null`:
|
| + // The stream iterator has been created, but [moveNext] has not been called
|
| + // yet. The [_stateData] field contains the stream to listen to on the first
|
| + // call to [moveNext] and [current] returns `null`.
|
| + //
|
| + // When `_subscription != null` and `!_isPaused`:
|
| + // The user has called [moveNext] and the iterator is waiting for the next
|
| + // event. The [_stateData] field contains the [_Future] returned by the
|
| + // [_moveNext] call and [current] returns `null.`
|
| + //
|
| + // When `_subscription != null` and `_isPaused`:
|
| + // The most recent call to [moveNext] has completed with a `true` value
|
| + // and [current] provides the value of the data event.
|
| + // The [_stateData] field contains the [current] value.
|
| + //
|
| + // When `_subscription == null` and `_stateData == null`:
|
| + // The stream has completed or been canceled using [cancel].
|
| + // The stream completes on either a done event or an error event.
|
| + // The last call to [moveNext] has completed with `false` and [current]
|
| + // returns `null`.
|
|
|
| /// Subscription being listened to.
|
| + ///
|
| + /// Set to `null` when the stream subscription is done or canceled.
|
| StreamSubscription _subscription;
|
|
|
| - /// The current element represented by the most recent call to moveNext.
|
| + /// Data value depending on the current state.
|
| ///
|
| - /// Is null between the time moveNext is called and its future completes.
|
| - T _current = null;
|
| -
|
| - /// The future returned by the most recent call to [moveNext].
|
| + /// Before first call to [moveNext]: The stream to listen to.
|
| + ///
|
| + /// After calling [moveNext] but before the returned future completes:
|
| + /// The returned future.
|
| ///
|
| - /// Also used to store the next value/error in case the stream provides an
|
| - /// event before [moveNext] is called again. In that case, the stream will
|
| - /// be paused to prevent further events.
|
| - var/*Future<bool> or T*/ _futureOrPrefetch = null;
|
| + /// After calling [moveNext] and the returned future has completed
|
| + /// with `true`: The value of [current].
|
| + ///
|
| + /// After calling [moveNext] and the returned future has completed
|
| + /// with `false`, or after calling [cancel]: `null`.
|
| + Object _stateData;
|
|
|
| - /// The current state.
|
| - int _state = _STATE_FOUND;
|
| + /// Whether the iterator is between calls to `moveNext`.
|
| + /// This will usually cause the [_subscription] to be paused, but as an
|
| + /// optimization, we only pause after the [moveNext] future has been
|
| + /// completed.
|
| + bool _isPaused = false;
|
|
|
| - _StreamIteratorImpl(final Stream<T> stream) {
|
| - _subscription = stream.listen(_onData,
|
| - onError: _onError,
|
| - onDone: _onDone,
|
| - cancelOnError: true);
|
| - }
|
| + _StreamIterator(final Stream<T> stream) : _stateData = stream;
|
|
|
| - T get current => _current;
|
| + T get current {
|
| + if (_subscription != null && _isPaused) {
|
| + return _stateData as Object /*=T*/;
|
| + }
|
| + return null;
|
| + }
|
|
|
| Future<bool> moveNext() {
|
| - if (_state == _STATE_DONE) {
|
| - return new _Future<bool>.immediate(false);
|
| - }
|
| - if (_state == _STATE_MOVING) {
|
| - throw new StateError("Already waiting for next.");
|
| - }
|
| - if (_state == _STATE_FOUND) {
|
| - _state = _STATE_MOVING;
|
| - _current = null;
|
| - var result = new _Future<bool>();
|
| - _futureOrPrefetch = result;
|
| - return result;
|
| - } else {
|
| - assert(_state >= _STATE_EXTRA_DATA);
|
| - switch (_state) {
|
| - case _STATE_EXTRA_DATA:
|
| - _state = _STATE_FOUND;
|
| - _current = _futureOrPrefetch as Object /*=T*/;
|
| - _futureOrPrefetch = null;
|
| - _subscription.resume();
|
| - return new _Future<bool>.immediate(true);
|
| - case _STATE_EXTRA_ERROR:
|
| - AsyncError prefetch = _futureOrPrefetch;
|
| - _clear();
|
| - return new _Future<bool>.immediateError(prefetch.error,
|
| - prefetch.stackTrace);
|
| - case _STATE_EXTRA_DONE:
|
| - _clear();
|
| - return new _Future<bool>.immediate(false);
|
| + if (_subscription != null) {
|
| + if (_isPaused) {
|
| + var future = new _Future<bool>();
|
| + _stateData = future;
|
| + _isPaused = false;
|
| + _subscription.resume();
|
| + return future;
|
| }
|
| + throw new StateError("Already waiting for next.");
|
| }
|
| + return _initializeOrDone();
|
| }
|
|
|
| - /** Clears up the internal state when the iterator ends. */
|
| - void _clear() {
|
| - _subscription = null;
|
| - _futureOrPrefetch = null;
|
| - _current = null;
|
| - _state = _STATE_DONE;
|
| + /// Called if there is no active subscription when [moveNext] is called.
|
| + ///
|
| + /// Either starts listening on the stream if this is the first call to
|
| + /// [moveNext], or returns a `false` future because the stream has already
|
| + /// ended.
|
| + Future<bool> _initializeOrDone() {
|
| + assert(_subscription == null);
|
| + var stateData = _stateData;
|
| + if (stateData != null) {
|
| + Stream<T> stream = stateData as Object /*=Stream<T>*/;
|
| + _subscription = stream.listen(
|
| + _onData, onError: _onError, onDone: _onDone, cancelOnError: true);
|
| + var future = new _Future<bool>();
|
| + _stateData = future;
|
| + return future;
|
| + }
|
| + return new _Future<bool>.immediate(false);
|
| }
|
|
|
| Future cancel() {
|
| - StreamSubscription subscription = _subscription;
|
| - if (subscription == null) return Future._nullFuture;
|
| - if (_state == _STATE_MOVING) {
|
| - _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
| - _clear();
|
| - hasNext._complete(false);
|
| - } else {
|
| - _clear();
|
| + StreamSubscription<T> subscription = _subscription;
|
| + Object stateData = _stateData;
|
| + _stateData = null;
|
| + if (subscription != null) {
|
| + _subscription = null;
|
| + if (!_isPaused) {
|
| + _Future<bool> future = stateData as Object /*=_Future<bool>*/;
|
| + future._asyncComplete(false);
|
| + }
|
| + return subscription.cancel();
|
| }
|
| - return subscription.cancel();
|
| + return Future._nullFuture;
|
| }
|
|
|
| void _onData(T data) {
|
| - if (_state == _STATE_MOVING) {
|
| - _current = data;
|
| - _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
| - _futureOrPrefetch = null;
|
| - _state = _STATE_FOUND;
|
| - hasNext._complete(true);
|
| - return;
|
| - }
|
| - _subscription.pause();
|
| - assert(_futureOrPrefetch == null);
|
| - _futureOrPrefetch = data;
|
| - _state = _STATE_EXTRA_DATA;
|
| + assert(_subscription != null && !_isPaused);
|
| + _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
|
| + _stateData = data;
|
| + _isPaused = true;
|
| + moveNextFuture._complete(true);
|
| + if (_subscription != null && _isPaused) _subscription.pause();
|
| }
|
|
|
| void _onError(Object error, [StackTrace stackTrace]) {
|
| - if (_state == _STATE_MOVING) {
|
| - _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
| - // We have cancelOnError: true, so the subscription is canceled.
|
| - _clear();
|
| - hasNext._completeError(error, stackTrace);
|
| - return;
|
| - }
|
| - _subscription.pause();
|
| - assert(_futureOrPrefetch == null);
|
| - _futureOrPrefetch = new AsyncError(error, stackTrace);
|
| - _state = _STATE_EXTRA_ERROR;
|
| + assert(_subscription != null && !_isPaused);
|
| + _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
|
| + _subscription = null;
|
| + _stateData = null;
|
| + moveNextFuture._completeError(error, stackTrace);
|
| }
|
|
|
| void _onDone() {
|
| - if (_state == _STATE_MOVING) {
|
| - _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
|
| - _clear();
|
| - hasNext._complete(false);
|
| - return;
|
| - }
|
| - _subscription.pause();
|
| - _futureOrPrefetch = null;
|
| - _state = _STATE_EXTRA_DONE;
|
| + assert(_subscription != null && !_isPaused);
|
| + _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
|
| + _subscription = null;
|
| + _stateData = null;
|
| + moveNextFuture._complete(false);
|
| }
|
| }
|
|
|
|
|