Index: lib/src/stream_queue.dart |
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart |
index 36d03ef13a88d97f07952f6d061dba09017a2e69..09b3a75b2360a9adf2a18ad2715fe2fef08cc656 100644 |
--- a/lib/src/stream_queue.dart |
+++ b/lib/src/stream_queue.dart |
@@ -60,15 +60,17 @@ import "../result.dart"; |
/// |
/// When you need no further events the `StreamQueue` should be closed |
/// using [cancel]. This releases the underlying stream subscription. |
-class StreamQueue<T> { |
+abstract class StreamQueue<T> { |
// This class maintains two queues: one of events and one of requests. |
// The active request (the one in front of the queue) is called with |
- // the current event queue when it becomes active. |
+ // the current event queue when it becomes active, every time a |
+ // new event arrives, and when the event source closes. |
// |
- // If the request returns true, it's complete and will be removed from the |
+ // If the request returns `true`, it's complete and will be removed from the |
// request queue. |
- // If the request returns false, it needs more events, and will be called |
- // again when new events are available. |
+ // If the request returns `false`, it needs more events, and will be called |
+ // again when new events are available. It may trigger a call itself by |
+ // calling [_updateRequests]. |
// The request can remove events that it uses, or keep them in the event |
// queue until it has all that it needs. |
// |
@@ -77,16 +79,7 @@ class StreamQueue<T> { |
// potentially a request that takes either five or zero events, determined |
// by the content of the fifth event. |
- /// Source of events. |
- final Stream _sourceStream; |
- |
- /// Subscription on [_sourceStream] while listening for events. |
- /// |
- /// Set to subscription when listening, and set to `null` when the |
- /// subscription is done (and [_isDone] is set to true). |
- StreamSubscription _subscription; |
- |
- /// Whether we have listened on [_sourceStream] and the subscription is done. |
+ /// Whether the event source is done. |
bool _isDone = false; |
/// Whether a closing operation has been performed on the stream queue. |
@@ -103,8 +96,9 @@ class StreamQueue<T> { |
final Queue<_EventRequest> _requestQueue = new Queue(); |
/// Create a `StreamQueue` of the events of [source]. |
- StreamQueue(Stream source) |
- : _sourceStream = source; |
+ factory StreamQueue(Stream source) = _StreamQueue<T>; |
+ |
+ StreamQueue._(); |
/// Asks if the stream has any more events. |
/// |
@@ -115,6 +109,8 @@ class StreamQueue<T> { |
/// |
/// Can be used before using [next] to avoid getting an error in the |
/// future returned by `next` in the case where there are no more events. |
+ /// Another alternative is to use `take(1)` which returns either zero or |
+ /// one events. |
Future<bool> get hasNext { |
if (!_isClosed) { |
var hasNextRequest = new _HasNextRequest(); |
@@ -216,15 +212,15 @@ class StreamQueue<T> { |
throw _failClosed(); |
} |
- /// Cancels the underlying stream subscription. |
+ /// Cancels the underlying event source. |
/// |
/// If [immediate] is `false` (the default), the cancel operation waits until |
/// all previously requested events have been processed, then it cancels the |
/// subscription providing the events. |
/// |
- /// If [immediate] is `true`, the subscription is instead canceled |
- /// immediately. Any pending events complete with a 'closed'-event, as though |
- /// the stream had closed by itself. |
+ /// If [immediate] is `true`, the source is instead canceled |
+ /// immediately. Any pending events are completed as though the underlying |
+ /// stream had closed. |
/// |
/// The returned future completes with the result of calling |
/// `cancel`. |
@@ -242,114 +238,178 @@ class StreamQueue<T> { |
return request.future; |
} |
- if (_isDone) return new Future.value(); |
- if (_subscription == null) _subscription = _sourceStream.listen(null); |
- var future = _subscription.cancel(); |
- _onDone(); |
- return future; |
+ if (_isDone && _eventQueue.isEmpty) return new Future.value(); |
+ return _cancel(); |
} |
- /// Returns an error for when a request is made after cancel. |
+ // ------------------------------------------------------------------ |
+ // Methods that may be called from the request implementations to |
+ // control the even stream. |
+ |
+ /// Matches events with requests. |
/// |
- /// Returns a [StateError] with a message saying that either |
- /// [cancel] or [rest] have already been called. |
- Error _failClosed() { |
- return new StateError("Already cancelled"); |
+ /// Called after receiving an event or when the event source closes. |
+ /// |
+ /// May be called by requests which have returned `false` (saying they |
+ /// are not yet done) so they can be checked again before any new |
+ /// events arrive. |
+ /// Any request returing `false` from `update` when `isDone` is `true` |
+ /// *must* call `_updateRequests` when they are ready to continue |
+ /// (since no further events will trigger the call). |
+ void _updateRequests() { |
+ while (_requestQueue.isNotEmpty) { |
+ if (_requestQueue.first.update(_eventQueue, _isDone)) { |
+ _requestQueue.removeFirst(); |
+ } else { |
+ return; |
+ } |
+ } |
+ |
+ if (!_isDone) { |
+ _pause(); |
+ } |
} |
- // Callbacks receiving the events of the source stream. |
+ /// Extracts a stream from the event source and makes this stream queue |
+ /// unusable. |
+ /// |
+ /// Can only be used by the very last request (the stream queue must |
+ /// be closed by that request). |
+ /// Only used by [rest]. |
+ Stream _extractStream(); |
- void _onData(T data) { |
- _eventQueue.add(new Result.value(data)); |
- _checkQueues(); |
- } |
+ /// Requests that the event source pauses events. |
+ /// |
+ /// This is called automatically when the request queue is empty. |
+ /// |
+ /// The event source is restarted by the next call to [_ensureListening]. |
+ void _pause(); |
- void _onError(error, StackTrace stack) { |
- _eventQueue.add(new Result.error(error, stack)); |
- _checkQueues(); |
+ /// Ensures that we are listening on events from the event source. |
+ /// |
+ /// Starts listening for the first time or resumes after a [_pause]. |
+ /// |
+ /// Is called automatically if a request requires more events. |
+ void _ensureListening(); |
+ |
+ /// Cancels the underlying event source. |
+ Future _cancel(); |
+ |
+ // ------------------------------------------------------------------ |
+ // Methods called by the event source to add events or say that it's |
+ // done. |
+ |
+ /// Called when the event source adds a new data or error event. |
+ /// Always calls [_updateRequests] after adding. |
+ void _addResult(Result result) { |
+ _eventQueue.add(result); |
+ _updateRequests(); |
} |
- void _onDone() { |
- _subscription = null; |
+ /// Called when the event source is done. |
+ /// Always calls [_updateRequests] after adding. |
+ void _close() { |
_isDone = true; |
- _closeAllRequests(); |
+ _updateRequests(); |
} |
- // Request queue management. |
+ // ------------------------------------------------------------------ |
+ // Internal helper methods. |
+ |
+ /// Returns an error for when a request is made after cancel. |
+ /// |
+ /// Returns a [StateError] with a message saying that either |
+ /// [cancel] or [rest] have already been called. |
+ Error _failClosed() { |
+ return new StateError("Already cancelled"); |
+ } |
/// Adds a new request to the queue. |
+ /// |
+ /// If the request queue is empty and the request can be completed |
+ /// immediately, it skips the queue. |
void _addRequest(_EventRequest request) { |
- if (_isDone) { |
- assert(_requestQueue.isEmpty); |
- if (!request.addEvents(_eventQueue)) { |
- request.close(_eventQueue); |
- } |
- return; |
- } |
if (_requestQueue.isEmpty) { |
- if (request.addEvents(_eventQueue)) return; |
+ if (request.update(_eventQueue, _isDone)) return; |
_ensureListening(); |
} |
_requestQueue.add(request); |
} |
+} |
+ |
- /// Ensures that we are listening on events from [_sourceStream]. |
+/// The default implementation of [StreamQueue]. |
+/// |
+/// This queue gets its events from a stream which is listened |
+/// to when a request needs events. |
+class _StreamQueue<T> extends StreamQueue<T> { |
+ /// Source of events. |
+ final Stream _sourceStream; |
+ |
+ /// Subscription on [_sourceStream] while listening for events. |
/// |
- /// Resumes subscription on [_sourceStream], or creates it if necessary. |
+ /// Set to subscription when listening, and set to `null` when the |
+ /// subscription is done (and [_isDone] is set to true). |
+ StreamSubscription _subscription; |
+ |
+ _StreamQueue(this._sourceStream) : super._(); |
+ |
+ Future _cancel() { |
+ if (_isDone) return null; |
+ if (_subscription == null) _subscription = _sourceStream.listen(null); |
+ var future = _subscription.cancel(); |
+ _close(); |
+ return future; |
+ } |
+ |
void _ensureListening() { |
assert(!_isDone); |
if (_subscription == null) { |
_subscription = |
- _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
+ _sourceStream.listen( |
+ (data) { |
+ _addResult(new Result.value(data)); |
+ }, |
+ onError: (error, StackTrace stackTrace) { |
+ _addResult(new Result.error(error, stackTrace)); |
+ }, |
+ onDone: () { |
+ _subscription = null; |
+ this._close(); |
+ }); |
} else { |
_subscription.resume(); |
} |
} |
- /// Removes all requests and closes them. |
- /// |
- /// Used when the source stream is done. |
- /// After this, no further requests will be added to the queue, |
- /// requests are immediately served entirely by events already in the event |
- /// queue, if any. |
- void _closeAllRequests() { |
- assert(_isDone); |
- while (_requestQueue.isNotEmpty) { |
- var request = _requestQueue.removeFirst(); |
- if (!request.addEvents(_eventQueue)) { |
- request.close(_eventQueue); |
- } |
- } |
+ void _pause() { |
+ _subscription.pause(); |
} |
- /// Matches events with requests. |
- /// |
- /// Called after receiving an event. |
- void _checkQueues() { |
- while (_requestQueue.isNotEmpty) { |
- if (_requestQueue.first.addEvents(_eventQueue)) { |
- _requestQueue.removeFirst(); |
- } else { |
- return; |
- } |
+ Stream<T> _extractStream() { |
+ assert(_isClosed); |
+ if (_isDone) { |
+ return new Stream<T>.empty(); |
} |
- if (!_isDone) { |
- _subscription.pause(); |
+ |
+ if (_subscription == null) { |
+ return _sourceStream; |
} |
- } |
- /// Extracts the subscription and makes this stream queue unusable. |
- /// |
- /// Can only be used by the very last request. |
- StreamSubscription _dispose() { |
- assert(_isClosed); |
var subscription = _subscription; |
_subscription = null; |
_isDone = true; |
- return subscription; |
+ |
+ var wasPaused = subscription.isPaused; |
+ var result = new SubscriptionStream<T>(subscription); |
+ // Resume after creating stream because that pauses the subscription too. |
+ // This way there won't be a short resumption in the middle. |
+ if (wasPaused) subscription.resume(); |
+ return result; |
} |
} |
+ |
/// Request object that receives events when they arrive, until fulfilled. |
/// |
/// Each request that cannot be fulfilled immediately is represented by |
@@ -367,7 +427,7 @@ class StreamQueue<T> { |
abstract class _EventRequest { |
/// Handle available events. |
/// |
- /// The available events are provided as a queue. The `addEvents` function |
+ /// The available events are provided as a queue. The `update` function |
/// should only remove events from the front of the event queue, e.g., |
/// using [removeFirst]. |
/// |
@@ -382,22 +442,10 @@ abstract class _EventRequest { |
/// This method is called when a request reaches the front of the request |
/// queue, and if it returns `false`, it's called again every time a new event |
/// becomes available, or when the stream closes. |
- bool addEvents(Queue<Result> events); |
- |
- /// Complete the request. |
- /// |
- /// This is called when the source stream is done before the request |
- /// had a chance to receive all its events. That is, after a call |
- /// to [addEvents] has returned `false`. |
- /// If there are any unused events available, they are in the [events] queue. |
- /// No further events will become available. |
- /// |
- /// The queue should only remove events from the front of the event queue, |
- /// e.g., using [removeFirst]. |
- /// |
- /// If the request kept events in the queue after an [addEvents] call, |
- /// this is the last chance to use them. |
- void close(Queue<Result> events); |
+ /// If the function returns `false` when the stream has already closed |
+ /// ([isDone] is true), then the request must call |
+ /// [StreamQueue._updateRequests] itself when it's ready to continue. |
+ bool update(Queue<Result> events, bool isDone); |
} |
/// Request for a [StreamQueue.next] call. |
@@ -412,16 +460,18 @@ class _NextRequest<T> implements _EventRequest { |
Future<T> get future => _completer.future; |
- bool addEvents(Queue<Result> events) { |
- if (events.isEmpty) return false; |
- events.removeFirst().complete(_completer); |
- return true; |
- } |
- |
- void close(Queue<Result> events) { |
- var errorFuture = |
- new Future.sync(() => throw new StateError("No elements")); |
- _completer.complete(errorFuture); |
+ bool update(Queue<Result> events, bool isDone) { |
+ if (events.isNotEmpty) { |
+ events.removeFirst().complete(_completer); |
+ return true; |
+ } |
+ if (isDone) { |
+ var errorFuture = |
+ new Future.sync(() => throw new StateError("No elements")); |
+ _completer.complete(errorFuture); |
+ return true; |
+ } |
+ return false; |
} |
} |
@@ -443,22 +493,22 @@ class _SkipRequest implements _EventRequest { |
/// The future completed when the correct number of events have been skipped. |
Future get future => _completer.future; |
- bool addEvents(Queue<Result> events) { |
+ bool update(Queue<Result> events, bool isDone) { |
while (_eventsToSkip > 0) { |
- if (events.isEmpty) return false; |
+ if (events.isEmpty) { |
+ if (isDone) break; |
+ return false; |
+ } |
_eventsToSkip--; |
+ |
var event = events.removeFirst(); |
if (event.isError) { |
event.complete(_completer); |
return true; |
} |
} |
- _completer.complete(0); |
- return true; |
- } |
- |
- void close(Queue<Result> events) { |
_completer.complete(_eventsToSkip); |
+ return true; |
} |
} |
@@ -481,9 +531,13 @@ class _TakeRequest<T> implements _EventRequest { |
/// The future completed when the correct number of events have been captured. |
Future get future => _completer.future; |
- bool addEvents(Queue<Result> events) { |
+ bool update(Queue<Result> events, bool isDone) { |
while (_list.length < _eventsToTake) { |
- if (events.isEmpty) return false; |
+ if (events.isEmpty) { |
+ if (isDone) break; |
+ return false; |
+ } |
+ |
var result = events.removeFirst(); |
if (result.isError) { |
result.complete(_completer); |
@@ -494,10 +548,6 @@ class _TakeRequest<T> implements _EventRequest { |
_completer.complete(_list); |
return true; |
} |
- |
- void close(Queue<Result> events) { |
- _completer.complete(_list); |
- } |
} |
/// Request for a [StreamQueue.cancel] call. |
@@ -520,22 +570,14 @@ class _CancelRequest implements _EventRequest { |
/// The future completed when the cancel request is completed. |
Future get future => _completer.future; |
- bool addEvents(Queue<Result> events) { |
- _shutdown(); |
- return true; |
- } |
- |
- void close(_) { |
- _shutdown(); |
- } |
- |
- void _shutdown() { |
+ bool update(Queue<Result> events, bool isDone) { |
if (_streamQueue._isDone) { |
_completer.complete(); |
} else { |
_streamQueue._ensureListening(); |
- _completer.complete(_streamQueue._dispose().cancel()); |
+ _completer.complete(_streamQueue._extractStream().listen(null).cancel()); |
} |
+ return true; |
} |
} |
@@ -559,21 +601,12 @@ class _RestRequest<T> implements _EventRequest { |
/// The stream which will contain the remaining events of [_streamQueue]. |
Stream<T> get stream => _completer.stream; |
- bool addEvents(Queue<Result> events) { |
- _completeStream(events); |
- return true; |
- } |
- |
- void close(Queue<Result> events) { |
- _completeStream(events); |
- } |
- |
- void _completeStream(Queue<Result> events) { |
+ bool update(Queue<Result> events, bool isDone) { |
if (events.isEmpty) { |
if (_streamQueue._isDone) { |
_completer.setEmpty(); |
} else { |
- _completer.setSourceStream(_getRestStream()); |
+ _completer.setSourceStream(_streamQueue._extractStream()); |
} |
} else { |
// There are prefetched events which needs to be added before the |
@@ -582,26 +615,11 @@ class _RestRequest<T> implements _EventRequest { |
for (var event in events) { |
event.addTo(controller); |
} |
- controller.addStream(_getRestStream(), cancelOnError: false) |
+ controller.addStream(_streamQueue._extractStream(), cancelOnError: false) |
.whenComplete(controller.close); |
_completer.setSourceStream(controller.stream); |
} |
- } |
- |
- /// Create a stream from the rest of [_streamQueue]'s subscription. |
- Stream _getRestStream() { |
- if (_streamQueue._isDone) { |
- var controller = new StreamController<T>()..close(); |
- return controller.stream; |
- // TODO(lrn). Use the following when 1.11 is released. |
- // return new Stream<T>.empty(); |
- } |
- if (_streamQueue._subscription == null) { |
- return _streamQueue._sourceStream; |
- } |
- var subscription = _streamQueue._dispose(); |
- subscription.resume(); |
- return new SubscriptionStream<T>(subscription); |
+ return true; |
} |
} |
@@ -616,15 +634,15 @@ class _HasNextRequest<T> implements _EventRequest { |
Future<bool> get future => _completer.future; |
- bool addEvents(Queue<Result> events) { |
+ bool update(Queue<Result> events, bool isDone) { |
if (events.isNotEmpty) { |
_completer.complete(true); |
return true; |
} |
+ if (isDone) { |
+ _completer.complete(false); |
+ return true; |
+ } |
return false; |
} |
- |
- void close(_) { |
- _completer.complete(false); |
- } |
} |