| Index: lib/src/stream_queue.dart
|
| diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
|
| index 36d03ef13a88d97f07952f6d061dba09017a2e69..09b3a75b2360a9adf2a18ad2715fe2fef08cc656 100644
|
| --- a/lib/src/stream_queue.dart
|
| +++ b/lib/src/stream_queue.dart
|
| @@ -60,15 +60,17 @@ import "../result.dart";
|
| ///
|
| /// When you need no further events the `StreamQueue` should be closed
|
| /// using [cancel]. This releases the underlying stream subscription.
|
| -class StreamQueue<T> {
|
| +abstract class StreamQueue<T> {
|
| // This class maintains two queues: one of events and one of requests.
|
| // The active request (the one in front of the queue) is called with
|
| - // the current event queue when it becomes active.
|
| + // the current event queue when it becomes active, every time a
|
| + // new event arrives, and when the event source closes.
|
| //
|
| - // If the request returns true, it's complete and will be removed from the
|
| + // If the request returns `true`, it's complete and will be removed from the
|
| // request queue.
|
| - // If the request returns false, it needs more events, and will be called
|
| - // again when new events are available.
|
| + // If the request returns `false`, it needs more events, and will be called
|
| + // again when new events are available. It may trigger a call itself by
|
| + // calling [_updateRequests].
|
| // The request can remove events that it uses, or keep them in the event
|
| // queue until it has all that it needs.
|
| //
|
| @@ -77,16 +79,7 @@ class StreamQueue<T> {
|
| // potentially a request that takes either five or zero events, determined
|
| // by the content of the fifth event.
|
|
|
| - /// Source of events.
|
| - final Stream _sourceStream;
|
| -
|
| - /// Subscription on [_sourceStream] while listening for events.
|
| - ///
|
| - /// Set to subscription when listening, and set to `null` when the
|
| - /// subscription is done (and [_isDone] is set to true).
|
| - StreamSubscription _subscription;
|
| -
|
| - /// Whether we have listened on [_sourceStream] and the subscription is done.
|
| + /// Whether the event source is done.
|
| bool _isDone = false;
|
|
|
| /// Whether a closing operation has been performed on the stream queue.
|
| @@ -103,8 +96,9 @@ class StreamQueue<T> {
|
| final Queue<_EventRequest> _requestQueue = new Queue();
|
|
|
| /// Create a `StreamQueue` of the events of [source].
|
| - StreamQueue(Stream source)
|
| - : _sourceStream = source;
|
| + factory StreamQueue(Stream source) = _StreamQueue<T>;
|
| +
|
| + StreamQueue._();
|
|
|
| /// Asks if the stream has any more events.
|
| ///
|
| @@ -115,6 +109,8 @@ class StreamQueue<T> {
|
| ///
|
| /// Can be used before using [next] to avoid getting an error in the
|
| /// future returned by `next` in the case where there are no more events.
|
| + /// Another alternative is to use `take(1)` which returns either zero or
|
| + /// one events.
|
| Future<bool> get hasNext {
|
| if (!_isClosed) {
|
| var hasNextRequest = new _HasNextRequest();
|
| @@ -216,15 +212,15 @@ class StreamQueue<T> {
|
| throw _failClosed();
|
| }
|
|
|
| - /// Cancels the underlying stream subscription.
|
| + /// Cancels the underlying event source.
|
| ///
|
| /// If [immediate] is `false` (the default), the cancel operation waits until
|
| /// all previously requested events have been processed, then it cancels the
|
| /// subscription providing the events.
|
| ///
|
| - /// If [immediate] is `true`, the subscription is instead canceled
|
| - /// immediately. Any pending events complete with a 'closed'-event, as though
|
| - /// the stream had closed by itself.
|
| + /// If [immediate] is `true`, the source is instead canceled
|
| + /// immediately. Any pending events are completed as though the underlying
|
| + /// stream had closed.
|
| ///
|
| /// The returned future completes with the result of calling
|
| /// `cancel`.
|
| @@ -242,114 +238,178 @@ class StreamQueue<T> {
|
| return request.future;
|
| }
|
|
|
| - if (_isDone) return new Future.value();
|
| - if (_subscription == null) _subscription = _sourceStream.listen(null);
|
| - var future = _subscription.cancel();
|
| - _onDone();
|
| - return future;
|
| + if (_isDone && _eventQueue.isEmpty) return new Future.value();
|
| + return _cancel();
|
| }
|
|
|
| - /// Returns an error for when a request is made after cancel.
|
| + // ------------------------------------------------------------------
|
| + // Methods that may be called from the request implementations to
|
| + // control the even stream.
|
| +
|
| + /// Matches events with requests.
|
| ///
|
| - /// Returns a [StateError] with a message saying that either
|
| - /// [cancel] or [rest] have already been called.
|
| - Error _failClosed() {
|
| - return new StateError("Already cancelled");
|
| + /// Called after receiving an event or when the event source closes.
|
| + ///
|
| + /// May be called by requests which have returned `false` (saying they
|
| + /// are not yet done) so they can be checked again before any new
|
| + /// events arrive.
|
| + /// Any request returing `false` from `update` when `isDone` is `true`
|
| + /// *must* call `_updateRequests` when they are ready to continue
|
| + /// (since no further events will trigger the call).
|
| + void _updateRequests() {
|
| + while (_requestQueue.isNotEmpty) {
|
| + if (_requestQueue.first.update(_eventQueue, _isDone)) {
|
| + _requestQueue.removeFirst();
|
| + } else {
|
| + return;
|
| + }
|
| + }
|
| +
|
| + if (!_isDone) {
|
| + _pause();
|
| + }
|
| }
|
|
|
| - // Callbacks receiving the events of the source stream.
|
| + /// Extracts a stream from the event source and makes this stream queue
|
| + /// unusable.
|
| + ///
|
| + /// Can only be used by the very last request (the stream queue must
|
| + /// be closed by that request).
|
| + /// Only used by [rest].
|
| + Stream _extractStream();
|
|
|
| - void _onData(T data) {
|
| - _eventQueue.add(new Result.value(data));
|
| - _checkQueues();
|
| - }
|
| + /// Requests that the event source pauses events.
|
| + ///
|
| + /// This is called automatically when the request queue is empty.
|
| + ///
|
| + /// The event source is restarted by the next call to [_ensureListening].
|
| + void _pause();
|
|
|
| - void _onError(error, StackTrace stack) {
|
| - _eventQueue.add(new Result.error(error, stack));
|
| - _checkQueues();
|
| + /// Ensures that we are listening on events from the event source.
|
| + ///
|
| + /// Starts listening for the first time or resumes after a [_pause].
|
| + ///
|
| + /// Is called automatically if a request requires more events.
|
| + void _ensureListening();
|
| +
|
| + /// Cancels the underlying event source.
|
| + Future _cancel();
|
| +
|
| + // ------------------------------------------------------------------
|
| + // Methods called by the event source to add events or say that it's
|
| + // done.
|
| +
|
| + /// Called when the event source adds a new data or error event.
|
| + /// Always calls [_updateRequests] after adding.
|
| + void _addResult(Result result) {
|
| + _eventQueue.add(result);
|
| + _updateRequests();
|
| }
|
|
|
| - void _onDone() {
|
| - _subscription = null;
|
| + /// Called when the event source is done.
|
| + /// Always calls [_updateRequests] after adding.
|
| + void _close() {
|
| _isDone = true;
|
| - _closeAllRequests();
|
| + _updateRequests();
|
| }
|
|
|
| - // Request queue management.
|
| + // ------------------------------------------------------------------
|
| + // Internal helper methods.
|
| +
|
| + /// Returns an error for when a request is made after cancel.
|
| + ///
|
| + /// Returns a [StateError] with a message saying that either
|
| + /// [cancel] or [rest] have already been called.
|
| + Error _failClosed() {
|
| + return new StateError("Already cancelled");
|
| + }
|
|
|
| /// Adds a new request to the queue.
|
| + ///
|
| + /// If the request queue is empty and the request can be completed
|
| + /// immediately, it skips the queue.
|
| void _addRequest(_EventRequest request) {
|
| - if (_isDone) {
|
| - assert(_requestQueue.isEmpty);
|
| - if (!request.addEvents(_eventQueue)) {
|
| - request.close(_eventQueue);
|
| - }
|
| - return;
|
| - }
|
| if (_requestQueue.isEmpty) {
|
| - if (request.addEvents(_eventQueue)) return;
|
| + if (request.update(_eventQueue, _isDone)) return;
|
| _ensureListening();
|
| }
|
| _requestQueue.add(request);
|
| }
|
| +}
|
| +
|
|
|
| - /// Ensures that we are listening on events from [_sourceStream].
|
| +/// The default implementation of [StreamQueue].
|
| +///
|
| +/// This queue gets its events from a stream which is listened
|
| +/// to when a request needs events.
|
| +class _StreamQueue<T> extends StreamQueue<T> {
|
| + /// Source of events.
|
| + final Stream _sourceStream;
|
| +
|
| + /// Subscription on [_sourceStream] while listening for events.
|
| ///
|
| - /// Resumes subscription on [_sourceStream], or creates it if necessary.
|
| + /// Set to subscription when listening, and set to `null` when the
|
| + /// subscription is done (and [_isDone] is set to true).
|
| + StreamSubscription _subscription;
|
| +
|
| + _StreamQueue(this._sourceStream) : super._();
|
| +
|
| + Future _cancel() {
|
| + if (_isDone) return null;
|
| + if (_subscription == null) _subscription = _sourceStream.listen(null);
|
| + var future = _subscription.cancel();
|
| + _close();
|
| + return future;
|
| + }
|
| +
|
| void _ensureListening() {
|
| assert(!_isDone);
|
| if (_subscription == null) {
|
| _subscription =
|
| - _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
|
| + _sourceStream.listen(
|
| + (data) {
|
| + _addResult(new Result.value(data));
|
| + },
|
| + onError: (error, StackTrace stackTrace) {
|
| + _addResult(new Result.error(error, stackTrace));
|
| + },
|
| + onDone: () {
|
| + _subscription = null;
|
| + this._close();
|
| + });
|
| } else {
|
| _subscription.resume();
|
| }
|
| }
|
|
|
| - /// Removes all requests and closes them.
|
| - ///
|
| - /// Used when the source stream is done.
|
| - /// After this, no further requests will be added to the queue,
|
| - /// requests are immediately served entirely by events already in the event
|
| - /// queue, if any.
|
| - void _closeAllRequests() {
|
| - assert(_isDone);
|
| - while (_requestQueue.isNotEmpty) {
|
| - var request = _requestQueue.removeFirst();
|
| - if (!request.addEvents(_eventQueue)) {
|
| - request.close(_eventQueue);
|
| - }
|
| - }
|
| + void _pause() {
|
| + _subscription.pause();
|
| }
|
|
|
| - /// Matches events with requests.
|
| - ///
|
| - /// Called after receiving an event.
|
| - void _checkQueues() {
|
| - while (_requestQueue.isNotEmpty) {
|
| - if (_requestQueue.first.addEvents(_eventQueue)) {
|
| - _requestQueue.removeFirst();
|
| - } else {
|
| - return;
|
| - }
|
| + Stream<T> _extractStream() {
|
| + assert(_isClosed);
|
| + if (_isDone) {
|
| + return new Stream<T>.empty();
|
| }
|
| - if (!_isDone) {
|
| - _subscription.pause();
|
| +
|
| + if (_subscription == null) {
|
| + return _sourceStream;
|
| }
|
| - }
|
|
|
| - /// Extracts the subscription and makes this stream queue unusable.
|
| - ///
|
| - /// Can only be used by the very last request.
|
| - StreamSubscription _dispose() {
|
| - assert(_isClosed);
|
| var subscription = _subscription;
|
| _subscription = null;
|
| _isDone = true;
|
| - return subscription;
|
| +
|
| + var wasPaused = subscription.isPaused;
|
| + var result = new SubscriptionStream<T>(subscription);
|
| + // Resume after creating stream because that pauses the subscription too.
|
| + // This way there won't be a short resumption in the middle.
|
| + if (wasPaused) subscription.resume();
|
| + return result;
|
| }
|
| }
|
|
|
| +
|
| /// Request object that receives events when they arrive, until fulfilled.
|
| ///
|
| /// Each request that cannot be fulfilled immediately is represented by
|
| @@ -367,7 +427,7 @@ class StreamQueue<T> {
|
| abstract class _EventRequest {
|
| /// Handle available events.
|
| ///
|
| - /// The available events are provided as a queue. The `addEvents` function
|
| + /// The available events are provided as a queue. The `update` function
|
| /// should only remove events from the front of the event queue, e.g.,
|
| /// using [removeFirst].
|
| ///
|
| @@ -382,22 +442,10 @@ abstract class _EventRequest {
|
| /// This method is called when a request reaches the front of the request
|
| /// queue, and if it returns `false`, it's called again every time a new event
|
| /// becomes available, or when the stream closes.
|
| - bool addEvents(Queue<Result> events);
|
| -
|
| - /// Complete the request.
|
| - ///
|
| - /// This is called when the source stream is done before the request
|
| - /// had a chance to receive all its events. That is, after a call
|
| - /// to [addEvents] has returned `false`.
|
| - /// If there are any unused events available, they are in the [events] queue.
|
| - /// No further events will become available.
|
| - ///
|
| - /// The queue should only remove events from the front of the event queue,
|
| - /// e.g., using [removeFirst].
|
| - ///
|
| - /// If the request kept events in the queue after an [addEvents] call,
|
| - /// this is the last chance to use them.
|
| - void close(Queue<Result> events);
|
| + /// If the function returns `false` when the stream has already closed
|
| + /// ([isDone] is true), then the request must call
|
| + /// [StreamQueue._updateRequests] itself when it's ready to continue.
|
| + bool update(Queue<Result> events, bool isDone);
|
| }
|
|
|
| /// Request for a [StreamQueue.next] call.
|
| @@ -412,16 +460,18 @@ class _NextRequest<T> implements _EventRequest {
|
|
|
| Future<T> get future => _completer.future;
|
|
|
| - bool addEvents(Queue<Result> events) {
|
| - if (events.isEmpty) return false;
|
| - events.removeFirst().complete(_completer);
|
| - return true;
|
| - }
|
| -
|
| - void close(Queue<Result> events) {
|
| - var errorFuture =
|
| - new Future.sync(() => throw new StateError("No elements"));
|
| - _completer.complete(errorFuture);
|
| + bool update(Queue<Result> events, bool isDone) {
|
| + if (events.isNotEmpty) {
|
| + events.removeFirst().complete(_completer);
|
| + return true;
|
| + }
|
| + if (isDone) {
|
| + var errorFuture =
|
| + new Future.sync(() => throw new StateError("No elements"));
|
| + _completer.complete(errorFuture);
|
| + return true;
|
| + }
|
| + return false;
|
| }
|
| }
|
|
|
| @@ -443,22 +493,22 @@ class _SkipRequest implements _EventRequest {
|
| /// The future completed when the correct number of events have been skipped.
|
| Future get future => _completer.future;
|
|
|
| - bool addEvents(Queue<Result> events) {
|
| + bool update(Queue<Result> events, bool isDone) {
|
| while (_eventsToSkip > 0) {
|
| - if (events.isEmpty) return false;
|
| + if (events.isEmpty) {
|
| + if (isDone) break;
|
| + return false;
|
| + }
|
| _eventsToSkip--;
|
| +
|
| var event = events.removeFirst();
|
| if (event.isError) {
|
| event.complete(_completer);
|
| return true;
|
| }
|
| }
|
| - _completer.complete(0);
|
| - return true;
|
| - }
|
| -
|
| - void close(Queue<Result> events) {
|
| _completer.complete(_eventsToSkip);
|
| + return true;
|
| }
|
| }
|
|
|
| @@ -481,9 +531,13 @@ class _TakeRequest<T> implements _EventRequest {
|
| /// The future completed when the correct number of events have been captured.
|
| Future get future => _completer.future;
|
|
|
| - bool addEvents(Queue<Result> events) {
|
| + bool update(Queue<Result> events, bool isDone) {
|
| while (_list.length < _eventsToTake) {
|
| - if (events.isEmpty) return false;
|
| + if (events.isEmpty) {
|
| + if (isDone) break;
|
| + return false;
|
| + }
|
| +
|
| var result = events.removeFirst();
|
| if (result.isError) {
|
| result.complete(_completer);
|
| @@ -494,10 +548,6 @@ class _TakeRequest<T> implements _EventRequest {
|
| _completer.complete(_list);
|
| return true;
|
| }
|
| -
|
| - void close(Queue<Result> events) {
|
| - _completer.complete(_list);
|
| - }
|
| }
|
|
|
| /// Request for a [StreamQueue.cancel] call.
|
| @@ -520,22 +570,14 @@ class _CancelRequest implements _EventRequest {
|
| /// The future completed when the cancel request is completed.
|
| Future get future => _completer.future;
|
|
|
| - bool addEvents(Queue<Result> events) {
|
| - _shutdown();
|
| - return true;
|
| - }
|
| -
|
| - void close(_) {
|
| - _shutdown();
|
| - }
|
| -
|
| - void _shutdown() {
|
| + bool update(Queue<Result> events, bool isDone) {
|
| if (_streamQueue._isDone) {
|
| _completer.complete();
|
| } else {
|
| _streamQueue._ensureListening();
|
| - _completer.complete(_streamQueue._dispose().cancel());
|
| + _completer.complete(_streamQueue._extractStream().listen(null).cancel());
|
| }
|
| + return true;
|
| }
|
| }
|
|
|
| @@ -559,21 +601,12 @@ class _RestRequest<T> implements _EventRequest {
|
| /// The stream which will contain the remaining events of [_streamQueue].
|
| Stream<T> get stream => _completer.stream;
|
|
|
| - bool addEvents(Queue<Result> events) {
|
| - _completeStream(events);
|
| - return true;
|
| - }
|
| -
|
| - void close(Queue<Result> events) {
|
| - _completeStream(events);
|
| - }
|
| -
|
| - void _completeStream(Queue<Result> events) {
|
| + bool update(Queue<Result> events, bool isDone) {
|
| if (events.isEmpty) {
|
| if (_streamQueue._isDone) {
|
| _completer.setEmpty();
|
| } else {
|
| - _completer.setSourceStream(_getRestStream());
|
| + _completer.setSourceStream(_streamQueue._extractStream());
|
| }
|
| } else {
|
| // There are prefetched events which needs to be added before the
|
| @@ -582,26 +615,11 @@ class _RestRequest<T> implements _EventRequest {
|
| for (var event in events) {
|
| event.addTo(controller);
|
| }
|
| - controller.addStream(_getRestStream(), cancelOnError: false)
|
| + controller.addStream(_streamQueue._extractStream(), cancelOnError: false)
|
| .whenComplete(controller.close);
|
| _completer.setSourceStream(controller.stream);
|
| }
|
| - }
|
| -
|
| - /// Create a stream from the rest of [_streamQueue]'s subscription.
|
| - Stream _getRestStream() {
|
| - if (_streamQueue._isDone) {
|
| - var controller = new StreamController<T>()..close();
|
| - return controller.stream;
|
| - // TODO(lrn). Use the following when 1.11 is released.
|
| - // return new Stream<T>.empty();
|
| - }
|
| - if (_streamQueue._subscription == null) {
|
| - return _streamQueue._sourceStream;
|
| - }
|
| - var subscription = _streamQueue._dispose();
|
| - subscription.resume();
|
| - return new SubscriptionStream<T>(subscription);
|
| + return true;
|
| }
|
| }
|
|
|
| @@ -616,15 +634,15 @@ class _HasNextRequest<T> implements _EventRequest {
|
|
|
| Future<bool> get future => _completer.future;
|
|
|
| - bool addEvents(Queue<Result> events) {
|
| + bool update(Queue<Result> events, bool isDone) {
|
| if (events.isNotEmpty) {
|
| _completer.complete(true);
|
| return true;
|
| }
|
| + if (isDone) {
|
| + _completer.complete(false);
|
| + return true;
|
| + }
|
| return false;
|
| }
|
| -
|
| - void close(_) {
|
| - _completer.complete(false);
|
| - }
|
| }
|
|
|