Index: sdk/lib/async/stream_impl.dart |
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart |
index cb2e20d0f1373c8ec59700d85e76e81e2dc91f5a..7f894644bb5e881614f24bb53e3dce97eb69d22f 100644 |
--- a/sdk/lib/async/stream_impl.dart |
+++ b/sdk/lib/async/stream_impl.dart |
@@ -86,7 +86,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
static const int _STATE_IN_CALLBACK = 32; |
static const int _STATE_HAS_PENDING = 64; |
static const int _STATE_PAUSE_COUNT = 128; |
- static const int _STATE_PAUSE_COUNT_SHIFT = 7; |
/* Event handlers provided in constructor. */ |
_DataHandler<T> _onData; |
@@ -239,15 +238,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
} |
/** |
- * Increment the pause count. |
- * |
- * Also marks input as paused. |
- */ |
- void _incrementPauseCount() { |
- _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
- } |
- |
- /** |
* Decrements the pause count. |
* |
* Does not automatically unpause the input (call [_onResume]) when |
@@ -722,25 +712,6 @@ class _StreamImplEvents<T> extends _PendingEvents<T> { |
} |
} |
-class _BroadcastLinkedList { |
- _BroadcastLinkedList _next; |
- _BroadcastLinkedList _previous; |
- |
- void _unlink() { |
- _previous._next = _next; |
- _next._previous = _previous; |
- _next = _previous = this; |
- } |
- |
- void _insertBefore(_BroadcastLinkedList newNext) { |
- _BroadcastLinkedList newPrevious = newNext._previous; |
- newPrevious._next = this; |
- newNext._previous = _previous; |
- _previous._next = newNext; |
- _previous = newPrevious; |
- } |
-} |
- |
typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription); |
/** |
@@ -941,156 +912,139 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { |
/** |
* Simple implementation of [StreamIterator]. |
+ * |
+ * Pauses the stream between calls to [moveNext]. |
*/ |
-class _StreamIteratorImpl<T> implements StreamIterator<T> { |
- // Internal state of the stream iterator. |
- // At any time, it is in one of these states. |
- // The interpretation of the [_futureOrPrefecth] field depends on the state. |
- // In _STATE_MOVING, the _data field holds the most recently returned |
- // future. |
- // When in one of the _STATE_EXTRA_* states, the it may hold the |
- // next data/error object, and the subscription is paused. |
- |
- /// The simple state where [_data] holds the data to return, and [moveNext] |
- /// is allowed. The subscription is actively listening. |
- static const int _STATE_FOUND = 0; |
- /// State set after [moveNext] has returned false or an error, |
- /// or after calling [cancel]. The subscription is always canceled. |
- static const int _STATE_DONE = 1; |
- /// State set after calling [moveNext], but before its returned future has |
- /// completed. Calling [moveNext] again is not allowed in this state. |
- /// The subscription is actively listening. |
- static const int _STATE_MOVING = 2; |
- /// States set when another event occurs while in _STATE_FOUND. |
- /// This extra overflow event is cached until the next call to [moveNext], |
- /// which will complete as if it received the event normally. |
- /// The subscription is paused in these states, so we only ever get one |
- /// event too many. |
- static const int _STATE_EXTRA_DATA = 3; |
- static const int _STATE_EXTRA_ERROR = 4; |
- static const int _STATE_EXTRA_DONE = 5; |
+class _StreamIterator<T> implements StreamIterator<T> { |
+ // The stream iterator is always in one of four states. |
+ // The value of the [_stateData] field depends on the state. |
+ // |
+ // When `_subscription == null` and `_stateData != null`: |
+ // The stream iterator has been created, but [moveNext] has not been called |
+ // yet. The [_stateData] field contains the stream to listen to on the first |
+ // call to [moveNext] and [current] returns `null`. |
+ // |
+ // When `_subscription != null` and `!_isPaused`: |
+ // The user has called [moveNext] and the iterator is waiting for the next |
+ // event. The [_stateData] field contains the [_Future] returned by the |
+ // [_moveNext] call and [current] returns `null.` |
+ // |
+ // When `_subscription != null` and `_isPaused`: |
+ // The most recent call to [moveNext] has completed with a `true` value |
+ // and [current] provides the value of the data event. |
+ // The [_stateData] field contains the [current] value. |
+ // |
+ // When `_subscription == null` and `_stateData == null`: |
+ // The stream has completed or been canceled using [cancel]. |
+ // The stream completes on either a done event or an error event. |
+ // The last call to [moveNext] has completed with `false` and [current] |
+ // returns `null`. |
/// Subscription being listened to. |
+ /// |
+ /// Set to `null` when the stream subscription is done or canceled. |
StreamSubscription _subscription; |
- /// The current element represented by the most recent call to moveNext. |
+ /// Data value depending on the current state. |
/// |
- /// Is null between the time moveNext is called and its future completes. |
- T _current = null; |
- |
- /// The future returned by the most recent call to [moveNext]. |
+ /// Before first call to [moveNext]: The stream to listen to. |
+ /// |
+ /// After calling [moveNext] but before the returned future completes: |
+ /// The returned future. |
/// |
- /// Also used to store the next value/error in case the stream provides an |
- /// event before [moveNext] is called again. In that case, the stream will |
- /// be paused to prevent further events. |
- var/*Future<bool> or T*/ _futureOrPrefetch = null; |
+ /// After calling [moveNext] and the returned future has completed |
+ /// with `true`: The value of [current]. |
+ /// |
+ /// After calling [moveNext] and the returned future has completed |
+ /// with `false`, or after calling [cancel]: `null`. |
+ Object _stateData; |
- /// The current state. |
- int _state = _STATE_FOUND; |
+ /// Whether the iterator is between calls to `moveNext`. |
+ /// This will usually cause the [_subscription] to be paused, but as an |
+ /// optimization, we only pause after the [moveNext] future has been |
+ /// completed. |
+ bool _isPaused = false; |
- _StreamIteratorImpl(final Stream<T> stream) { |
- _subscription = stream.listen(_onData, |
- onError: _onError, |
- onDone: _onDone, |
- cancelOnError: true); |
- } |
+ _StreamIterator(final Stream<T> stream) : _stateData = stream; |
- T get current => _current; |
+ T get current { |
+ if (_subscription != null && _isPaused) { |
+ return _stateData as Object /*=T*/; |
+ } |
+ return null; |
+ } |
Future<bool> moveNext() { |
- if (_state == _STATE_DONE) { |
- return new _Future<bool>.immediate(false); |
- } |
- if (_state == _STATE_MOVING) { |
- throw new StateError("Already waiting for next."); |
- } |
- if (_state == _STATE_FOUND) { |
- _state = _STATE_MOVING; |
- _current = null; |
- var result = new _Future<bool>(); |
- _futureOrPrefetch = result; |
- return result; |
- } else { |
- assert(_state >= _STATE_EXTRA_DATA); |
- switch (_state) { |
- case _STATE_EXTRA_DATA: |
- _state = _STATE_FOUND; |
- _current = _futureOrPrefetch as Object /*=T*/; |
- _futureOrPrefetch = null; |
- _subscription.resume(); |
- return new _Future<bool>.immediate(true); |
- case _STATE_EXTRA_ERROR: |
- AsyncError prefetch = _futureOrPrefetch; |
- _clear(); |
- return new _Future<bool>.immediateError(prefetch.error, |
- prefetch.stackTrace); |
- case _STATE_EXTRA_DONE: |
- _clear(); |
- return new _Future<bool>.immediate(false); |
+ if (_subscription != null) { |
+ if (_isPaused) { |
+ var future = new _Future<bool>(); |
+ _stateData = future; |
+ _isPaused = false; |
+ _subscription.resume(); |
+ return future; |
} |
+ throw new StateError("Already waiting for next."); |
} |
+ return _initializeOrDone(); |
} |
- /** Clears up the internal state when the iterator ends. */ |
- void _clear() { |
- _subscription = null; |
- _futureOrPrefetch = null; |
- _current = null; |
- _state = _STATE_DONE; |
+ /// Called if there is no active subscription when [moveNext] is called. |
+ /// |
+ /// Either starts listening on the stream if this is the first call to |
+ /// [moveNext], or returns a `false` future because the stream has already |
+ /// ended. |
+ Future<bool> _initializeOrDone() { |
+ assert(_subscription == null); |
+ var stateData = _stateData; |
+ if (stateData != null) { |
+ Stream<T> stream = stateData as Object /*=Stream<T>*/; |
+ _subscription = stream.listen( |
+ _onData, onError: _onError, onDone: _onDone, cancelOnError: true); |
+ var future = new _Future<bool>(); |
+ _stateData = future; |
+ return future; |
+ } |
+ return new _Future<bool>.immediate(false); |
} |
Future cancel() { |
- StreamSubscription subscription = _subscription; |
- if (subscription == null) return Future._nullFuture; |
- if (_state == _STATE_MOVING) { |
- _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
- _clear(); |
- hasNext._complete(false); |
- } else { |
- _clear(); |
+ StreamSubscription<T> subscription = _subscription; |
+ Object stateData = _stateData; |
+ _stateData = null; |
+ if (subscription != null) { |
+ _subscription = null; |
+ if (!_isPaused) { |
+ _Future<bool> future = stateData as Object /*=_Future<bool>*/; |
+ future._asyncComplete(false); |
+ } |
+ return subscription.cancel(); |
} |
- return subscription.cancel(); |
+ return Future._nullFuture; |
} |
void _onData(T data) { |
- if (_state == _STATE_MOVING) { |
- _current = data; |
- _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
- _futureOrPrefetch = null; |
- _state = _STATE_FOUND; |
- hasNext._complete(true); |
- return; |
- } |
- _subscription.pause(); |
- assert(_futureOrPrefetch == null); |
- _futureOrPrefetch = data; |
- _state = _STATE_EXTRA_DATA; |
+ assert(_subscription != null && !_isPaused); |
+ _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; |
+ _stateData = data; |
+ _isPaused = true; |
+ moveNextFuture._complete(true); |
+ if (_subscription != null && _isPaused) _subscription.pause(); |
} |
void _onError(Object error, [StackTrace stackTrace]) { |
- if (_state == _STATE_MOVING) { |
- _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
- // We have cancelOnError: true, so the subscription is canceled. |
- _clear(); |
- hasNext._completeError(error, stackTrace); |
- return; |
- } |
- _subscription.pause(); |
- assert(_futureOrPrefetch == null); |
- _futureOrPrefetch = new AsyncError(error, stackTrace); |
- _state = _STATE_EXTRA_ERROR; |
+ assert(_subscription != null && !_isPaused); |
+ _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; |
+ _subscription = null; |
+ _stateData = null; |
+ moveNextFuture._completeError(error, stackTrace); |
} |
void _onDone() { |
- if (_state == _STATE_MOVING) { |
- _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/; |
- _clear(); |
- hasNext._complete(false); |
- return; |
- } |
- _subscription.pause(); |
- _futureOrPrefetch = null; |
- _state = _STATE_EXTRA_DONE; |
+ assert(_subscription != null && !_isPaused); |
+ _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/; |
+ _subscription = null; |
+ _stateData = null; |
+ moveNextFuture._complete(false); |
} |
} |