Chromium Code Reviews| Index: sdk/lib/async/stream_impl.dart |
| diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
| index 8c9af64bb9e01fd4bfe9511e9024117bcce3e8e8..4fd66e7885caa3d5f54c7ebca454a3ebcd579a54 100644 |
| --- a/sdk/lib/async/stream_impl.dart |
| +++ b/sdk/lib/async/stream_impl.dart |
| @@ -86,7 +86,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| static const int _STATE_IN_CALLBACK = 32; |
| static const int _STATE_HAS_PENDING = 64; |
| static const int _STATE_PAUSE_COUNT = 128; |
| - static const int _STATE_PAUSE_COUNT_SHIFT = 7; |
| /* Event handlers provided in constructor. */ |
| _DataHandler<T> _onData; |
| @@ -232,15 +231,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| } |
| /** |
| - * Increment the pause count. |
| - * |
| - * Also marks input as paused. |
| - */ |
| - void _incrementPauseCount() { |
| - _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
| - } |
| - |
| - /** |
| * Decrements the pause count. |
| * |
| * Does not automatically unpause the input (call [_onResume]) when |
| @@ -713,25 +703,6 @@ class _StreamImplEvents<T> extends _PendingEvents<T> { |
| } |
| } |
| -class _BroadcastLinkedList { |
| - _BroadcastLinkedList _next; |
| - _BroadcastLinkedList _previous; |
| - |
| - void _unlink() { |
| - _previous._next = _next; |
| - _next._previous = _previous; |
| - _next = _previous = this; |
| - } |
| - |
| - void _insertBefore(_BroadcastLinkedList newNext) { |
| - _BroadcastLinkedList newPrevious = newNext._previous; |
| - newPrevious._next = this; |
| - newNext._previous = _previous; |
| - _previous._next = newNext; |
| - _previous = newPrevious; |
| - } |
| -} |
| - |
| typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); |
| /** |
| @@ -932,156 +903,131 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { |
| /** |
| * Simple implementation of [StreamIterator]. |
| + * |
| + * Pauses the stream between calles to [moveNext]. |
| */ |
| -class _StreamIteratorImpl<T> implements StreamIterator<T> { |
| - // Internal state of the stream iterator. |
| - // At any time, it is in one of these states. |
| - // The interpretation of the [_futureOrPrefecth] field depends on the state. |
| - // In _STATE_MOVING, the _data field holds the most recently returned |
| - // future. |
| - // When in one of the _STATE_EXTRA_* states, the it may hold the |
| - // next data/error object, and the subscription is paused. |
| - |
| - /// The simple state where [_data] holds the data to return, and [moveNext] |
| - /// is allowed. The subscription is actively listening. |
| - static const int _STATE_FOUND = 0; |
| - /// State set after [moveNext] has returned false or an error, |
| - /// or after calling [cancel]. The subscription is always canceled. |
| - static const int _STATE_DONE = 1; |
| - /// State set after calling [moveNext], but before its returned future has |
| - /// completed. Calling [moveNext] again is not allowed in this state. |
| - /// The subscription is actively listening. |
| - static const int _STATE_MOVING = 2; |
| - /// States set when another event occurs while in _STATE_FOUND. |
| - /// This extra overflow event is cached until the next call to [moveNext], |
| - /// which will complete as if it received the event normally. |
| - /// The subscription is paused in these states, so we only ever get one |
| - /// event too many. |
| - static const int _STATE_EXTRA_DATA = 3; |
| - static const int _STATE_EXTRA_ERROR = 4; |
| - static const int _STATE_EXTRA_DONE = 5; |
| +class _StreamIterator<T> implements StreamIterator<T> { |
| + // The stream iterator is always in one of four states. |
| + // The value of the [_stateData] field depends on the state. |
| + // |
| + // When `_subscription == null` and `_stateData != null`: |
| + // The stream iterator has been created, but [moveNext] has not been called |
| + // yet. The [_stateData] field contains the stream to listen to on the first |
| + // call to [moveNext] and [current] returns `null`. |
| + // |
| + // When `_subscription != null` and `!_subscription.isPaused`: |
| + // The user has called [moveNext] and the iterator is waiting for the next |
| + // event. The [_stateData] field contains the [_Future] returned by the |
| + // [_moveNext] call and [current] returns `null.` |
| + // |
| + // When `_subscription != null` and `_subscription.isPaused`: |
| + // The most recent call to [moveNext] has completed with a `true` value |
| + // and [current] provides the value of the data event. |
| + // The [_stateData] field contains the [current] value. |
| + // |
| + // When `_subscription == null` and `_stateData == null`: |
| + // The stream has completed or been canceled using [cancel]. |
| + // The stream completes on either a done event or an error event. |
| + // The last call to [moveNext] has completed with `false` and [current] |
| + // returns `null`. |
| /// Subscription being listened to. |
| + /// |
| + /// Set to `null` when the stream subscription is done or canceled. |
| StreamSubscription _subscription; |
| - /// The current element represented by the most recent call to moveNext. |
| + /// Data value depending on the current state. |
| /// |
| - /// Is null between the time moveNext is called and its future completes. |
| - T _current = null; |
| - |
| - /// The future returned by the most recent call to [moveNext]. |
| + /// Before first call to [moveNext]: The stream to listen to. |
| + /// |
| + /// After calling [moveNext] but before the returned future completes: |
| + /// The returned future. |
| + /// |
| + /// After calling [moveNext] and the returned future has completed |
| + /// with `true`: The value of [current]. |
| /// |
| - /// Also used to store the next value/error in case the stream provides an |
| - /// event before [moveNext] is called again. In that case, the stream will |
| - /// be paused to prevent further events. |
| - var/*Future<bool> or T*/ _futureOrPrefetch = null; |
| + /// After calling [moveNext] and the returned future has completed |
| + /// with `false`, or after calling [cancel]: `null`. |
| + Object _stateData; |
| - /// The current state. |
| - int _state = _STATE_FOUND; |
| + _StreamIterator(final Stream<T> stream) : _stateData = stream; |
| - _StreamIteratorImpl(final Stream<T> stream) { |
| - _subscription = stream.listen(_onData, |
| - onError: _onError, |
| - onDone: _onDone, |
| - cancelOnError: true); |
| + T get current { |
| + if (_subscription != null && _subscription.isPaused) { |
| + return _stateData; |
|
floitsch
2016/07/14 13:29:25
Please make sure that this is still strong-mode cl
|
| + } |
| + return null; |
| } |
| - T get current => _current; |
| - |
| Future<bool> moveNext() { |
| - if (_state == _STATE_DONE) { |
| - return new _Future<bool>.immediate(false); |
| - } |
| - if (_state == _STATE_MOVING) { |
| + if (_subscription != null) { |
| + if (_subscription.isPaused) { |
| + var future = new _Future<bool>(); |
| + _stateData = future; |
| + _subscription.resume(); |
| + return future; |
| + } |
| throw new StateError("Already waiting for next."); |
| } |
| - if (_state == _STATE_FOUND) { |
| - _state = _STATE_MOVING; |
| - _current = null; |
| - var result = new _Future<bool>(); |
| - _futureOrPrefetch = result; |
| - return result; |
| - } else { |
| - assert(_state >= _STATE_EXTRA_DATA); |
| - switch (_state) { |
| - case _STATE_EXTRA_DATA: |
| - _state = _STATE_FOUND; |
| - _current = _futureOrPrefetch as Object /*=T*/; |
| - _futureOrPrefetch = null; |
| - _subscription.resume(); |
| - return new _Future<bool>.immediate(true); |
| - case _STATE_EXTRA_ERROR: |
| - AsyncError prefetch = _futureOrPrefetch; |
| - _clear(); |
| - return new _Future<bool>.immediateError(prefetch.error, |
| - prefetch.stackTrace); |
| - case _STATE_EXTRA_DONE: |
| - _clear(); |
| - return new _Future<bool>.immediate(false); |
| - } |
| - } |
| + return _initializeOrDone(); |
| } |
| - /** Clears up the internal state when the iterator ends. */ |
| - void _clear() { |
| - _subscription = null; |
| - _futureOrPrefetch = null; |
| - _current = null; |
| - _state = _STATE_DONE; |
| + /// Called if there is no active subscription when [moveNext] is called. |
| + /// |
| + /// Either starts listening on the stream if this is the first call to |
| + /// [moveNext], or returns a `false` future because the stream has already |
| + /// ended. |
| + Future<bool> _initializeOrDone() { |
| + assert(_subscription == null); |
| + var stateData = _stateData; |
| + if (stateData != null) { |
| + Stream<T> stream = stateData; |
| + _subscription = stream.listen( |
| + _onData, onError: _onError, onDone: _onDone, cancelOnError: true); |
| + var future = new _Future<bool>(); |
| + _stateData = future; |
| + return future; |
| + } |
| + return new _Future.immediate(false); |
| } |
| Future cancel() { |
| - StreamSubscription subscription = _subscription; |
| - if (subscription == null) return null; |
| - if (_state == _STATE_MOVING) { |
| - _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
| - _clear(); |
| - hasNext._complete(false); |
| - } else { |
| - _clear(); |
| + StreamSubscription<T> subscription = _subscription; |
| + Object stateData = _stateData; |
| + _stateData = null; |
| + if (subscription != null) { |
| + _subscription = null; |
| + if (!subscription.isPaused) { |
| + _Future<bool> future = stateData; |
| + future._asyncComplete(false); |
| + } |
| + return subscription.cancel(); |
| } |
| - return subscription.cancel(); |
| + return null; |
| } |
| void _onData(T data) { |
| - if (_state == _STATE_MOVING) { |
| - _current = data; |
| - _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
| - _futureOrPrefetch = null; |
| - _state = _STATE_FOUND; |
| - hasNext._complete(true); |
| - return; |
| - } |
| + assert(_subscription != null && !_subscription.isPaused); |
| + _Future<bool> moveNextFuture = _stateData; |
| + _stateData = data; |
| _subscription.pause(); |
| - assert(_futureOrPrefetch == null); |
| - _futureOrPrefetch = data; |
| - _state = _STATE_EXTRA_DATA; |
| + moveNextFuture._complete(true); |
| } |
| void _onError(Object error, [StackTrace stackTrace]) { |
| - if (_state == _STATE_MOVING) { |
| - _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
| - // We have cancelOnError: true, so the subscription is canceled. |
| - _clear(); |
| - hasNext._completeError(error, stackTrace); |
| - return; |
| - } |
| - _subscription.pause(); |
| - assert(_futureOrPrefetch == null); |
| - _futureOrPrefetch = new AsyncError(error, stackTrace); |
| - _state = _STATE_EXTRA_ERROR; |
| + assert(_subscription != null && !_subscription.isPaused); |
| + _Future<bool> moveNextFuture = _stateData; |
| + _subscription = null; |
| + _stateData = null; |
| + moveNextFuture._completeError(error, stackTrace); |
| } |
| void _onDone() { |
| - if (_state == _STATE_MOVING) { |
| - _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
| - _clear(); |
| - hasNext._complete(false); |
| - return; |
| - } |
| - _subscription.pause(); |
| - _futureOrPrefetch = null; |
| - _state = _STATE_EXTRA_DONE; |
| + assert(_subscription != null && !_subscription.isPaused); |
| + _Future<bool> moveNextFuture = _stateData; |
| + _subscription = null; |
| + _stateData = null; |
| + moveNextFuture._complete(false); |
| } |
| } |