Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(116)

Unified Diff: sdk/lib/async/stream_impl.dart

Issue 2149893002: Make StreamIterator not delay pausing between requests. (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Address comment. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | tests/language/async_star_pause_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream_impl.dart
diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart
index cb2e20d0f1373c8ec59700d85e76e81e2dc91f5a..7f894644bb5e881614f24bb53e3dce97eb69d22f 100644
--- a/sdk/lib/async/stream_impl.dart
+++ b/sdk/lib/async/stream_impl.dart
@@ -86,7 +86,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
static const int _STATE_IN_CALLBACK = 32;
static const int _STATE_HAS_PENDING = 64;
static const int _STATE_PAUSE_COUNT = 128;
- static const int _STATE_PAUSE_COUNT_SHIFT = 7;
/* Event handlers provided in constructor. */
_DataHandler<T> _onData;
@@ -239,15 +238,6 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
}
/**
- * Increment the pause count.
- *
- * Also marks input as paused.
- */
- void _incrementPauseCount() {
- _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
- }
-
- /**
* Decrements the pause count.
*
* Does not automatically unpause the input (call [_onResume]) when
@@ -722,25 +712,6 @@ class _StreamImplEvents<T> extends _PendingEvents<T> {
}
}
-class _BroadcastLinkedList {
- _BroadcastLinkedList _next;
- _BroadcastLinkedList _previous;
-
- void _unlink() {
- _previous._next = _next;
- _next._previous = _previous;
- _next = _previous = this;
- }
-
- void _insertBefore(_BroadcastLinkedList newNext) {
- _BroadcastLinkedList newPrevious = newNext._previous;
- newPrevious._next = this;
- newNext._previous = _previous;
- _previous._next = newNext;
- _previous = newPrevious;
- }
-}
-
typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription);
/**
@@ -941,156 +912,139 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
/**
* Simple implementation of [StreamIterator].
+ *
+ * Pauses the stream between calls to [moveNext].
*/
-class _StreamIteratorImpl<T> implements StreamIterator<T> {
- // Internal state of the stream iterator.
- // At any time, it is in one of these states.
- // The interpretation of the [_futureOrPrefecth] field depends on the state.
- // In _STATE_MOVING, the _data field holds the most recently returned
- // future.
- // When in one of the _STATE_EXTRA_* states, the it may hold the
- // next data/error object, and the subscription is paused.
-
- /// The simple state where [_data] holds the data to return, and [moveNext]
- /// is allowed. The subscription is actively listening.
- static const int _STATE_FOUND = 0;
- /// State set after [moveNext] has returned false or an error,
- /// or after calling [cancel]. The subscription is always canceled.
- static const int _STATE_DONE = 1;
- /// State set after calling [moveNext], but before its returned future has
- /// completed. Calling [moveNext] again is not allowed in this state.
- /// The subscription is actively listening.
- static const int _STATE_MOVING = 2;
- /// States set when another event occurs while in _STATE_FOUND.
- /// This extra overflow event is cached until the next call to [moveNext],
- /// which will complete as if it received the event normally.
- /// The subscription is paused in these states, so we only ever get one
- /// event too many.
- static const int _STATE_EXTRA_DATA = 3;
- static const int _STATE_EXTRA_ERROR = 4;
- static const int _STATE_EXTRA_DONE = 5;
+class _StreamIterator<T> implements StreamIterator<T> {
+ // The stream iterator is always in one of four states.
+ // The value of the [_stateData] field depends on the state.
+ //
+ // When `_subscription == null` and `_stateData != null`:
+ // The stream iterator has been created, but [moveNext] has not been called
+ // yet. The [_stateData] field contains the stream to listen to on the first
+ // call to [moveNext] and [current] returns `null`.
+ //
+ // When `_subscription != null` and `!_isPaused`:
+ // The user has called [moveNext] and the iterator is waiting for the next
+ // event. The [_stateData] field contains the [_Future] returned by the
+ // [_moveNext] call and [current] returns `null.`
+ //
+ // When `_subscription != null` and `_isPaused`:
+ // The most recent call to [moveNext] has completed with a `true` value
+ // and [current] provides the value of the data event.
+ // The [_stateData] field contains the [current] value.
+ //
+ // When `_subscription == null` and `_stateData == null`:
+ // The stream has completed or been canceled using [cancel].
+ // The stream completes on either a done event or an error event.
+ // The last call to [moveNext] has completed with `false` and [current]
+ // returns `null`.
/// Subscription being listened to.
+ ///
+ /// Set to `null` when the stream subscription is done or canceled.
StreamSubscription _subscription;
- /// The current element represented by the most recent call to moveNext.
+ /// Data value depending on the current state.
///
- /// Is null between the time moveNext is called and its future completes.
- T _current = null;
-
- /// The future returned by the most recent call to [moveNext].
+ /// Before first call to [moveNext]: The stream to listen to.
+ ///
+ /// After calling [moveNext] but before the returned future completes:
+ /// The returned future.
///
- /// Also used to store the next value/error in case the stream provides an
- /// event before [moveNext] is called again. In that case, the stream will
- /// be paused to prevent further events.
- var/*Future<bool> or T*/ _futureOrPrefetch = null;
+ /// After calling [moveNext] and the returned future has completed
+ /// with `true`: The value of [current].
+ ///
+ /// After calling [moveNext] and the returned future has completed
+ /// with `false`, or after calling [cancel]: `null`.
+ Object _stateData;
- /// The current state.
- int _state = _STATE_FOUND;
+ /// Whether the iterator is between calls to `moveNext`.
+ /// This will usually cause the [_subscription] to be paused, but as an
+ /// optimization, we only pause after the [moveNext] future has been
+ /// completed.
+ bool _isPaused = false;
- _StreamIteratorImpl(final Stream<T> stream) {
- _subscription = stream.listen(_onData,
- onError: _onError,
- onDone: _onDone,
- cancelOnError: true);
- }
+ _StreamIterator(final Stream<T> stream) : _stateData = stream;
- T get current => _current;
+ T get current {
+ if (_subscription != null && _isPaused) {
+ return _stateData as Object /*=T*/;
+ }
+ return null;
+ }
Future<bool> moveNext() {
- if (_state == _STATE_DONE) {
- return new _Future<bool>.immediate(false);
- }
- if (_state == _STATE_MOVING) {
- throw new StateError("Already waiting for next.");
- }
- if (_state == _STATE_FOUND) {
- _state = _STATE_MOVING;
- _current = null;
- var result = new _Future<bool>();
- _futureOrPrefetch = result;
- return result;
- } else {
- assert(_state >= _STATE_EXTRA_DATA);
- switch (_state) {
- case _STATE_EXTRA_DATA:
- _state = _STATE_FOUND;
- _current = _futureOrPrefetch as Object /*=T*/;
- _futureOrPrefetch = null;
- _subscription.resume();
- return new _Future<bool>.immediate(true);
- case _STATE_EXTRA_ERROR:
- AsyncError prefetch = _futureOrPrefetch;
- _clear();
- return new _Future<bool>.immediateError(prefetch.error,
- prefetch.stackTrace);
- case _STATE_EXTRA_DONE:
- _clear();
- return new _Future<bool>.immediate(false);
+ if (_subscription != null) {
+ if (_isPaused) {
+ var future = new _Future<bool>();
+ _stateData = future;
+ _isPaused = false;
+ _subscription.resume();
+ return future;
}
+ throw new StateError("Already waiting for next.");
}
+ return _initializeOrDone();
}
- /** Clears up the internal state when the iterator ends. */
- void _clear() {
- _subscription = null;
- _futureOrPrefetch = null;
- _current = null;
- _state = _STATE_DONE;
+ /// Called if there is no active subscription when [moveNext] is called.
+ ///
+ /// Either starts listening on the stream if this is the first call to
+ /// [moveNext], or returns a `false` future because the stream has already
+ /// ended.
+ Future<bool> _initializeOrDone() {
+ assert(_subscription == null);
+ var stateData = _stateData;
+ if (stateData != null) {
+ Stream<T> stream = stateData as Object /*=Stream<T>*/;
+ _subscription = stream.listen(
+ _onData, onError: _onError, onDone: _onDone, cancelOnError: true);
+ var future = new _Future<bool>();
+ _stateData = future;
+ return future;
+ }
+ return new _Future<bool>.immediate(false);
}
Future cancel() {
- StreamSubscription subscription = _subscription;
- if (subscription == null) return Future._nullFuture;
- if (_state == _STATE_MOVING) {
- _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
- _clear();
- hasNext._complete(false);
- } else {
- _clear();
+ StreamSubscription<T> subscription = _subscription;
+ Object stateData = _stateData;
+ _stateData = null;
+ if (subscription != null) {
+ _subscription = null;
+ if (!_isPaused) {
+ _Future<bool> future = stateData as Object /*=_Future<bool>*/;
+ future._asyncComplete(false);
+ }
+ return subscription.cancel();
}
- return subscription.cancel();
+ return Future._nullFuture;
}
void _onData(T data) {
- if (_state == _STATE_MOVING) {
- _current = data;
- _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
- _futureOrPrefetch = null;
- _state = _STATE_FOUND;
- hasNext._complete(true);
- return;
- }
- _subscription.pause();
- assert(_futureOrPrefetch == null);
- _futureOrPrefetch = data;
- _state = _STATE_EXTRA_DATA;
+ assert(_subscription != null && !_isPaused);
+ _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
+ _stateData = data;
+ _isPaused = true;
+ moveNextFuture._complete(true);
+ if (_subscription != null && _isPaused) _subscription.pause();
}
void _onError(Object error, [StackTrace stackTrace]) {
- if (_state == _STATE_MOVING) {
- _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
- // We have cancelOnError: true, so the subscription is canceled.
- _clear();
- hasNext._completeError(error, stackTrace);
- return;
- }
- _subscription.pause();
- assert(_futureOrPrefetch == null);
- _futureOrPrefetch = new AsyncError(error, stackTrace);
- _state = _STATE_EXTRA_ERROR;
+ assert(_subscription != null && !_isPaused);
+ _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
+ _subscription = null;
+ _stateData = null;
+ moveNextFuture._completeError(error, stackTrace);
}
void _onDone() {
- if (_state == _STATE_MOVING) {
- _Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
- _clear();
- hasNext._complete(false);
- return;
- }
- _subscription.pause();
- _futureOrPrefetch = null;
- _state = _STATE_EXTRA_DONE;
+ assert(_subscription != null && !_isPaused);
+ _Future<bool> moveNextFuture = _stateData as Object /*=_Future<bool>*/;
+ _subscription = null;
+ _stateData = null;
+ moveNextFuture._complete(false);
}
}
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | tests/language/async_star_pause_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698