Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(578)

Unified Diff: packages/async/lib/src/stream_queue.dart

Issue 1521693002: Roll Observatory deps (charted -> ^0.3.0) (Closed) Base URL: https://chromium.googlesource.com/external/github.com/dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « packages/async/lib/src/restartable_timer.dart ('k') | packages/async/pubspec.yaml » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: packages/async/lib/src/stream_queue.dart
diff --git a/packages/async/lib/src/stream_queue.dart b/packages/async/lib/src/stream_queue.dart
index 36d03ef13a88d97f07952f6d061dba09017a2e69..09b3a75b2360a9adf2a18ad2715fe2fef08cc656 100644
--- a/packages/async/lib/src/stream_queue.dart
+++ b/packages/async/lib/src/stream_queue.dart
@@ -60,15 +60,17 @@ import "../result.dart";
///
/// When you need no further events the `StreamQueue` should be closed
/// using [cancel]. This releases the underlying stream subscription.
-class StreamQueue<T> {
+abstract class StreamQueue<T> {
// This class maintains two queues: one of events and one of requests.
// The active request (the one in front of the queue) is called with
- // the current event queue when it becomes active.
+ // the current event queue when it becomes active, every time a
+ // new event arrives, and when the event source closes.
//
- // If the request returns true, it's complete and will be removed from the
+ // If the request returns `true`, it's complete and will be removed from the
// request queue.
- // If the request returns false, it needs more events, and will be called
- // again when new events are available.
+ // If the request returns `false`, it needs more events, and will be called
+ // again when new events are available. It may trigger a call itself by
+ // calling [_updateRequests].
// The request can remove events that it uses, or keep them in the event
// queue until it has all that it needs.
//
@@ -77,16 +79,7 @@ class StreamQueue<T> {
// potentially a request that takes either five or zero events, determined
// by the content of the fifth event.
- /// Source of events.
- final Stream _sourceStream;
-
- /// Subscription on [_sourceStream] while listening for events.
- ///
- /// Set to subscription when listening, and set to `null` when the
- /// subscription is done (and [_isDone] is set to true).
- StreamSubscription _subscription;
-
- /// Whether we have listened on [_sourceStream] and the subscription is done.
+ /// Whether the event source is done.
bool _isDone = false;
/// Whether a closing operation has been performed on the stream queue.
@@ -103,8 +96,9 @@ class StreamQueue<T> {
final Queue<_EventRequest> _requestQueue = new Queue();
/// Create a `StreamQueue` of the events of [source].
- StreamQueue(Stream source)
- : _sourceStream = source;
+ factory StreamQueue(Stream source) = _StreamQueue<T>;
+
+ StreamQueue._();
/// Asks if the stream has any more events.
///
@@ -115,6 +109,8 @@ class StreamQueue<T> {
///
/// Can be used before using [next] to avoid getting an error in the
/// future returned by `next` in the case where there are no more events.
+ /// Another alternative is to use `take(1)` which returns either zero or
+ /// one events.
Future<bool> get hasNext {
if (!_isClosed) {
var hasNextRequest = new _HasNextRequest();
@@ -216,15 +212,15 @@ class StreamQueue<T> {
throw _failClosed();
}
- /// Cancels the underlying stream subscription.
+ /// Cancels the underlying event source.
///
/// If [immediate] is `false` (the default), the cancel operation waits until
/// all previously requested events have been processed, then it cancels the
/// subscription providing the events.
///
- /// If [immediate] is `true`, the subscription is instead canceled
- /// immediately. Any pending events complete with a 'closed'-event, as though
- /// the stream had closed by itself.
+ /// If [immediate] is `true`, the source is instead canceled
+ /// immediately. Any pending events are completed as though the underlying
+ /// stream had closed.
///
/// The returned future completes with the result of calling
/// `cancel`.
@@ -242,114 +238,178 @@ class StreamQueue<T> {
return request.future;
}
- if (_isDone) return new Future.value();
- if (_subscription == null) _subscription = _sourceStream.listen(null);
- var future = _subscription.cancel();
- _onDone();
- return future;
+ if (_isDone && _eventQueue.isEmpty) return new Future.value();
+ return _cancel();
}
- /// Returns an error for when a request is made after cancel.
+ // ------------------------------------------------------------------
+ // Methods that may be called from the request implementations to
+ // control the even stream.
+
+ /// Matches events with requests.
///
- /// Returns a [StateError] with a message saying that either
- /// [cancel] or [rest] have already been called.
- Error _failClosed() {
- return new StateError("Already cancelled");
+ /// Called after receiving an event or when the event source closes.
+ ///
+ /// May be called by requests which have returned `false` (saying they
+ /// are not yet done) so they can be checked again before any new
+ /// events arrive.
+ /// Any request returing `false` from `update` when `isDone` is `true`
+ /// *must* call `_updateRequests` when they are ready to continue
+ /// (since no further events will trigger the call).
+ void _updateRequests() {
+ while (_requestQueue.isNotEmpty) {
+ if (_requestQueue.first.update(_eventQueue, _isDone)) {
+ _requestQueue.removeFirst();
+ } else {
+ return;
+ }
+ }
+
+ if (!_isDone) {
+ _pause();
+ }
}
- // Callbacks receiving the events of the source stream.
+ /// Extracts a stream from the event source and makes this stream queue
+ /// unusable.
+ ///
+ /// Can only be used by the very last request (the stream queue must
+ /// be closed by that request).
+ /// Only used by [rest].
+ Stream _extractStream();
- void _onData(T data) {
- _eventQueue.add(new Result.value(data));
- _checkQueues();
- }
+ /// Requests that the event source pauses events.
+ ///
+ /// This is called automatically when the request queue is empty.
+ ///
+ /// The event source is restarted by the next call to [_ensureListening].
+ void _pause();
- void _onError(error, StackTrace stack) {
- _eventQueue.add(new Result.error(error, stack));
- _checkQueues();
+ /// Ensures that we are listening on events from the event source.
+ ///
+ /// Starts listening for the first time or resumes after a [_pause].
+ ///
+ /// Is called automatically if a request requires more events.
+ void _ensureListening();
+
+ /// Cancels the underlying event source.
+ Future _cancel();
+
+ // ------------------------------------------------------------------
+ // Methods called by the event source to add events or say that it's
+ // done.
+
+ /// Called when the event source adds a new data or error event.
+ /// Always calls [_updateRequests] after adding.
+ void _addResult(Result result) {
+ _eventQueue.add(result);
+ _updateRequests();
}
- void _onDone() {
- _subscription = null;
+ /// Called when the event source is done.
+ /// Always calls [_updateRequests] after adding.
+ void _close() {
_isDone = true;
- _closeAllRequests();
+ _updateRequests();
}
- // Request queue management.
+ // ------------------------------------------------------------------
+ // Internal helper methods.
+
+ /// Returns an error for when a request is made after cancel.
+ ///
+ /// Returns a [StateError] with a message saying that either
+ /// [cancel] or [rest] have already been called.
+ Error _failClosed() {
+ return new StateError("Already cancelled");
+ }
/// Adds a new request to the queue.
+ ///
+ /// If the request queue is empty and the request can be completed
+ /// immediately, it skips the queue.
void _addRequest(_EventRequest request) {
- if (_isDone) {
- assert(_requestQueue.isEmpty);
- if (!request.addEvents(_eventQueue)) {
- request.close(_eventQueue);
- }
- return;
- }
if (_requestQueue.isEmpty) {
- if (request.addEvents(_eventQueue)) return;
+ if (request.update(_eventQueue, _isDone)) return;
_ensureListening();
}
_requestQueue.add(request);
}
+}
+
- /// Ensures that we are listening on events from [_sourceStream].
+/// The default implementation of [StreamQueue].
+///
+/// This queue gets its events from a stream which is listened
+/// to when a request needs events.
+class _StreamQueue<T> extends StreamQueue<T> {
+ /// Source of events.
+ final Stream _sourceStream;
+
+ /// Subscription on [_sourceStream] while listening for events.
///
- /// Resumes subscription on [_sourceStream], or creates it if necessary.
+ /// Set to subscription when listening, and set to `null` when the
+ /// subscription is done (and [_isDone] is set to true).
+ StreamSubscription _subscription;
+
+ _StreamQueue(this._sourceStream) : super._();
+
+ Future _cancel() {
+ if (_isDone) return null;
+ if (_subscription == null) _subscription = _sourceStream.listen(null);
+ var future = _subscription.cancel();
+ _close();
+ return future;
+ }
+
void _ensureListening() {
assert(!_isDone);
if (_subscription == null) {
_subscription =
- _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
+ _sourceStream.listen(
+ (data) {
+ _addResult(new Result.value(data));
+ },
+ onError: (error, StackTrace stackTrace) {
+ _addResult(new Result.error(error, stackTrace));
+ },
+ onDone: () {
+ _subscription = null;
+ this._close();
+ });
} else {
_subscription.resume();
}
}
- /// Removes all requests and closes them.
- ///
- /// Used when the source stream is done.
- /// After this, no further requests will be added to the queue,
- /// requests are immediately served entirely by events already in the event
- /// queue, if any.
- void _closeAllRequests() {
- assert(_isDone);
- while (_requestQueue.isNotEmpty) {
- var request = _requestQueue.removeFirst();
- if (!request.addEvents(_eventQueue)) {
- request.close(_eventQueue);
- }
- }
+ void _pause() {
+ _subscription.pause();
}
- /// Matches events with requests.
- ///
- /// Called after receiving an event.
- void _checkQueues() {
- while (_requestQueue.isNotEmpty) {
- if (_requestQueue.first.addEvents(_eventQueue)) {
- _requestQueue.removeFirst();
- } else {
- return;
- }
+ Stream<T> _extractStream() {
+ assert(_isClosed);
+ if (_isDone) {
+ return new Stream<T>.empty();
}
- if (!_isDone) {
- _subscription.pause();
+
+ if (_subscription == null) {
+ return _sourceStream;
}
- }
- /// Extracts the subscription and makes this stream queue unusable.
- ///
- /// Can only be used by the very last request.
- StreamSubscription _dispose() {
- assert(_isClosed);
var subscription = _subscription;
_subscription = null;
_isDone = true;
- return subscription;
+
+ var wasPaused = subscription.isPaused;
+ var result = new SubscriptionStream<T>(subscription);
+ // Resume after creating stream because that pauses the subscription too.
+ // This way there won't be a short resumption in the middle.
+ if (wasPaused) subscription.resume();
+ return result;
}
}
+
/// Request object that receives events when they arrive, until fulfilled.
///
/// Each request that cannot be fulfilled immediately is represented by
@@ -367,7 +427,7 @@ class StreamQueue<T> {
abstract class _EventRequest {
/// Handle available events.
///
- /// The available events are provided as a queue. The `addEvents` function
+ /// The available events are provided as a queue. The `update` function
/// should only remove events from the front of the event queue, e.g.,
/// using [removeFirst].
///
@@ -382,22 +442,10 @@ abstract class _EventRequest {
/// This method is called when a request reaches the front of the request
/// queue, and if it returns `false`, it's called again every time a new event
/// becomes available, or when the stream closes.
- bool addEvents(Queue<Result> events);
-
- /// Complete the request.
- ///
- /// This is called when the source stream is done before the request
- /// had a chance to receive all its events. That is, after a call
- /// to [addEvents] has returned `false`.
- /// If there are any unused events available, they are in the [events] queue.
- /// No further events will become available.
- ///
- /// The queue should only remove events from the front of the event queue,
- /// e.g., using [removeFirst].
- ///
- /// If the request kept events in the queue after an [addEvents] call,
- /// this is the last chance to use them.
- void close(Queue<Result> events);
+ /// If the function returns `false` when the stream has already closed
+ /// ([isDone] is true), then the request must call
+ /// [StreamQueue._updateRequests] itself when it's ready to continue.
+ bool update(Queue<Result> events, bool isDone);
}
/// Request for a [StreamQueue.next] call.
@@ -412,16 +460,18 @@ class _NextRequest<T> implements _EventRequest {
Future<T> get future => _completer.future;
- bool addEvents(Queue<Result> events) {
- if (events.isEmpty) return false;
- events.removeFirst().complete(_completer);
- return true;
- }
-
- void close(Queue<Result> events) {
- var errorFuture =
- new Future.sync(() => throw new StateError("No elements"));
- _completer.complete(errorFuture);
+ bool update(Queue<Result> events, bool isDone) {
+ if (events.isNotEmpty) {
+ events.removeFirst().complete(_completer);
+ return true;
+ }
+ if (isDone) {
+ var errorFuture =
+ new Future.sync(() => throw new StateError("No elements"));
+ _completer.complete(errorFuture);
+ return true;
+ }
+ return false;
}
}
@@ -443,22 +493,22 @@ class _SkipRequest implements _EventRequest {
/// The future completed when the correct number of events have been skipped.
Future get future => _completer.future;
- bool addEvents(Queue<Result> events) {
+ bool update(Queue<Result> events, bool isDone) {
while (_eventsToSkip > 0) {
- if (events.isEmpty) return false;
+ if (events.isEmpty) {
+ if (isDone) break;
+ return false;
+ }
_eventsToSkip--;
+
var event = events.removeFirst();
if (event.isError) {
event.complete(_completer);
return true;
}
}
- _completer.complete(0);
- return true;
- }
-
- void close(Queue<Result> events) {
_completer.complete(_eventsToSkip);
+ return true;
}
}
@@ -481,9 +531,13 @@ class _TakeRequest<T> implements _EventRequest {
/// The future completed when the correct number of events have been captured.
Future get future => _completer.future;
- bool addEvents(Queue<Result> events) {
+ bool update(Queue<Result> events, bool isDone) {
while (_list.length < _eventsToTake) {
- if (events.isEmpty) return false;
+ if (events.isEmpty) {
+ if (isDone) break;
+ return false;
+ }
+
var result = events.removeFirst();
if (result.isError) {
result.complete(_completer);
@@ -494,10 +548,6 @@ class _TakeRequest<T> implements _EventRequest {
_completer.complete(_list);
return true;
}
-
- void close(Queue<Result> events) {
- _completer.complete(_list);
- }
}
/// Request for a [StreamQueue.cancel] call.
@@ -520,22 +570,14 @@ class _CancelRequest implements _EventRequest {
/// The future completed when the cancel request is completed.
Future get future => _completer.future;
- bool addEvents(Queue<Result> events) {
- _shutdown();
- return true;
- }
-
- void close(_) {
- _shutdown();
- }
-
- void _shutdown() {
+ bool update(Queue<Result> events, bool isDone) {
if (_streamQueue._isDone) {
_completer.complete();
} else {
_streamQueue._ensureListening();
- _completer.complete(_streamQueue._dispose().cancel());
+ _completer.complete(_streamQueue._extractStream().listen(null).cancel());
}
+ return true;
}
}
@@ -559,21 +601,12 @@ class _RestRequest<T> implements _EventRequest {
/// The stream which will contain the remaining events of [_streamQueue].
Stream<T> get stream => _completer.stream;
- bool addEvents(Queue<Result> events) {
- _completeStream(events);
- return true;
- }
-
- void close(Queue<Result> events) {
- _completeStream(events);
- }
-
- void _completeStream(Queue<Result> events) {
+ bool update(Queue<Result> events, bool isDone) {
if (events.isEmpty) {
if (_streamQueue._isDone) {
_completer.setEmpty();
} else {
- _completer.setSourceStream(_getRestStream());
+ _completer.setSourceStream(_streamQueue._extractStream());
}
} else {
// There are prefetched events which needs to be added before the
@@ -582,26 +615,11 @@ class _RestRequest<T> implements _EventRequest {
for (var event in events) {
event.addTo(controller);
}
- controller.addStream(_getRestStream(), cancelOnError: false)
+ controller.addStream(_streamQueue._extractStream(), cancelOnError: false)
.whenComplete(controller.close);
_completer.setSourceStream(controller.stream);
}
- }
-
- /// Create a stream from the rest of [_streamQueue]'s subscription.
- Stream _getRestStream() {
- if (_streamQueue._isDone) {
- var controller = new StreamController<T>()..close();
- return controller.stream;
- // TODO(lrn). Use the following when 1.11 is released.
- // return new Stream<T>.empty();
- }
- if (_streamQueue._subscription == null) {
- return _streamQueue._sourceStream;
- }
- var subscription = _streamQueue._dispose();
- subscription.resume();
- return new SubscriptionStream<T>(subscription);
+ return true;
}
}
@@ -616,15 +634,15 @@ class _HasNextRequest<T> implements _EventRequest {
Future<bool> get future => _completer.future;
- bool addEvents(Queue<Result> events) {
+ bool update(Queue<Result> events, bool isDone) {
if (events.isNotEmpty) {
_completer.complete(true);
return true;
}
+ if (isDone) {
+ _completer.complete(false);
+ return true;
+ }
return false;
}
-
- void close(_) {
- _completer.complete(false);
- }
}
« no previous file with comments | « packages/async/lib/src/restartable_timer.dart ('k') | packages/async/pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698