Chromium Code Reviews| Index: lib/src/stream_queue.dart |
| diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart |
| index 7d78ac593c79181bb504897f8f3b611130a3b48b..63bc1a574e00dfdb2628a7c28b28caae46cea83a 100644 |
| --- a/lib/src/stream_queue.dart |
| +++ b/lib/src/stream_queue.dart |
| @@ -60,15 +60,17 @@ import "../result.dart"; |
| /// |
| /// When you need no further events the `StreamQueue` should be closed |
| /// using [cancel]. This releases the underlying stream subscription. |
| -class StreamQueue<T> { |
| +abstract class StreamQueue<T> { |
| // This class maintains two queues: one of events and one of requests. |
| // The active request (the one in front of the queue) is called with |
| - // the current event queue when it becomes active. |
| + // the current event queue when it becomes active, every time a |
| + // new event arrives, and when the event source closes. |
| // |
| - // If the request returns true, it's complete and will be removed from the |
| + // If the request returns `true`, it's complete and will be removed from the |
| // request queue. |
| - // If the request returns false, it needs more events, and will be called |
| - // again when new events are available. |
| + // If the request returns `false`, it needs more events, and will be called |
| + // again when new events are available. It may trigger a call itself by |
| + // calling [_updateRequests]. |
| // The request can remove events that it uses, or keep them in the event |
| // queue until it has all that it needs. |
| // |
| @@ -77,16 +79,7 @@ class StreamQueue<T> { |
| // potentially a request that takes either five or zero events, determined |
| // by the content of the fifth event. |
| - /// Source of events. |
| - final Stream _sourceStream; |
| - |
| - /// Subscription on [_sourceStream] while listening for events. |
| - /// |
| - /// Set to subscription when listening, and set to `null` when the |
| - /// subscription is done (and [_isDone] is set to true). |
| - StreamSubscription _subscription; |
| - |
| - /// Whether we have listened on [_sourceStream] and the subscription is done. |
| + /// Whether the event source is done. |
| bool _isDone = false; |
| /// Whether a closing operation has been performed on the stream queue. |
| @@ -103,8 +96,9 @@ class StreamQueue<T> { |
| final Queue<_EventRequest> _requestQueue = new Queue(); |
| /// Create a `StreamQueue` of the events of [source]. |
| - StreamQueue(Stream source) |
| - : _sourceStream = source; |
| + factory StreamQueue(Stream source) = _StreamQueue<T>; |
| + |
| + StreamQueue._(); |
| /// Asks if the stream has any more events. |
| /// |
| @@ -115,6 +109,8 @@ class StreamQueue<T> { |
| /// |
| /// Can be used before using [next] to avoid getting an error in the |
| /// future returned by `next` in the case where there are no more events. |
| + /// Another alternative is to use `take(1)` which returns either zero or |
| + /// one events. |
| Future<bool> get hasNext { |
| if (!_isClosed) { |
| var hasNextRequest = new _HasNextRequest(); |
| @@ -216,13 +212,13 @@ class StreamQueue<T> { |
| throw _failClosed(); |
| } |
| - /// Cancels the underlying stream subscription. |
| + /// Cancels the underlying event source. |
| /// |
| /// If [immediate] is `false` (the default), the cancel operation waits until |
| /// all previously requested events have been processed, then it cancels the |
| /// subscription providing the events. |
| /// |
| - /// If [immediate] is `true`, the subscription is instead canceled |
| + /// If [immediate] is `true`, the source is instead canceled |
| /// immediately. Any pending events are completed as though the underlying |
| /// stream had closed. |
| /// |
| @@ -242,114 +238,178 @@ class StreamQueue<T> { |
| return request.future; |
| } |
| - if (_isDone) return new Future.value(); |
| - if (_subscription == null) _subscription = _sourceStream.listen(null); |
| - var future = _subscription.cancel(); |
| - _onDone(); |
| - return future; |
| + if (_isDone && _eventQueue.isEmpty) return new Future.value(); |
| + return _cancel(); |
| } |
| - /// Returns an error for when a request is made after cancel. |
| + // ------------------------------------------------------------------ |
| + // Methods that may be called from the request implementations to |
| + // control the even stream. |
| + |
| + /// Matches events with requests. |
| /// |
| - /// Returns a [StateError] with a message saying that either |
| - /// [cancel] or [rest] have already been called. |
| - Error _failClosed() { |
| - return new StateError("Already cancelled"); |
| + /// Called after receiving an event or when the event source closes. |
| + /// |
| + /// May be called by requests which have returned `false` (saying they |
| + /// are not yet done) so they can be checked again before any new |
| + /// events arrive. |
| + /// Any request returing `false` from `update` when `isDone` is `true` |
| + /// *must* call `_updateRequests` when they are ready to continue |
| + /// (since no further events will trigger the call). |
| + void _updateRequests() { |
| + while (_requestQueue.isNotEmpty) { |
| + if (_requestQueue.first.update(_eventQueue, _isDone)) { |
| + _requestQueue.removeFirst(); |
| + } else { |
| + return; |
| + } |
| + } |
| + |
| + if (!_isDone) { |
| + _pause(); |
| + } |
| } |
| - // Callbacks receiving the events of the source stream. |
| + /// Extracts a stream from the event source and makes this stream queue |
| + /// unusable. |
| + /// |
| + /// Can only be used by the very last request (the stream queue must |
| + /// be closed by that request). |
| + /// Only used by [rest]. |
| + Stream _extractStream(); |
| - void _onData(T data) { |
| - _eventQueue.add(new Result.value(data)); |
| - _checkQueues(); |
| - } |
| + /// Requests that the event source pauses events. |
| + /// |
| + /// This is called automatically when the request queue is empty. |
| + /// |
| + /// The event source is restarted by the next call to [_ensureListening]. |
| + void _pause(); |
| - void _onError(error, StackTrace stack) { |
| - _eventQueue.add(new Result.error(error, stack)); |
| - _checkQueues(); |
| + /// Ensures that we are listening on events from the event source. |
| + /// |
| + /// Starts listening for the first time or resumes after a [_pause]. |
| + /// |
| + /// Is called automatically if a request requires more events. |
| + void _ensureListening(); |
| + |
| + /// Cancels the underlying event source. |
| + Future _cancel(); |
| + |
| + // ------------------------------------------------------------------ |
| + // Methods called by the event source to add events or say that it's |
| + // done. |
| + |
| + /// Called when the event source adds a new data or error event. |
| + /// Always calls [_updateRequests] after adding. |
| + void _addResult(Result result) { |
| + _eventQueue.add(result); |
| + _updateRequests(); |
| } |
| - void _onDone() { |
| - _subscription = null; |
| + /// Called when the event source is done. |
| + /// Always calls [_updateRequests] after adding. |
| + void _close() { |
| _isDone = true; |
| - _closeAllRequests(); |
| + _updateRequests(); |
| } |
| - // Request queue management. |
| + // ------------------------------------------------------------------ |
| + // Internal helper methods. |
| + |
| + /// Returns an error for when a request is made after cancel. |
| + /// |
| + /// Returns a [StateError] with a message saying that either |
| + /// [cancel] or [rest] have already been called. |
| + Error _failClosed() { |
| + return new StateError("Already cancelled"); |
| + } |
| /// Adds a new request to the queue. |
| + /// |
| + /// If the request queue is empty and the request can be completed |
| + /// immediately, it skips the queue. |
| void _addRequest(_EventRequest request) { |
| - if (_isDone) { |
| - assert(_requestQueue.isEmpty); |
| - if (!request.addEvents(_eventQueue)) { |
| - request.close(_eventQueue); |
| - } |
| - return; |
| - } |
| if (_requestQueue.isEmpty) { |
| - if (request.addEvents(_eventQueue)) return; |
| + if (request.update(_eventQueue, _isDone)) return; |
| _ensureListening(); |
| } |
| _requestQueue.add(request); |
| } |
| +} |
| + |
| + |
| +/// The default implementation of [StreamQueue]. |
| +/// |
| +/// This queue gets its events from a stream which is listened |
| +/// to when a request needs events. |
| +class _StreamQueue<T> extends StreamQueue<T> { |
| + /// Source of events. |
| + final Stream _sourceStream; |
| - /// Ensures that we are listening on events from [_sourceStream]. |
| + /// Subscription on [_sourceStream] while listening for events. |
| /// |
| - /// Resumes subscription on [_sourceStream], or creates it if necessary. |
| + /// Set to subscription when listening, and set to `null` when the |
| + /// subscription is done (and [_isDone] is set to true). |
| + StreamSubscription _subscription; |
| + |
| + _StreamQueue(this._sourceStream) : super._(); |
| + |
| + Future _cancel() { |
| + if (_isDone) return null; |
| + if (_subscription == null) _subscription = _sourceStream.listen(null); |
| + var future = _subscription.cancel(); |
| + _close(); |
| + return future; |
| + } |
| + |
| void _ensureListening() { |
| assert(!_isDone); |
| if (_subscription == null) { |
| _subscription = |
| - _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
| + _sourceStream.listen( |
| + (data) { |
| + _addResult(new Result.value(data)); |
| + }, |
| + onError: (error, StackTrace stackTrace) { |
| + _addResult(new Result.error(error, stackTrace)); |
| + }, |
| + onDone: () { |
| + _subscription = null; |
| + this._close(); |
| + }); |
| } else { |
| _subscription.resume(); |
| } |
| } |
| - /// Removes all requests and closes them. |
| - /// |
| - /// Used when the source stream is done. |
| - /// After this, no further requests will be added to the queue, |
| - /// requests are immediately served entirely by events already in the event |
| - /// queue, if any. |
| - void _closeAllRequests() { |
| - assert(_isDone); |
| - while (_requestQueue.isNotEmpty) { |
| - var request = _requestQueue.removeFirst(); |
| - if (!request.addEvents(_eventQueue)) { |
| - request.close(_eventQueue); |
| - } |
| - } |
| + void _pause() { |
| + _subscription.pause(); |
| } |
| - /// Matches events with requests. |
| - /// |
| - /// Called after receiving an event. |
| - void _checkQueues() { |
| - while (_requestQueue.isNotEmpty) { |
| - if (_requestQueue.first.addEvents(_eventQueue)) { |
| - _requestQueue.removeFirst(); |
| - } else { |
| - return; |
| - } |
| + Stream<T> _extractStream() { |
| + assert(_isClosed); |
| + if (_isDone) { |
| + return new Stream<T>.empty(); |
| } |
| - if (!_isDone) { |
| - _subscription.pause(); |
| + |
| + if (_subscription == null) { |
| + return _sourceStream; |
| } |
| - } |
| - /// Extracts the subscription and makes this stream queue unusable. |
| - /// |
| - /// Can only be used by the very last request. |
| - StreamSubscription _dispose() { |
| - assert(_isClosed); |
| var subscription = _subscription; |
| _subscription = null; |
| _isDone = true; |
| - return subscription; |
| + |
| + var wasPaused = subscription.isPaused; |
| + var result = new SubscriptionStream<T>(subscription); |
| + // Resume after creating stream because that pauses the subscription too. |
| + // This way there won't be a short resumption in the middle. |
| + if (wasPaused) subscription.resume(); |
| + return result; |
| } |
| } |
| + |
| /// Request object that receives events when they arrive, until fulfilled. |
| /// |
| /// Each request that cannot be fulfilled immediately is represented by |
| @@ -367,7 +427,7 @@ class StreamQueue<T> { |
| abstract class _EventRequest { |
| /// Handle available events. |
| /// |
| - /// The available events are provided as a queue. The `addEvents` function |
| + /// The available events are provided as a queue. The `update` function |
| /// should only remove events from the front of the event queue, e.g., |
| /// using [removeFirst]. |
| /// |
| @@ -382,22 +442,10 @@ abstract class _EventRequest { |
| /// This method is called when a request reaches the front of the request |
| /// queue, and if it returns `false`, it's called again every time a new event |
| /// becomes available, or when the stream closes. |
| - bool addEvents(Queue<Result> events); |
| - |
| - /// Complete the request. |
| - /// |
| - /// This is called when the source stream is done before the request |
| - /// had a chance to receive all its events. That is, after a call |
| - /// to [addEvents] has returned `false`. |
| - /// If there are any unused events available, they are in the [events] queue. |
| - /// No further events will become available. |
| - /// |
| - /// The queue should only remove events from the front of the event queue, |
| - /// e.g., using [removeFirst]. |
| - /// |
| - /// If the request kept events in the queue after an [addEvents] call, |
| - /// this is the last chance to use them. |
| - void close(Queue<Result> events); |
| + /// If the function returns `false` when the stream has already closed |
| + /// ([isDone] is true), then the request must call [StreamQueue._updateRequests] |
|
nweiz
2015/08/25 00:38:21
Long line
|
| + /// itself when it's ready to continue. |
| + bool update(Queue<Result> events, bool isDone); |
| } |
| /// Request for a [StreamQueue.next] call. |
| @@ -412,16 +460,18 @@ class _NextRequest<T> implements _EventRequest { |
| Future<T> get future => _completer.future; |
| - bool addEvents(Queue<Result> events) { |
| - if (events.isEmpty) return false; |
| - events.removeFirst().complete(_completer); |
| - return true; |
| - } |
| - |
| - void close(Queue<Result> events) { |
| - var errorFuture = |
| - new Future.sync(() => throw new StateError("No elements")); |
| - _completer.complete(errorFuture); |
| + bool update(Queue<Result> events, bool isDone) { |
| + if (events.isNotEmpty) { |
| + events.removeFirst().complete(_completer); |
| + return true; |
| + } |
| + if (isDone) { |
| + var errorFuture = |
| + new Future.sync(() => throw new StateError("No elements")); |
| + _completer.complete(errorFuture); |
| + return true; |
| + } |
| + return false; |
| } |
| } |
| @@ -443,22 +493,22 @@ class _SkipRequest implements _EventRequest { |
| /// The future completed when the correct number of events have been skipped. |
| Future get future => _completer.future; |
| - bool addEvents(Queue<Result> events) { |
| + bool update(Queue<Result> events, bool isDone) { |
| while (_eventsToSkip > 0) { |
| - if (events.isEmpty) return false; |
| + if (events.isEmpty) { |
| + if (isDone) break; |
| + return false; |
| + } |
| _eventsToSkip--; |
| + |
| var event = events.removeFirst(); |
| if (event.isError) { |
| event.complete(_completer); |
| return true; |
| } |
| } |
| - _completer.complete(0); |
| - return true; |
| - } |
| - |
| - void close(Queue<Result> events) { |
| _completer.complete(_eventsToSkip); |
| + return true; |
| } |
| } |
| @@ -481,9 +531,13 @@ class _TakeRequest<T> implements _EventRequest { |
| /// The future completed when the correct number of events have been captured. |
| Future get future => _completer.future; |
| - bool addEvents(Queue<Result> events) { |
| + bool update(Queue<Result> events, bool isDone) { |
| while (_list.length < _eventsToTake) { |
| - if (events.isEmpty) return false; |
| + if (events.isEmpty) { |
| + if (isDone) break; |
| + return false; |
| + } |
| + |
| var result = events.removeFirst(); |
| if (result.isError) { |
| result.complete(_completer); |
| @@ -494,10 +548,6 @@ class _TakeRequest<T> implements _EventRequest { |
| _completer.complete(_list); |
| return true; |
| } |
| - |
| - void close(Queue<Result> events) { |
| - _completer.complete(_list); |
| - } |
| } |
| /// Request for a [StreamQueue.cancel] call. |
| @@ -520,22 +570,10 @@ class _CancelRequest implements _EventRequest { |
| /// The future completed when the cancel request is completed. |
| Future get future => _completer.future; |
| - bool addEvents(Queue<Result> events) { |
| - _shutdown(); |
| + bool update(Queue<Result> events, bool isDone) { |
| + _completer.complete(_streamQueue._cancel()); |
| return true; |
| } |
| - |
| - void close(_) { |
| - _shutdown(); |
| - } |
| - |
| - void _shutdown() { |
| - if (_streamQueue._subscription == null) { |
| - _completer.complete(); |
| - } else { |
| - _completer.complete(_streamQueue._dispose().cancel()); |
| - } |
| - } |
| } |
| /// Request for a [StreamQueue.rest] call. |
| @@ -558,21 +596,12 @@ class _RestRequest<T> implements _EventRequest { |
| /// The stream which will contain the remaining events of [_streamQueue]. |
| Stream<T> get stream => _completer.stream; |
| - bool addEvents(Queue<Result> events) { |
| - _completeStream(events); |
| - return true; |
| - } |
| - |
| - void close(Queue<Result> events) { |
| - _completeStream(events); |
| - } |
| - |
| - void _completeStream(Queue<Result> events) { |
| + bool update(Queue<Result> events, bool isDone) { |
| if (events.isEmpty) { |
| if (_streamQueue._isDone) { |
| _completer.setEmpty(); |
| } else { |
| - _completer.setSourceStream(_getRestStream()); |
| + _completer.setSourceStream(_streamQueue._extractStream()); |
| } |
| } else { |
| // There are prefetched events which needs to be added before the |
| @@ -581,26 +610,11 @@ class _RestRequest<T> implements _EventRequest { |
| for (var event in events) { |
| event.addTo(controller); |
| } |
| - controller.addStream(_getRestStream(), cancelOnError: false) |
| + controller.addStream(_streamQueue._extractStream(), cancelOnError: false) |
| .whenComplete(controller.close); |
| _completer.setSourceStream(controller.stream); |
| } |
| - } |
| - |
| - /// Create a stream from the rest of [_streamQueue]'s subscription. |
| - Stream _getRestStream() { |
| - if (_streamQueue._isDone) { |
| - var controller = new StreamController<T>()..close(); |
| - return controller.stream; |
| - // TODO(lrn). Use the following when 1.11 is released. |
| - // return new Stream<T>.empty(); |
| - } |
| - if (_streamQueue._subscription == null) { |
| - return _streamQueue._sourceStream; |
| - } |
| - var subscription = _streamQueue._dispose(); |
| - subscription.resume(); |
| - return new SubscriptionStream<T>(subscription); |
| + return true; |
| } |
| } |
| @@ -615,15 +629,15 @@ class _HasNextRequest<T> implements _EventRequest { |
| Future<bool> get future => _completer.future; |
| - bool addEvents(Queue<Result> events) { |
| + bool update(Queue<Result> events, bool isDone) { |
| if (events.isNotEmpty) { |
| _completer.complete(true); |
| return true; |
| } |
| + if (isDone) { |
| + _completer.complete(false); |
| + return true; |
| + } |
| return false; |
| } |
| - |
| - void close(_) { |
| - _completer.complete(false); |
| - } |
| } |