| Index: lib/src/util/stream_queue.dart
|
| diff --git a/lib/src/util/stream_queue.dart b/lib/src/util/stream_queue.dart
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..dc7c7840727481e5e4f6160db39981e3540918d6
|
| --- /dev/null
|
| +++ b/lib/src/util/stream_queue.dart
|
| @@ -0,0 +1,699 @@
|
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
|
| +// for details. All rights reserved. Use of this source code is governed by a
|
| +// BSD-style license that can be found in the LICENSE file.
|
| +
|
| +// TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/
|
| +// lands.
|
| +library test.util.forkable_stream_queue;
|
| +
|
| +import 'dart:async';
|
| +import 'dart:collection';
|
| +
|
| +import "package:async/async.dart" hide ForkableStream, StreamQueue;
|
| +
|
| +import "forkable_stream.dart";
|
| +
|
| +/// An asynchronous pull-based interface for accessing stream events.
|
| +///
|
| +/// Wraps a stream and makes individual events available on request.
|
| +///
|
| +/// You can request (and reserve) one or more events from the stream,
|
| +/// and after all previous requests have been fulfilled, stream events
|
| +/// go towards fulfilling your request.
|
| +///
|
| +/// For example, if you ask for [next] two times, the returned futures
|
| +/// will be completed by the next two unrequested events from the stream.
|
| +///
|
| +/// The stream subscription is paused when there are no active
|
| +/// requests.
|
| +///
|
| +/// Some streams, including broadcast streams, will buffer
|
| +/// events while paused, so waiting too long between requests may
|
| +/// cause memory bloat somewhere else.
|
| +///
|
| +/// This is similar to, but more convenient than, a [StreamIterator].
|
| +/// A `StreamIterator` requires you to manually check when a new event is
|
| +/// available and you can only access the value of that event until you
|
| +/// check for the next one. A `StreamQueue` allows you to request, for example,
|
| +/// three events at a time, either individually, as a group using [take]
|
| +/// or [skip], or in any combination.
|
| +///
|
| +/// You can also ask to have the [rest] of the stream provided as
|
| +/// a new stream. This allows, for example, taking the first event
|
| +/// out of a stream and continuing to use the rest of the stream as a stream.
|
| +///
|
| +/// Example:
|
| +///
|
| +/// var events = new StreamQueue<String>(someStreamOfLines);
|
| +/// var first = await events.next;
|
| +/// while (first.startsWith('#')) {
|
| +/// // Skip comments.
|
| +/// first = await events.next;
|
| +/// }
|
| +///
|
| +/// if (first.startsWith(MAGIC_MARKER)) {
|
| +/// var headerCount =
|
| +/// first.parseInt(first.substring(MAGIC_MARKER.length + 1));
|
| +/// handleMessage(headers: await events.take(headerCount),
|
| +/// body: events.rest);
|
| +/// return;
|
| +/// }
|
| +/// // Error handling.
|
| +///
|
| +/// When you need no further events the `StreamQueue` should be closed
|
| +/// using [cancel]. This releases the underlying stream subscription.
|
| +class StreamQueue<T> {
|
| + // This class maintains two queues: one of events and one of requests.
|
| + // The active request (the one in front of the queue) is called with
|
| + // the current event queue when it becomes active.
|
| + //
|
| + // If the request returns true, it's complete and will be removed from the
|
| + // request queue.
|
| + // If the request returns false, it needs more events, and will be called
|
| + // again when new events are available.
|
| + // The request can remove events that it uses, or keep them in the event
|
| + // queue until it has all that it needs.
|
| + //
|
| + // This model is very flexible and easily extensible.
|
| + // It allows requests that don't consume events (like [hasNext]) or
|
| + // potentially a request that takes either five or zero events, determined
|
| + // by the content of the fifth event.
|
| +
|
| + /// Source of events.
|
| + final ForkableStream _sourceStream;
|
| +
|
| + /// Subscription on [_sourceStream] while listening for events.
|
| + ///
|
| + /// Set to subscription when listening, and set to `null` when the
|
| + /// subscription is done (and [_isDone] is set to true).
|
| + StreamSubscription _subscription;
|
| +
|
| + /// Whether we have listened on [_sourceStream] and the subscription is done.
|
| + bool _isDone = false;
|
| +
|
| + /// Whether a closing operation has been performed on the stream queue.
|
| + ///
|
| + /// Closing operations are [cancel] and [rest].
|
| + bool _isClosed = false;
|
| +
|
| + /// Queue of events not used by a request yet.
|
| + final Queue<Result> _eventQueue = new Queue();
|
| +
|
| + /// Queue of pending requests.
|
| + ///
|
| + /// Access through methods below to ensure consistency.
|
| + final Queue<_EventRequest> _requestQueue = new Queue();
|
| +
|
| + /// Create a `StreamQueue` of the events of [source].
|
| + StreamQueue(Stream source)
|
| + : _sourceStream = source is ForkableStream
|
| + ? source
|
| + : new ForkableStream(source);
|
| +
|
| + /// Asks if the stream has any more events.
|
| + ///
|
| + /// Returns a future that completes with `true` if the stream has any
|
| + /// more events, whether data or error.
|
| + /// If the stream closes without producing any more events, the returned
|
| + /// future completes with `false`.
|
| + ///
|
| + /// Can be used before using [next] to avoid getting an error in the
|
| + /// future returned by `next` in the case where there are no more events.
|
| + Future<bool> get hasNext {
|
| + if (!_isClosed) {
|
| + var hasNextRequest = new _HasNextRequest();
|
| + _addRequest(hasNextRequest);
|
| + return hasNextRequest.future;
|
| + }
|
| + throw _failClosed();
|
| + }
|
| +
|
| + /// Requests the next (yet unrequested) event from the stream.
|
| + ///
|
| + /// When the requested event arrives, the returned future is completed with
|
| + /// the event.
|
| + /// If the event is a data event, the returned future completes
|
| + /// with its value.
|
| + /// If the event is an error event, the returned future completes with
|
| + /// its error and stack trace.
|
| + /// If the stream closes before an event arrives, the returned future
|
| + /// completes with a [StateError].
|
| + ///
|
| + /// It's possible to have several pending [next] calls (or other requests),
|
| + /// and they will be completed in the order they were requested, by the
|
| + /// first events that were not consumed by previous requeusts.
|
| + Future<T> get next {
|
| + if (!_isClosed) {
|
| + var nextRequest = new _NextRequest<T>();
|
| + _addRequest(nextRequest);
|
| + return nextRequest.future;
|
| + }
|
| + throw _failClosed();
|
| + }
|
| +
|
| + /// Returns a stream of all the remaning events of the source stream.
|
| + ///
|
| + /// All requested [next], [skip] or [take] operations are completed
|
| + /// first, and then any remaining events are provided as events of
|
| + /// the returned stream.
|
| + ///
|
| + /// Using `rest` closes this stream queue. After getting the
|
| + /// `rest` the caller may no longer request other events, like
|
| + /// after calling [cancel].
|
| + Stream<T> get rest {
|
| + if (_isClosed) {
|
| + throw _failClosed();
|
| + }
|
| + var request = new _RestRequest<T>(this);
|
| + _isClosed = true;
|
| + _addRequest(request);
|
| + return request.stream;
|
| + }
|
| +
|
| + /// Skips the next [count] *data* events.
|
| + ///
|
| + /// The [count] must be non-negative.
|
| + ///
|
| + /// When successful, this is equivalent to using [take]
|
| + /// and ignoring the result.
|
| + ///
|
| + /// If an error occurs before `count` data events have been skipped,
|
| + /// the returned future completes with that error instead.
|
| + ///
|
| + /// If the stream closes before `count` data events,
|
| + /// the remaining unskipped event count is returned.
|
| + /// If the returned future completes with the integer `0`,
|
| + /// then all events were succssfully skipped. If the value
|
| + /// is greater than zero then the stream ended early.
|
| + Future<int> skip(int count) {
|
| + if (count < 0) throw new RangeError.range(count, 0, null, "count");
|
| + if (!_isClosed) {
|
| + var request = new _SkipRequest(count);
|
| + _addRequest(request);
|
| + return request.future;
|
| + }
|
| + throw _failClosed();
|
| + }
|
| +
|
| + /// Requests the next [count] data events as a list.
|
| + ///
|
| + /// The [count] must be non-negative.
|
| + ///
|
| + /// Equivalent to calling [next] `count` times and
|
| + /// storing the data values in a list.
|
| + ///
|
| + /// If an error occurs before `count` data events has
|
| + /// been collected, the returned future completes with
|
| + /// that error instead.
|
| + ///
|
| + /// If the stream closes before `count` data events,
|
| + /// the returned future completes with the list
|
| + /// of data collected so far. That is, the returned
|
| + /// list may have fewer than [count] elements.
|
| + Future<List<T>> take(int count) {
|
| + if (count < 0) throw new RangeError.range(count, 0, null, "count");
|
| + if (!_isClosed) {
|
| + var request = new _TakeRequest<T>(count);
|
| + _addRequest(request);
|
| + return request.future;
|
| + }
|
| + throw _failClosed();
|
| + }
|
| +
|
| + /// Creates a new stream queue in the same position as this one.
|
| + ///
|
| + /// The fork is subscribed to the same underlying stream as this queue, but
|
| + /// it's otherwise wholly independent. If requests are made on one, they don't
|
| + /// move the other forward; if one is closed, the other is still open.
|
| + ///
|
| + /// The underlying stream will only be paused when all forks have no
|
| + /// outstanding requests, and only canceled when all forks are canceled.
|
| + StreamQueue<T> fork() {
|
| + if (_isClosed) throw _failClosed();
|
| +
|
| + var request = new _ForkRequest<T>(this);
|
| + _addRequest(request);
|
| + return request.queue;
|
| + }
|
| +
|
| + /// Cancels the underlying stream subscription.
|
| + ///
|
| + /// If [immediate] is `false` (the default), the cancel operation waits until
|
| + /// all previously requested events have been processed, then it cancels the
|
| + /// subscription providing the events.
|
| + ///
|
| + /// If [immediate] is `true`, the subscription is instead canceled
|
| + /// immediately. Any pending events complete with a 'closed'-event, as though
|
| + /// the stream had closed by itself.
|
| + ///
|
| + /// The returned future completes with the result of calling
|
| + /// `cancel`.
|
| + ///
|
| + /// After calling `cancel`, no further events can be requested.
|
| + /// None of [next], [rest], [skip], [take] or [cancel] may be
|
| + /// called again.
|
| + Future cancel({bool immediate: false}) {
|
| + if (_isClosed) throw _failClosed();
|
| + _isClosed = true;
|
| +
|
| + if (_isDone) return new Future.value();
|
| + if (_subscription == null) _subscription = _sourceStream.listen(null);
|
| +
|
| + if (!immediate) {
|
| + var request = new _CancelRequest(this);
|
| + _addRequest(request);
|
| + return request.future;
|
| + }
|
| +
|
| + var future = _subscription.cancel();
|
| + _onDone();
|
| + return future;
|
| + }
|
| +
|
| + /// Returns an error for when a request is made after cancel.
|
| + ///
|
| + /// Returns a [StateError] with a message saying that either
|
| + /// [cancel] or [rest] have already been called.
|
| + Error _failClosed() {
|
| + return new StateError("Already cancelled");
|
| + }
|
| +
|
| + // Callbacks receiving the events of the source stream.
|
| +
|
| + void _onData(T data) {
|
| + _eventQueue.add(new Result.value(data));
|
| + _checkQueues();
|
| + }
|
| +
|
| + void _onError(error, StackTrace stack) {
|
| + _eventQueue.add(new Result.error(error, stack));
|
| + _checkQueues();
|
| + }
|
| +
|
| + void _onDone() {
|
| + _subscription = null;
|
| + _isDone = true;
|
| + _closeAllRequests();
|
| + }
|
| +
|
| + // Request queue management.
|
| +
|
| + /// Adds a new request to the queue.
|
| + void _addRequest(_EventRequest request) {
|
| + if (_isDone) {
|
| + assert(_requestQueue.isEmpty);
|
| + if (!request.addEvents(_eventQueue)) {
|
| + request.close(_eventQueue);
|
| + }
|
| + return;
|
| + }
|
| + if (_requestQueue.isEmpty) {
|
| + if (request.addEvents(_eventQueue)) return;
|
| + _ensureListening();
|
| + }
|
| + _requestQueue.add(request);
|
| + }
|
| +
|
| + /// Ensures that we are listening on events from [_sourceStream].
|
| + ///
|
| + /// Resumes subscription on [_sourceStream], or creates it if necessary.
|
| + void _ensureListening() {
|
| + assert(!_isDone);
|
| + if (_subscription == null) {
|
| + _subscription =
|
| + _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
|
| + } else {
|
| + _subscription.resume();
|
| + }
|
| + }
|
| +
|
| + /// Removes all requests and closes them.
|
| + ///
|
| + /// Used when the source stream is done.
|
| + /// After this, no further requests will be added to the queue,
|
| + /// requests are immediately served entirely by events already in the event
|
| + /// queue, if any.
|
| + void _closeAllRequests() {
|
| + assert(_isDone);
|
| + while (_requestQueue.isNotEmpty) {
|
| + var request = _requestQueue.removeFirst();
|
| + if (!request.addEvents(_eventQueue)) {
|
| + request.close(_eventQueue);
|
| + }
|
| + }
|
| + }
|
| +
|
| + /// Matches events with requests.
|
| + ///
|
| + /// Called after receiving an event.
|
| + void _checkQueues() {
|
| + while (_requestQueue.isNotEmpty) {
|
| + if (_requestQueue.first.addEvents(_eventQueue)) {
|
| + _requestQueue.removeFirst();
|
| + } else {
|
| + return;
|
| + }
|
| + }
|
| +
|
| + if (!_isDone) {
|
| + _subscription.pause();
|
| + }
|
| + }
|
| +
|
| + /// Extracts the subscription and makes this stream queue unusable.
|
| + ///
|
| + /// Can only be used by the very last request.
|
| + StreamSubscription _dispose() {
|
| + assert(_isClosed);
|
| + var subscription = _subscription;
|
| + _subscription = null;
|
| + _isDone = true;
|
| + return subscription;
|
| + }
|
| +}
|
| +
|
| +/// Request object that receives events when they arrive, until fulfilled.
|
| +///
|
| +/// Each request that cannot be fulfilled immediately is represented by
|
| +/// an `_EventRequest` object in the request queue.
|
| +///
|
| +/// Events from the source stream are sent to the first request in the
|
| +/// queue until it reports itself as [isComplete].
|
| +///
|
| +/// When the first request in the queue `isComplete`, either when becoming
|
| +/// the first request or after receiving an event, its [close] methods is
|
| +/// called.
|
| +///
|
| +/// The [close] method is also called immediately when the source stream
|
| +/// is done.
|
| +abstract class _EventRequest {
|
| + /// Handle available events.
|
| + ///
|
| + /// The available events are provided as a queue. The `addEvents` function
|
| + /// should only remove events from the front of the event queue, e.g.,
|
| + /// using [removeFirst].
|
| + ///
|
| + /// Returns `true` if the request is completed, or `false` if it needs
|
| + /// more events.
|
| + /// The call may keep events in the queue until the requeust is complete,
|
| + /// or it may remove them immediately.
|
| + ///
|
| + /// If the method returns true, the request is considered fulfilled, and
|
| + /// will never be called again.
|
| + ///
|
| + /// This method is called when a request reaches the front of the request
|
| + /// queue, and if it returns `false`, it's called again every time a new event
|
| + /// becomes available, or when the stream closes.
|
| + bool addEvents(Queue<Result> events);
|
| +
|
| + /// Complete the request.
|
| + ///
|
| + /// This is called when the source stream is done before the request
|
| + /// had a chance to receive all its events. That is, after a call
|
| + /// to [addEvents] has returned `false`.
|
| + /// If there are any unused events available, they are in the [events] queue.
|
| + /// No further events will become available.
|
| + ///
|
| + /// The queue should only remove events from the front of the event queue,
|
| + /// e.g., using [removeFirst].
|
| + ///
|
| + /// If the request kept events in the queue after an [addEvents] call,
|
| + /// this is the last chance to use them.
|
| + void close(Queue<Result> events);
|
| +}
|
| +
|
| +/// Request for a [StreamQueue.next] call.
|
| +///
|
| +/// Completes the returned future when receiving the first event,
|
| +/// and is then complete.
|
| +class _NextRequest<T> implements _EventRequest {
|
| + /// Completer for the future returned by [StreamQueue.next].
|
| + final Completer _completer;
|
| +
|
| + _NextRequest() : _completer = new Completer<T>();
|
| +
|
| + Future<T> get future => _completer.future;
|
| +
|
| + bool addEvents(Queue<Result> events) {
|
| + if (events.isEmpty) return false;
|
| + events.removeFirst().complete(_completer);
|
| + return true;
|
| + }
|
| +
|
| + void close(Queue<Result> events) {
|
| + var errorFuture =
|
| + new Future.sync(() => throw new StateError("No elements"));
|
| + _completer.complete(errorFuture);
|
| + }
|
| +}
|
| +
|
| +/// Request for a [StreamQueue.skip] call.
|
| +class _SkipRequest implements _EventRequest {
|
| + /// Completer for the future returned by the skip call.
|
| + final Completer _completer = new Completer<int>();
|
| +
|
| + /// Number of remaining events to skip.
|
| + ///
|
| + /// The request [isComplete] when the values reaches zero.
|
| + ///
|
| + /// Decremented when an event is seen.
|
| + /// Set to zero when an error is seen since errors abort the skip request.
|
| + int _eventsToSkip;
|
| +
|
| + _SkipRequest(this._eventsToSkip);
|
| +
|
| + /// The future completed when the correct number of events have been skipped.
|
| + Future get future => _completer.future;
|
| +
|
| + bool addEvents(Queue<Result> events) {
|
| + while (_eventsToSkip > 0) {
|
| + if (events.isEmpty) return false;
|
| + _eventsToSkip--;
|
| + var event = events.removeFirst();
|
| + if (event.isError) {
|
| + event.complete(_completer);
|
| + return true;
|
| + }
|
| + }
|
| + _completer.complete(0);
|
| + return true;
|
| + }
|
| +
|
| + void close(Queue<Result> events) {
|
| + _completer.complete(_eventsToSkip);
|
| + }
|
| +}
|
| +
|
| +/// Request for a [StreamQueue.take] call.
|
| +class _TakeRequest<T> implements _EventRequest {
|
| + /// Completer for the future returned by the take call.
|
| + final Completer _completer;
|
| +
|
| + /// List collecting events until enough have been seen.
|
| + final List _list = <T>[];
|
| +
|
| + /// Number of events to capture.
|
| + ///
|
| + /// The request [isComplete] when the length of [_list] reaches
|
| + /// this value.
|
| + final int _eventsToTake;
|
| +
|
| + _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
|
| +
|
| + /// The future completed when the correct number of events have been captured.
|
| + Future get future => _completer.future;
|
| +
|
| + bool addEvents(Queue<Result> events) {
|
| + while (_list.length < _eventsToTake) {
|
| + if (events.isEmpty) return false;
|
| + var result = events.removeFirst();
|
| + if (result.isError) {
|
| + result.complete(_completer);
|
| + return true;
|
| + }
|
| + _list.add(result.asValue.value);
|
| + }
|
| + _completer.complete(_list);
|
| + return true;
|
| + }
|
| +
|
| + void close(Queue<Result> events) {
|
| + _completer.complete(_list);
|
| + }
|
| +}
|
| +
|
| +/// Request for a [StreamQueue.cancel] call.
|
| +///
|
| +/// The request needs no events, it just waits in the request queue
|
| +/// until all previous events are fulfilled, then it cancels the stream queue
|
| +/// source subscription.
|
| +class _CancelRequest implements _EventRequest {
|
| + /// Completer for the future returned by the `cancel` call.
|
| + final Completer _completer = new Completer();
|
| +
|
| + /// The [StreamQueue] object that has this request queued.
|
| + ///
|
| + /// When the event is completed, it needs to cancel the active subscription
|
| + /// of the `StreamQueue` object, if any.
|
| + final StreamQueue _streamQueue;
|
| +
|
| + _CancelRequest(this._streamQueue);
|
| +
|
| + /// The future completed when the cancel request is completed.
|
| + Future get future => _completer.future;
|
| +
|
| + bool addEvents(Queue<Result> events) {
|
| + _shutdown();
|
| + return true;
|
| + }
|
| +
|
| + void close(_) {
|
| + _shutdown();
|
| + }
|
| +
|
| + void _shutdown() {
|
| + if (_streamQueue._isDone) {
|
| + _completer.complete();
|
| + } else {
|
| + _streamQueue._ensureListening();
|
| + _completer.complete(_streamQueue._dispose().cancel());
|
| + }
|
| + }
|
| +}
|
| +
|
| +/// Request for a [StreamQueue.rest] call.
|
| +///
|
| +/// The request is always complete, it just waits in the request queue
|
| +/// until all previous events are fulfilled, then it takes over the
|
| +/// stream events subscription and creates a stream from it.
|
| +class _RestRequest<T> implements _EventRequest {
|
| + /// Completer for the stream returned by the `rest` call.
|
| + final StreamCompleter _completer = new StreamCompleter<T>();
|
| +
|
| + /// The [StreamQueue] object that has this request queued.
|
| + ///
|
| + /// When the event is completed, it needs to cancel the active subscription
|
| + /// of the `StreamQueue` object, if any.
|
| + final StreamQueue _streamQueue;
|
| +
|
| + _RestRequest(this._streamQueue);
|
| +
|
| + /// The stream which will contain the remaining events of [_streamQueue].
|
| + Stream<T> get stream => _completer.stream;
|
| +
|
| + bool addEvents(Queue<Result> events) {
|
| + _completeStream(events);
|
| + return true;
|
| + }
|
| +
|
| + void close(Queue<Result> events) {
|
| + _completeStream(events);
|
| + }
|
| +
|
| + void _completeStream(Queue<Result> events) {
|
| + if (events.isEmpty) {
|
| + if (_streamQueue._isDone) {
|
| + _completer.setEmpty();
|
| + } else {
|
| + _completer.setSourceStream(_getRestStream());
|
| + }
|
| + } else {
|
| + // There are prefetched events which needs to be added before the
|
| + // remaining stream.
|
| + var controller = new StreamController<T>();
|
| + for (var event in events) {
|
| + event.addTo(controller);
|
| + }
|
| + controller.addStream(_getRestStream(), cancelOnError: false)
|
| + .whenComplete(controller.close);
|
| + _completer.setSourceStream(controller.stream);
|
| + }
|
| + }
|
| +
|
| + /// Create a stream from the rest of [_streamQueue]'s subscription.
|
| + Stream _getRestStream() {
|
| + if (_streamQueue._isDone) {
|
| + var controller = new StreamController<T>()..close();
|
| + return controller.stream;
|
| + // TODO(lrn). Use the following when 1.11 is released.
|
| + // return new Stream<T>.empty();
|
| + }
|
| + if (_streamQueue._subscription == null) {
|
| + return _streamQueue._sourceStream;
|
| + }
|
| + var subscription = _streamQueue._dispose();
|
| + subscription.resume();
|
| + return new SubscriptionStream<T>(subscription);
|
| + }
|
| +}
|
| +
|
| +/// Request for a [StreamQueue.hasNext] call.
|
| +///
|
| +/// Completes the [future] with `true` if it sees any event,
|
| +/// but doesn't consume the event.
|
| +/// If the request is closed without seeing an event, then
|
| +/// the [future] is completed with `false`.
|
| +class _HasNextRequest<T> implements _EventRequest {
|
| + final Completer _completer = new Completer<bool>();
|
| +
|
| + Future<bool> get future => _completer.future;
|
| +
|
| + bool addEvents(Queue<Result> events) {
|
| + if (events.isNotEmpty) {
|
| + _completer.complete(true);
|
| + return true;
|
| + }
|
| + return false;
|
| + }
|
| +
|
| + void close(_) {
|
| + _completer.complete(false);
|
| + }
|
| +}
|
| +
|
| +/// Request for a [StreamQueue.fork] call.
|
| +class _ForkRequest<T> implements _EventRequest {
|
| + /// Completer for the stream used by the queue by the `fork` call.
|
| + StreamCompleter _completer;
|
| +
|
| + StreamQueue<T> queue;
|
| +
|
| + /// The [StreamQueue] object that has this request queued.
|
| + final StreamQueue _streamQueue;
|
| +
|
| + _ForkRequest(this._streamQueue) {
|
| + _completer = new StreamCompleter<T>();
|
| + queue = new StreamQueue<T>(_completer.stream);
|
| + }
|
| +
|
| + bool addEvents(Queue<Result> events) {
|
| + _completeStream(events);
|
| + return true;
|
| + }
|
| +
|
| + void close(Queue<Result> events) {
|
| + _completeStream(events);
|
| + }
|
| +
|
| + void _completeStream(Queue<Result> events) {
|
| + if (events.isEmpty) {
|
| + if (_streamQueue._isDone) {
|
| + _completer.setEmpty();
|
| + } else {
|
| + _completer.setSourceStream(_streamQueue._sourceStream.fork());
|
| + }
|
| + } else {
|
| + // There are prefetched events which need to be added before the
|
| + // remaining stream.
|
| + var controller = new StreamController<T>();
|
| + for (var event in events) {
|
| + event.addTo(controller);
|
| + }
|
| +
|
| + var fork = _streamQueue._sourceStream.fork();
|
| + controller.addStream(fork, cancelOnError: false)
|
| + .whenComplete(controller.close);
|
| + _completer.setSourceStream(controller.stream);
|
| + }
|
| + }
|
| +}
|
|
|