Chromium Code Reviews (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out

Unified Diff: lib/src/util/stream_queue.dart

Issue 1262623006: Temporarily bring in code from the async package. (Closed) Base URL:
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/util/stream_queue.dart
diff --git a/lib/src/util/stream_queue.dart b/lib/src/util/stream_queue.dart
new file mode 100644
index 0000000000000000000000000000000000000000..dc7c7840727481e5e4f6160db39981e3540918d6
--- /dev/null
+++ b/lib/src/util/stream_queue.dart
@@ -0,0 +1,699 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+// TODO(nweiz): Get rid of this when
+// lands.
+library test.util.forkable_stream_queue;
+import 'dart:async';
+import 'dart:collection';
+import "package:async/async.dart" hide ForkableStream, StreamQueue;
+import "forkable_stream.dart";
+/// An asynchronous pull-based interface for accessing stream events.
+/// Wraps a stream and makes individual events available on request.
+/// You can request (and reserve) one or more events from the stream,
+/// and after all previous requests have been fulfilled, stream events
+/// go towards fulfilling your request.
+/// For example, if you ask for [next] two times, the returned futures
+/// will be completed by the next two unrequested events from the stream.
+/// The stream subscription is paused when there are no active
+/// requests.
+/// Some streams, including broadcast streams, will buffer
+/// events while paused, so waiting too long between requests may
+/// cause memory bloat somewhere else.
+/// This is similar to, but more convenient than, a [StreamIterator].
+/// A `StreamIterator` requires you to manually check when a new event is
+/// available and you can only access the value of that event until you
+/// check for the next one. A `StreamQueue` allows you to request, for example,
+/// three events at a time, either individually, as a group using [take]
+/// or [skip], or in any combination.
+/// You can also ask to have the [rest] of the stream provided as
+/// a new stream. This allows, for example, taking the first event
+/// out of a stream and continuing to use the rest of the stream as a stream.
+/// Example:
+/// var events = new StreamQueue<String>(someStreamOfLines);
+/// var first = await;
+/// while (first.startsWith('#')) {
+/// // Skip comments.
+/// first = await;
+/// }
+/// if (first.startsWith(MAGIC_MARKER)) {
+/// var headerCount =
+/// first.parseInt(first.substring(MAGIC_MARKER.length + 1));
+/// handleMessage(headers: await events.take(headerCount),
+/// body:;
+/// return;
+/// }
+/// // Error handling.
+/// When you need no further events the `StreamQueue` should be closed
+/// using [cancel]. This releases the underlying stream subscription.
+class StreamQueue<T> {
+ // This class maintains two queues: one of events and one of requests.
+ // The active request (the one in front of the queue) is called with
+ // the current event queue when it becomes active.
+ //
+ // If the request returns true, it's complete and will be removed from the
+ // request queue.
+ // If the request returns false, it needs more events, and will be called
+ // again when new events are available.
+ // The request can remove events that it uses, or keep them in the event
+ // queue until it has all that it needs.
+ //
+ // This model is very flexible and easily extensible.
+ // It allows requests that don't consume events (like [hasNext]) or
+ // potentially a request that takes either five or zero events, determined
+ // by the content of the fifth event.
+ /// Source of events.
+ final ForkableStream _sourceStream;
+ /// Subscription on [_sourceStream] while listening for events.
+ ///
+ /// Set to subscription when listening, and set to `null` when the
+ /// subscription is done (and [_isDone] is set to true).
+ StreamSubscription _subscription;
+ /// Whether we have listened on [_sourceStream] and the subscription is done.
+ bool _isDone = false;
+ /// Whether a closing operation has been performed on the stream queue.
+ ///
+ /// Closing operations are [cancel] and [rest].
+ bool _isClosed = false;
+ /// Queue of events not used by a request yet.
+ final Queue<Result> _eventQueue = new Queue();
+ /// Queue of pending requests.
+ ///
+ /// Access through methods below to ensure consistency.
+ final Queue<_EventRequest> _requestQueue = new Queue();
+ /// Create a `StreamQueue` of the events of [source].
+ StreamQueue(Stream source)
+ : _sourceStream = source is ForkableStream
+ ? source
+ : new ForkableStream(source);
+ /// Asks if the stream has any more events.
+ ///
+ /// Returns a future that completes with `true` if the stream has any
+ /// more events, whether data or error.
+ /// If the stream closes without producing any more events, the returned
+ /// future completes with `false`.
+ ///
+ /// Can be used before using [next] to avoid getting an error in the
+ /// future returned by `next` in the case where there are no more events.
+ Future<bool> get hasNext {
+ if (!_isClosed) {
+ var hasNextRequest = new _HasNextRequest();
+ _addRequest(hasNextRequest);
+ return hasNextRequest.future;
+ }
+ throw _failClosed();
+ }
+ /// Requests the next (yet unrequested) event from the stream.
+ ///
+ /// When the requested event arrives, the returned future is completed with
+ /// the event.
+ /// If the event is a data event, the returned future completes
+ /// with its value.
+ /// If the event is an error event, the returned future completes with
+ /// its error and stack trace.
+ /// If the stream closes before an event arrives, the returned future
+ /// completes with a [StateError].
+ ///
+ /// It's possible to have several pending [next] calls (or other requests),
+ /// and they will be completed in the order they were requested, by the
+ /// first events that were not consumed by previous requeusts.
+ Future<T> get next {
+ if (!_isClosed) {
+ var nextRequest = new _NextRequest<T>();
+ _addRequest(nextRequest);
+ return nextRequest.future;
+ }
+ throw _failClosed();
+ }
+ /// Returns a stream of all the remaning events of the source stream.
+ ///
+ /// All requested [next], [skip] or [take] operations are completed
+ /// first, and then any remaining events are provided as events of
+ /// the returned stream.
+ ///
+ /// Using `rest` closes this stream queue. After getting the
+ /// `rest` the caller may no longer request other events, like
+ /// after calling [cancel].
+ Stream<T> get rest {
+ if (_isClosed) {
+ throw _failClosed();
+ }
+ var request = new _RestRequest<T>(this);
+ _isClosed = true;
+ _addRequest(request);
+ return;
+ }
+ /// Skips the next [count] *data* events.
+ ///
+ /// The [count] must be non-negative.
+ ///
+ /// When successful, this is equivalent to using [take]
+ /// and ignoring the result.
+ ///
+ /// If an error occurs before `count` data events have been skipped,
+ /// the returned future completes with that error instead.
+ ///
+ /// If the stream closes before `count` data events,
+ /// the remaining unskipped event count is returned.
+ /// If the returned future completes with the integer `0`,
+ /// then all events were succssfully skipped. If the value
+ /// is greater than zero then the stream ended early.
+ Future<int> skip(int count) {
+ if (count < 0) throw new RangeError.range(count, 0, null, "count");
+ if (!_isClosed) {
+ var request = new _SkipRequest(count);
+ _addRequest(request);
+ return request.future;
+ }
+ throw _failClosed();
+ }
+ /// Requests the next [count] data events as a list.
+ ///
+ /// The [count] must be non-negative.
+ ///
+ /// Equivalent to calling [next] `count` times and
+ /// storing the data values in a list.
+ ///
+ /// If an error occurs before `count` data events has
+ /// been collected, the returned future completes with
+ /// that error instead.
+ ///
+ /// If the stream closes before `count` data events,
+ /// the returned future completes with the list
+ /// of data collected so far. That is, the returned
+ /// list may have fewer than [count] elements.
+ Future<List<T>> take(int count) {
+ if (count < 0) throw new RangeError.range(count, 0, null, "count");
+ if (!_isClosed) {
+ var request = new _TakeRequest<T>(count);
+ _addRequest(request);
+ return request.future;
+ }
+ throw _failClosed();
+ }
+ /// Creates a new stream queue in the same position as this one.
+ ///
+ /// The fork is subscribed to the same underlying stream as this queue, but
+ /// it's otherwise wholly independent. If requests are made on one, they don't
+ /// move the other forward; if one is closed, the other is still open.
+ ///
+ /// The underlying stream will only be paused when all forks have no
+ /// outstanding requests, and only canceled when all forks are canceled.
+ StreamQueue<T> fork() {
+ if (_isClosed) throw _failClosed();
+ var request = new _ForkRequest<T>(this);
+ _addRequest(request);
+ return request.queue;
+ }
+ /// Cancels the underlying stream subscription.
+ ///
+ /// If [immediate] is `false` (the default), the cancel operation waits until
+ /// all previously requested events have been processed, then it cancels the
+ /// subscription providing the events.
+ ///
+ /// If [immediate] is `true`, the subscription is instead canceled
+ /// immediately. Any pending events complete with a 'closed'-event, as though
+ /// the stream had closed by itself.
+ ///
+ /// The returned future completes with the result of calling
+ /// `cancel`.
+ ///
+ /// After calling `cancel`, no further events can be requested.
+ /// None of [next], [rest], [skip], [take] or [cancel] may be
+ /// called again.
+ Future cancel({bool immediate: false}) {
+ if (_isClosed) throw _failClosed();
+ _isClosed = true;
+ if (_isDone) return new Future.value();
+ if (_subscription == null) _subscription = _sourceStream.listen(null);
+ if (!immediate) {
+ var request = new _CancelRequest(this);
+ _addRequest(request);
+ return request.future;
+ }
+ var future = _subscription.cancel();
+ _onDone();
+ return future;
+ }
+ /// Returns an error for when a request is made after cancel.
+ ///
+ /// Returns a [StateError] with a message saying that either
+ /// [cancel] or [rest] have already been called.
+ Error _failClosed() {
+ return new StateError("Already cancelled");
+ }
+ // Callbacks receiving the events of the source stream.
+ void _onData(T data) {
+ _eventQueue.add(new Result.value(data));
+ _checkQueues();
+ }
+ void _onError(error, StackTrace stack) {
+ _eventQueue.add(new Result.error(error, stack));
+ _checkQueues();
+ }
+ void _onDone() {
+ _subscription = null;
+ _isDone = true;
+ _closeAllRequests();
+ }
+ // Request queue management.
+ /// Adds a new request to the queue.
+ void _addRequest(_EventRequest request) {
+ if (_isDone) {
+ assert(_requestQueue.isEmpty);
+ if (!request.addEvents(_eventQueue)) {
+ request.close(_eventQueue);
+ }
+ return;
+ }
+ if (_requestQueue.isEmpty) {
+ if (request.addEvents(_eventQueue)) return;
+ _ensureListening();
+ }
+ _requestQueue.add(request);
+ }
+ /// Ensures that we are listening on events from [_sourceStream].
+ ///
+ /// Resumes subscription on [_sourceStream], or creates it if necessary.
+ void _ensureListening() {
+ assert(!_isDone);
+ if (_subscription == null) {
+ _subscription =
+ _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
+ } else {
+ _subscription.resume();
+ }
+ }
+ /// Removes all requests and closes them.
+ ///
+ /// Used when the source stream is done.
+ /// After this, no further requests will be added to the queue,
+ /// requests are immediately served entirely by events already in the event
+ /// queue, if any.
+ void _closeAllRequests() {
+ assert(_isDone);
+ while (_requestQueue.isNotEmpty) {
+ var request = _requestQueue.removeFirst();
+ if (!request.addEvents(_eventQueue)) {
+ request.close(_eventQueue);
+ }
+ }
+ }
+ /// Matches events with requests.
+ ///
+ /// Called after receiving an event.
+ void _checkQueues() {
+ while (_requestQueue.isNotEmpty) {
+ if (_requestQueue.first.addEvents(_eventQueue)) {
+ _requestQueue.removeFirst();
+ } else {
+ return;
+ }
+ }
+ if (!_isDone) {
+ _subscription.pause();
+ }
+ }
+ /// Extracts the subscription and makes this stream queue unusable.
+ ///
+ /// Can only be used by the very last request.
+ StreamSubscription _dispose() {
+ assert(_isClosed);
+ var subscription = _subscription;
+ _subscription = null;
+ _isDone = true;
+ return subscription;
+ }
+/// Request object that receives events when they arrive, until fulfilled.
+/// Each request that cannot be fulfilled immediately is represented by
+/// an `_EventRequest` object in the request queue.
+/// Events from the source stream are sent to the first request in the
+/// queue until it reports itself as [isComplete].
+/// When the first request in the queue `isComplete`, either when becoming
+/// the first request or after receiving an event, its [close] methods is
+/// called.
+/// The [close] method is also called immediately when the source stream
+/// is done.
+abstract class _EventRequest {
+ /// Handle available events.
+ ///
+ /// The available events are provided as a queue. The `addEvents` function
+ /// should only remove events from the front of the event queue, e.g.,
+ /// using [removeFirst].
+ ///
+ /// Returns `true` if the request is completed, or `false` if it needs
+ /// more events.
+ /// The call may keep events in the queue until the requeust is complete,
+ /// or it may remove them immediately.
+ ///
+ /// If the method returns true, the request is considered fulfilled, and
+ /// will never be called again.
+ ///
+ /// This method is called when a request reaches the front of the request
+ /// queue, and if it returns `false`, it's called again every time a new event
+ /// becomes available, or when the stream closes.
+ bool addEvents(Queue<Result> events);
+ /// Complete the request.
+ ///
+ /// This is called when the source stream is done before the request
+ /// had a chance to receive all its events. That is, after a call
+ /// to [addEvents] has returned `false`.
+ /// If there are any unused events available, they are in the [events] queue.
+ /// No further events will become available.
+ ///
+ /// The queue should only remove events from the front of the event queue,
+ /// e.g., using [removeFirst].
+ ///
+ /// If the request kept events in the queue after an [addEvents] call,
+ /// this is the last chance to use them.
+ void close(Queue<Result> events);
+/// Request for a [] call.
+/// Completes the returned future when receiving the first event,
+/// and is then complete.
+class _NextRequest<T> implements _EventRequest {
+ /// Completer for the future returned by [].
+ final Completer _completer;
+ _NextRequest() : _completer = new Completer<T>();
+ Future<T> get future => _completer.future;
+ bool addEvents(Queue<Result> events) {
+ if (events.isEmpty) return false;
+ events.removeFirst().complete(_completer);
+ return true;
+ }
+ void close(Queue<Result> events) {
+ var errorFuture =
+ new Future.sync(() => throw new StateError("No elements"));
+ _completer.complete(errorFuture);
+ }
+/// Request for a [StreamQueue.skip] call.
+class _SkipRequest implements _EventRequest {
+ /// Completer for the future returned by the skip call.
+ final Completer _completer = new Completer<int>();
+ /// Number of remaining events to skip.
+ ///
+ /// The request [isComplete] when the values reaches zero.
+ ///
+ /// Decremented when an event is seen.
+ /// Set to zero when an error is seen since errors abort the skip request.
+ int _eventsToSkip;
+ _SkipRequest(this._eventsToSkip);
+ /// The future completed when the correct number of events have been skipped.
+ Future get future => _completer.future;
+ bool addEvents(Queue<Result> events) {
+ while (_eventsToSkip > 0) {
+ if (events.isEmpty) return false;
+ _eventsToSkip--;
+ var event = events.removeFirst();
+ if (event.isError) {
+ event.complete(_completer);
+ return true;
+ }
+ }
+ _completer.complete(0);
+ return true;
+ }
+ void close(Queue<Result> events) {
+ _completer.complete(_eventsToSkip);
+ }
+/// Request for a [StreamQueue.take] call.
+class _TakeRequest<T> implements _EventRequest {
+ /// Completer for the future returned by the take call.
+ final Completer _completer;
+ /// List collecting events until enough have been seen.
+ final List _list = <T>[];
+ /// Number of events to capture.
+ ///
+ /// The request [isComplete] when the length of [_list] reaches
+ /// this value.
+ final int _eventsToTake;
+ _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
+ /// The future completed when the correct number of events have been captured.
+ Future get future => _completer.future;
+ bool addEvents(Queue<Result> events) {
+ while (_list.length < _eventsToTake) {
+ if (events.isEmpty) return false;
+ var result = events.removeFirst();
+ if (result.isError) {
+ result.complete(_completer);
+ return true;
+ }
+ _list.add(result.asValue.value);
+ }
+ _completer.complete(_list);
+ return true;
+ }
+ void close(Queue<Result> events) {
+ _completer.complete(_list);
+ }
+/// Request for a [StreamQueue.cancel] call.
+/// The request needs no events, it just waits in the request queue
+/// until all previous events are fulfilled, then it cancels the stream queue
+/// source subscription.
+class _CancelRequest implements _EventRequest {
+ /// Completer for the future returned by the `cancel` call.
+ final Completer _completer = new Completer();
+ /// The [StreamQueue] object that has this request queued.
+ ///
+ /// When the event is completed, it needs to cancel the active subscription
+ /// of the `StreamQueue` object, if any.
+ final StreamQueue _streamQueue;
+ _CancelRequest(this._streamQueue);
+ /// The future completed when the cancel request is completed.
+ Future get future => _completer.future;
+ bool addEvents(Queue<Result> events) {
+ _shutdown();
+ return true;
+ }
+ void close(_) {
+ _shutdown();
+ }
+ void _shutdown() {
+ if (_streamQueue._isDone) {
+ _completer.complete();
+ } else {
+ _streamQueue._ensureListening();
+ _completer.complete(_streamQueue._dispose().cancel());
+ }
+ }
+/// Request for a [] call.
+/// The request is always complete, it just waits in the request queue
+/// until all previous events are fulfilled, then it takes over the
+/// stream events subscription and creates a stream from it.
+class _RestRequest<T> implements _EventRequest {
+ /// Completer for the stream returned by the `rest` call.
+ final StreamCompleter _completer = new StreamCompleter<T>();
+ /// The [StreamQueue] object that has this request queued.
+ ///
+ /// When the event is completed, it needs to cancel the active subscription
+ /// of the `StreamQueue` object, if any.
+ final StreamQueue _streamQueue;
+ _RestRequest(this._streamQueue);
+ /// The stream which will contain the remaining events of [_streamQueue].
+ Stream<T> get stream =>;
+ bool addEvents(Queue<Result> events) {
+ _completeStream(events);
+ return true;
+ }
+ void close(Queue<Result> events) {
+ _completeStream(events);
+ }
+ void _completeStream(Queue<Result> events) {
+ if (events.isEmpty) {
+ if (_streamQueue._isDone) {
+ _completer.setEmpty();
+ } else {
+ _completer.setSourceStream(_getRestStream());
+ }
+ } else {
+ // There are prefetched events which needs to be added before the
+ // remaining stream.
+ var controller = new StreamController<T>();
+ for (var event in events) {
+ event.addTo(controller);
+ }
+ controller.addStream(_getRestStream(), cancelOnError: false)
+ .whenComplete(controller.close);
+ _completer.setSourceStream(;
+ }
+ }
+ /// Create a stream from the rest of [_streamQueue]'s subscription.
+ Stream _getRestStream() {
+ if (_streamQueue._isDone) {
+ var controller = new StreamController<T>()..close();
+ return;
+ // TODO(lrn). Use the following when 1.11 is released.
+ // return new Stream<T>.empty();
+ }
+ if (_streamQueue._subscription == null) {
+ return _streamQueue._sourceStream;
+ }
+ var subscription = _streamQueue._dispose();
+ subscription.resume();
+ return new SubscriptionStream<T>(subscription);
+ }
+/// Request for a [StreamQueue.hasNext] call.
+/// Completes the [future] with `true` if it sees any event,
+/// but doesn't consume the event.
+/// If the request is closed without seeing an event, then
+/// the [future] is completed with `false`.
+class _HasNextRequest<T> implements _EventRequest {
+ final Completer _completer = new Completer<bool>();
+ Future<bool> get future => _completer.future;
+ bool addEvents(Queue<Result> events) {
+ if (events.isNotEmpty) {
+ _completer.complete(true);
+ return true;
+ }
+ return false;
+ }
+ void close(_) {
+ _completer.complete(false);
+ }
+/// Request for a [StreamQueue.fork] call.
+class _ForkRequest<T> implements _EventRequest {
+ /// Completer for the stream used by the queue by the `fork` call.
+ StreamCompleter _completer;
+ StreamQueue<T> queue;
+ /// The [StreamQueue] object that has this request queued.
+ final StreamQueue _streamQueue;
+ _ForkRequest(this._streamQueue) {
+ _completer = new StreamCompleter<T>();
+ queue = new StreamQueue<T>(;
+ }
+ bool addEvents(Queue<Result> events) {
+ _completeStream(events);
+ return true;
+ }
+ void close(Queue<Result> events) {
+ _completeStream(events);
+ }
+ void _completeStream(Queue<Result> events) {
+ if (events.isEmpty) {
+ if (_streamQueue._isDone) {
+ _completer.setEmpty();
+ } else {
+ _completer.setSourceStream(_streamQueue._sourceStream.fork());
+ }
+ } else {
+ // There are prefetched events which need to be added before the
+ // remaining stream.
+ var controller = new StreamController<T>();
+ for (var event in events) {
+ event.addTo(controller);
+ }
+ var fork = _streamQueue._sourceStream.fork();
+ controller.addStream(fork, cancelOnError: false)
+ .whenComplete(controller.close);
+ _completer.setSourceStream(;
+ }
+ }

Powered by Google App Engine
This is Rietveld 408576698