Chromium Code Reviews| Index: lib/src/stream_queue.dart |
| diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..7c2db4a53f775469c728bbd97e50893360f53543 |
| --- /dev/null |
| +++ b/lib/src/stream_queue.dart |
| @@ -0,0 +1,621 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library async.stream_events; |
| + |
| +import 'dart:async'; |
| +import 'dart:collection'; |
| + |
| +import "subscription_stream.dart"; |
| +import "stream_completer.dart"; |
| +import "../result.dart"; |
| + |
| +/// An asynchronous pull-based interface for accessing stream events. |
| +/// |
| +/// Wraps a stream and makes individual events available on request. |
| +/// |
| +/// You can request (and reserve) one or more events from the stream, |
| +/// and after all previous requestes have been fulfilled, stream events |
|
nweiz
2015/06/18 23:44:26
"requestes" -> "requests"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| +/// go towards fulfilling your request. |
| +/// |
| +/// For example, if you ask for [next] two times, the returned futures |
| +/// will be completed by the next two unreserved events from the stream. |
|
nweiz
2015/06/18 23:44:25
"unreserved" -> "unrequested"? Just to keep the te
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| +/// |
| +/// The stream subscription is paused when there are no active |
| +/// requests. |
| +/// |
| +/// Some streams, including broadcast streams, will buffer |
| +/// events while paused, so waiting too long between requests may |
| +/// cause memory bloat somewhere else. |
| +/// |
| +/// The individual requests are served in the order they are requested, |
| +/// and the stream subscription is paused when there are no active requests. |
|
nweiz
2015/06/18 23:44:27
Both of these are already mentioned above, so this
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| +/// |
| +/// This is similar to, but more convenient than, a [StreamIterator]. |
| +/// A `StreamIterator` requires you to manually check when a new event is |
| +/// available and you can only access the value of that event until you |
| +/// check for the next one. A `StreamQueue` allows you to request, for example, |
| +/// three events at a time, either individually, as a group using [take] |
| +/// or [skip], or in any combination. |
| +/// |
| +/// You can also ask to have the [rest] of the stream provided as |
| +/// a new stream. This allows, for example, taking the first event |
| +/// out of a stream and continue using the rest of the stream as a stream. |
|
nweiz
2015/06/18 23:44:27
"continue using" -> "continuing to use"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
|
| +/// |
| +/// Example: |
| +/// |
| +/// var events = new StreamQueue<String>(someStreamOfLines); |
| +/// var first = await events.next; |
| +/// while (first.startsWith('#')) { |
| +/// // Skip comments. |
| +/// first = await events.next; |
| +/// } |
| +/// |
| +/// if (first.startsWith(MAGIC_MARKER)) { |
| +/// var headerCount = |
| +/// first.parseInt(first.substring(MAGIC_MARKER.length + 1)); |
| +/// handleMessage(headers: await events.take(headerCount), |
| +/// body: events.rest); |
| +/// return; |
| +/// } |
| +/// // Error handling. |
| +/// |
| +/// When you need no further events the `StreamQueue` should be closed |
| +/// using [cancel]. This releases the underlying stream subscription. |
| +/// |
| +/// The underlying stream subscription is paused when there |
| +/// are no requeusts. Some subscriptions, including those of broadcast streams, |
| +/// will still buffer events while paused. Creating a `StreamQueue` from |
| +/// such a stream and stopping to request events, will cause memory to fill up |
|
nweiz
2015/06/18 23:44:27
"stopping" -> "ceasing", get rid of the comma
|
| +/// unnecessarily. |
|
nweiz
2015/06/18 23:44:26
This paragraph also seems redundant with the sixth
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
|
| +class StreamQueue<T> { |
| + // This class maintains two queues: one of events and one of requests. |
| + // The active request (the one in front of the queue) is called with |
| + // the current event queue when it becomes active. |
|
nweiz
2015/06/18 23:44:27
"it" -> "the request" (to clarify whether you're r
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + // If it returns true, it's done and will be removed from the request queue. |
|
nweiz
2015/06/18 23:44:26
"it" -> "the request" again. The next "it" is fine
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + // If it returns false, it needs more events, and will be called again when |
| + // new events are available. |
| + // The request can remove events that it uses, or keep them in the event |
| + // queue until it has all that it needs. |
| + // |
| + // This model is very flexible and easily extensible. |
| + // It allows requests that don't consume events (like [hasNext]) or |
| + // potentially a request that takes either five or zero events, determined |
| + // by the content of the fifth event. |
| + |
| + /// Source of events. |
| + final Stream _sourceStream; |
| + |
| + /// Number of events that may be prefetched when there is no request. |
| + final int _prefetchCount; |
| + |
| + /// Subscription on [_sourceStream] while listening for events. |
| + /// |
| + /// Set to subscription when listening, and set to `null` when the |
| + /// subscription is done (and [_isDone] is set to true). |
| + StreamSubscription _subscription; |
| + |
| + /// Whether we have listened on [_sourceStream] and the subscription is done. |
| + bool _isDone = false; |
| + |
| + /// Whether a closing operation has been performed on the stream queue. |
| + /// |
| + /// Closing operations are [cancel] and [rest]. |
| + bool _isClosed = false; |
| + |
| + /// Queue of events not used by a request yet. |
| + final Queue<Result> _eventQueue = new Queue(); |
| + |
| + /// Queue of pending requests. |
| + /// Access through methods below to ensure consistency. |
|
nweiz
2015/06/18 23:44:26
Nit: add a newline above
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + final Queue<_EventRequest> _requestQueue = new Queue(); |
| + |
| + /// Create a `StreamQueue` of the events of source. |
|
nweiz
2015/06/18 23:44:26
"source" -> "[source]"
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
|
| + /// |
| + /// Allow prefetching [prefetch] events before pausing the source |
| + /// stream even if there are no current requests for them. |
| + /// The default is to pause immediately when there is no pending request. |
| + /// Even if `prefetch` is greater than zero, the stream won't listened on |
| + /// before the first request. |
|
nweiz
2015/06/18 23:44:25
Consider allowing "prefetch: -1" to mean "never pa
Lasse Reichstein Nielsen
2015/06/30 10:34:14
I don't like -1 being magical, the alternative wou
|
| + StreamQueue(Stream source, {int prefetch: 0}) |
|
nweiz
2015/06/18 23:44:26
Consider making [prefetch] default to null and ass
Lasse Reichstein Nielsen
2015/06/30 10:34:12
True. I'll just remove it for now (effectively fix
|
| + : _sourceStream = source, _prefetchCount = prefetch; |
| + |
| + /// Asks if the stream has any more events. |
| + /// |
| + /// Returns a future that completes with `true` if the stream has any |
| + /// more events, whether data or error. |
| + /// If the stream closes without producing any more events, the returned |
| + /// future completes with `false`. |
| + /// |
| + /// Can be used before using [next] to avoid getting an error in the |
| + /// future returned by `next` in the case where there are no more events. |
| + Future<bool> get hasNext { |
| + if (!_isClosed) { |
| + _HasNextRequest hasNextRequest = new _HasNextRequest(); |
|
nweiz
2015/06/18 23:44:26
Nit: "var" (also below)
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
|
| + _addRequest(hasNextRequest); |
| + return hasNextRequest.future; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Requests the next (yet unrequested) event from the stream. |
| + /// |
| + /// When the requested event arrives, the returned future is completed with |
| + /// the event. |
| + /// If the event is a data event, the returned future completes |
| + /// with its value. |
| + /// If the event is an error event, the returned future completes with |
| + /// its error and stack trace. |
| + /// If the stream closes before an event arrives, the returned future |
| + /// completes with a [StateError]. |
| + /// |
| + /// It's possible to have several pending [next] calls (or other requests), |
| + /// and they will be completed in the order they were requested, by the |
| + /// first events that were not used by previous requeusts. |
|
nweiz
2015/06/18 23:44:26
"used" -> "consumed" (slightly more correct for re
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
|
| + Future<T> get next { |
| + if (!_isClosed) { |
| + _NextRequest nextRequest = new _NextRequest<T>(); |
| + _addRequest(nextRequest); |
| + return nextRequest.future; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Returns a stream of all the remaning events of the source stream. |
| + /// |
| + /// All requested [next], [skip] or [take] operations are completed |
| + /// first, and then any remaining events are provided as events of |
| + /// the returned stream. |
| + /// |
| + /// Using `rest` closes the stream events object. After getting the |
|
nweiz
2015/06/18 23:44:26
"the stream events object" -> "this stream queue"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + /// `rest` the caller may no longer request other events, like |
| + /// after calling [cancel]. |
| + Stream<T> get rest { |
| + if (_isClosed) { |
| + throw _failClosed(); |
| + } |
| + var request = new _RestRequest<T>(this); |
| + _isClosed = true; |
| + _addRequest(request); |
| + return request.stream; |
| + } |
| + |
| + /// Skips the next [count] *data* events. |
| + /// |
| + /// The [count] must be non-negative. |
| + /// |
| + /// When successful, this is equivalent to using [take] |
| + /// and ignoring the result. |
| + /// |
| + /// If an error occurs before `count` data events have been skipped, |
| + /// the returned future completes with that error instead. |
| + /// |
| + /// If the stream closes before `count` data events, |
| + /// the remaining unskipped event count is returned. |
| + /// If the returned future completes with the integer `0`, |
| + /// then all events were succssfully skipped. If the value |
| + /// is greater than zero then the stream ended early. |
| + Future<int> skip(int count) { |
| + if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| + if (!_isClosed) { |
| + var request = new _SkipRequest(count); |
| + _addRequest(request); |
| + return request.future; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Requests the next [count] data events as a list. |
| + /// |
| + /// The [count] must be non-negative. |
| + /// |
| + /// Equivalent to calling [next] `count` times and |
| + /// storing the data values in a list. |
| + /// |
| + /// If an error occurs before `count` data events has |
| + /// been collected, the returned future completes with |
| + /// that error instead. |
| + /// |
| + /// If the stream closes before `count` data events, |
| + /// the returned future completes with the list |
| + /// of data collected so far. That is, the returned |
| + /// list may have fewer than [count] elements. |
| + Future<List<T>> take(int count) { |
| + if (count < 0) throw new RangeError.range(count, 0, null, "count"); |
| + if (!_isClosed) { |
| + var request = new _TakeRequest<T>(count); |
| + _addRequest(request); |
| + return request.future; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Cancels the underlying stream subscription. |
| + /// |
| + /// The cancel operation waits until all previously requested |
| + /// events have been processed, then it cancels the subscription |
| + /// providing the events. |
| + /// |
| + /// The returned future completes with the result of calling |
| + /// `cancel`. |
| + /// |
| + /// After calling `cancel`, no further events can be requested. |
| + /// None of [next], [rest], [skip], [take] or [cancel] may be |
| + /// called again. |
| + Future cancel() { |
| + if (!_isClosed) { |
| + _isClosed = true; |
| + var request = new _CancelRequest(this); |
| + _addRequest(request); |
| + return request.future; |
| + } |
| + throw _failClosed(); |
| + } |
| + |
| + /// Returns an error for when a request is made after cancel. |
| + /// |
| + /// Returns a [StateError] with a message saying that either |
| + /// [cancel] or [rest] have already been called. |
| + Error _failClosed() { |
| + return new StateError("Already cancelled"); |
| + } |
| + |
| + // Callbacks receiving the events of the source stream. |
| + |
| + void _onData(T data) { |
| + _eventQueue.add(new Result.value(data)); |
| + _checkQueues(); |
| + } |
| + |
| + void _onError(error, StackTrace stack) { |
| + _eventQueue.add(new Result.error(error, stack)); |
| + _checkQueues(); |
| + } |
| + |
| + void _onDone() { |
| + _subscription = null; |
| + _isDone = true; |
| + _closeAllRequests(); |
| + } |
| + |
| + // Request queue management. |
| + |
| + /// Add a new request to the queue. |
|
nweiz
2015/06/18 23:44:26
"Add" -> "Adds"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + void _addRequest(_EventRequest request) { |
| + if (_isDone) { |
| + assert(_requestQueue.isEmpty); |
| + if (!request.addEvents(_eventQueue)) { |
| + request.close(_eventQueue); |
| + } |
| + return; |
| + } |
| + if (_requestQueue.isEmpty) { |
| + if (request.addEvents(_eventQueue)) return; |
| + _ensureListening(); |
| + } |
| + _requestQueue.add(request); |
| + |
|
nweiz
2015/06/18 23:44:26
Nit: extra newline
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + } |
| + |
| + /// Ensures that we are listening on events from [_sourceStream]. |
| + /// |
| + /// Resumes subscription on [_sourceStream], or create it if necessary. |
|
nweiz
2015/06/18 23:44:26
"create" -> "creates"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
|
| + StreamSubscription _ensureListening() { |
| + assert(!_isDone); |
| + if (_subscription == null) { |
| + _subscription = |
| + _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
| + } else { |
| + _subscription.resume(); |
| + } |
| + } |
| + |
| + |
| + /// Remove all requests and close them. |
|
nweiz
2015/06/18 23:44:26
"Remove" -> "Removes", "close" -> "closes"
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
|
| + /// |
| + /// Used when the source stream is done. |
| + /// After this, no further requests will be added to the queue, |
| + /// requests are immediately served entirely by events already in the event |
| + /// queue, if any. |
| + void _closeAllRequests() { |
| + assert(_isDone); |
| + while (_requestQueue.isNotEmpty) { |
| + var request = _requestQueue.removeFirst(); |
| + if (!request.addEvents(_eventQueue)) { |
|
nweiz
2015/06/18 23:44:26
Isn't this guaranteed to return false? [request.ad
Lasse Reichstein Nielsen
2015/06/30 10:34:12
It's only guaranteed to return false for the first
|
| + request.close(_eventQueue); |
| + } |
| + } |
| + } |
| + |
| + /// Matches events with requests. |
| + /// |
| + /// Called after receiving an event. |
| + void _checkQueues() { |
| + while (_requestQueue.isNotEmpty) { |
| + if (_requestQueue.first.addEvents(_eventQueue)) { |
| + _requestQueue.removeFirst(); |
| + } else { |
| + return; |
| + } |
| + } |
| + if (!_isDone && _eventQueue.length >= _prefetchCount) { |
| + _subscription.pause(); |
| + } |
| + } |
| + |
| + /// Extracts the subscription and makes the events object unusable. |
|
nweiz
2015/06/18 23:44:26
"events object" -> "stream queue"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + /// |
| + /// Can only be used by the very last request. |
| + StreamSubscription _dispose() { |
| + assert(_isClosed); |
| + StreamSubscription subscription = _subscription; |
|
nweiz
2015/06/18 23:44:25
Nit: "var"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + _subscription = null; |
| + _isDone = true; |
| + return subscription; |
| + } |
| +} |
| + |
| +/// Request object that receives events when they arrive, until fulfilled. |
| +/// |
| +/// Each request that cannot be fulfilled immediately is represented by |
| +/// an `_EventRequest` object in the request queue. |
| +/// |
| +/// Events from the source stream are sent to the first request in the |
| +/// queue until it reports itself as [isComplete]. |
| +/// |
| +/// When the first request in the queue `isComplete`, either when becoming |
| +/// the first request or after receiving an event, its [close] methods is |
| +/// called. |
| +/// |
| +/// The [close] method is also called immediately when the source stream |
| +/// is done. |
| +abstract class _EventRequest implements EventSink { |
| + /// Handle available events. |
| + /// |
| + /// The available events are provided as a queue. The `addEvents` function |
| + /// should only access the queue from the start, e.g., using [removeFirst]. |
|
nweiz
2015/06/18 23:44:27
"access the queue from the start" -> "remove event
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + /// |
| + /// Returns `true` if if the request is completed, or `false` if it needs |
|
nweiz
2015/06/18 23:44:27
"if if" -> "if"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + /// more events. |
| + /// The call may keep events in the queue until the requeust is complete, |
| + /// or it may remove them immediately. |
| + /// |
| + /// This method is called when a request reaches the front of the request |
| + /// queue, and if it returns `false`, it's called again every time an event |
|
nweiz
2015/06/18 23:44:25
"an event" -> "a new event"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + /// becomes available. |
|
nweiz
2015/06/18 23:44:27
"and finally when the stream closes"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + bool addEvents(Queue<Result> events); |
| + |
| + /// Complete the request. |
| + /// |
| + /// This is called when the source stream is done before the request |
| + /// had a chance to receive events. If there are any events available, |
| + /// they are in the [events] queue. No further events will become available. |
| + /// |
| + /// The queue should only be accessed from the start, e.g., |
| + /// using [removeFirst]. |
| + /// |
| + /// If the requests kept events in the queue after an [addEvents] call, |
|
nweiz
2015/06/18 23:44:26
"requests" -> "request"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + /// it should remove them here. |
| + void close(Queue<Result> events); |
| +} |
| + |
| +/// Request for a [StreamQueue.next] call. |
| +/// |
| +/// Completes the returned future when receiving the first event, |
| +/// and is then complete. |
| +class _NextRequest<T> implements _EventRequest { |
| + /// Completer for the future returned by [StreamQueue.next]. |
| + /// |
| + /// Set to `null` when it is completed, to mark it as already complete. |
|
nweiz
2015/06/18 23:44:27
This is no longer accurate.
|
| + final Completer _completer; |
| + |
| + _NextRequest() : _completer = new Completer<T>(); |
|
nweiz
2015/06/18 23:44:26
Nit: assign _completer in the declaration
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
|
| + |
| + Future<T> get future => _completer.future; |
| + |
| + bool addEvents(Queue<Result> events) { |
| + if (events.isEmpty) return false; |
| + events.removeFirst().complete(_completer); |
| + return true; |
| + } |
| + |
| + void close(Queue<Result> events) { |
| + _completer.completeError(new StateError("no elements")); |
|
nweiz
2015/06/18 23:44:27
"no" -> "No"
Also include a stack trace here so t
Lasse Reichstein Nielsen
2015/06/30 10:34:14
Done.
|
| + } |
| +} |
| + |
| +/// Request for a [StreamQueue.skip] call. |
| +class _SkipRequest implements _EventRequest { |
| + /// Completer for the future returned by the skip call. |
| + final Completer _completer = new Completer<int>(); |
| + |
| + /// Number of remaining events to skip. |
| + /// |
| + /// The request [isComplete] when the values reaches zero. |
| + /// |
| + /// Decremented when an event is seen. |
| + /// Set to zero when an error is seen since errors abort the skip request. |
| + int _eventsToSkip; |
| + |
| + _SkipRequest(this._eventsToSkip); |
| + |
| + /// The future completed when the correct number of events have been skipped. |
| + Future get future => _completer.future; |
| + |
| + bool addEvents(Queue<Result> events) { |
| + while (_eventsToSkip > 0) { |
| + if (events.isEmpty) return false; |
| + _eventsToSkip--; |
| + var event = events.removeFirst(); |
| + if (event.isError) { |
| + event.complete(_completer); |
| + return true; |
| + } |
| + } |
| + _completer.complete(0); |
| + return true; |
| + } |
| + |
| + void close(Queue<Result> events) { |
| + _completer.complete(_eventsToSkip); |
| + } |
| +} |
| + |
| +/// Request for a [StreamQueue.take] call. |
| +class _TakeRequest<T> implements _EventRequest { |
| + /// Completer for the future returned by the take call. |
| + final Completer _completer; |
| + |
| + /// List collecting events until enough have been seen. |
| + final List _list = <T>[]; |
| + |
| + /// Number of events to capture. |
| + /// |
| + /// The request [isComplete] when the length of [_list] reaches |
| + /// this value. |
| + final int _eventsToTake; |
| + |
| + _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
| + |
| + /// The future completed when the correct number of events have been captured. |
| + Future get future => _completer.future; |
| + |
| + bool addEvents(Queue<Events> events) { |
| + while (_list.length < _eventsToTake) { |
| + if (events.isEmpty) return false; |
| + var result = events.removeFirst(); |
| + if (result.isError) { |
| + result.complete(_completer); |
| + return true; |
| + } |
| + _list.add(result.asValue.value); |
| + } |
| + _completer.complete(_list); |
| + return true; |
| + } |
| + |
| + void close(Queue<Events> events) { |
| + _completer.complete(_list); |
| + } |
| +} |
| + |
| +/// Request for a [StreamQueue.cancel] call. |
| +/// |
| +/// The request is always complete, it just waits in the request queue |
| +/// until all previous events are fulfilled, then it cancels the stream events |
| +/// subscription. |
| +class _CancelRequest implements _EventRequest { |
| + /// Completer for the future returned by the `cancel` call. |
| + final Completer _completer = new Completer(); |
| + |
| + /// The [StreamQueue] object that has this request queued. |
| + /// |
| + /// When the event is completed, it needs to cancel the active subscription |
| + /// of the `StreamQueue` object, if any. |
| + final StreamQueue _streamQueue; |
| + |
| + _CancelRequest(this._streamQueue); |
| + |
| + /// The future completed when the cancel request is completed. |
| + Future get future => _completer.future; |
| + |
| + bool addEvents(Queue<Result> events) { |
| + _shutdown(); |
| + return true; |
| + } |
| + |
| + void close(_) { |
| + _shutdown(); |
| + } |
| + |
| + void _shutdown() { |
| + if (_streamQueue._subscription == null) { |
| + _completer.complete(); |
| + } else { |
| + _completer.complete(_streamQueue._dispose().cancel()); |
| + } |
| + } |
| +} |
| + |
| +/// Request for a [StreamQueue.rest] call. |
| +/// |
| +/// The request is always complete, it just waits in the request queue |
| +/// until all previous events are fulfilled, then it takes over the |
| +/// stream events subscription and creates a stream from it. |
| +class _RestRequest<T> implements _EventRequest { |
| + /// Completer for the stream returned by the `rest` call. |
| + final StreamCompleter _completer; |
| + |
| + /// The [StreamQueue] object that has this request queued. |
| + /// |
| + /// When the event is completed, it needs to cancel the active subscription |
| + /// of the `StreamQueue` object, if any. |
| + final StreamQueue _streamQueue; |
| + _RestRequest(this._streamQueue) : _completer = new StreamCompleter<T>(); |
|
nweiz
2015/06/18 23:44:27
Nit: newline above.
Also move [_completer]'s init
Lasse Reichstein Nielsen
2015/06/30 10:34:12
Done.
|
| + |
| + /// The future which will contain the remaining events of [_streamQueue]. |
|
nweiz
2015/06/18 23:44:26
"future" -> "stream"
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Done.
|
| + Stream<T> get stream => _completer.stream; |
| + |
| + bool addEvents(Queue<Result> events) { |
| + _completeStream(events); |
| + return true; |
| + } |
| + |
| + void close(Queue<Result> events) { |
| + _completeStream(events); |
| + } |
| + |
| + void _completeStream(Queue<Result> events) { |
| + Stream getRestStream() { |
|
nweiz
2015/06/18 23:44:26
Why is this a local method (as opposed to a privat
Lasse Reichstein Nielsen
2015/06/30 10:34:14
It's only used here, so I didn't see a need to giv
nweiz
2015/06/30 23:39:47
There are a lot of private methods at the top leve
Lasse Reichstein Nielsen
2015/07/01 08:24:56
Agree. Moved to private instance method.
|
| + if (_streamQueue._isDone) { |
| + return new Stream<T>.empty(); |
|
nweiz
2015/06/18 23:44:25
If you're going to use this here, you'll need to u
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Ack. I SOOO wish 1.11 would be released soon. All
nweiz
2015/06/30 23:39:47
Yeah, this is one of the burdens of developing in
|
| + } |
| + if (_streamQueue._subscription == null) { |
| + return _streamQueue._sourceStream; |
| + } |
| + StreamSubscription subscription = _streamQueue._dispose(); |
| + subscription.resume(); |
| + return new SubscriptionStream<T>(subscription); |
| + } |
| + if (events.isEmpty) { |
| + if (_streamQueue._isDone) { |
| + _completer.setEmpty(); |
| + } else { |
| + _completer.setSourceStream(getRestStream()); |
| + } |
| + } else { |
| + // There are prefetched events which needs to be added before the |
| + // remaining stream. |
| + StreamController controller = new StreamController<T>(); |
|
nweiz
2015/06/18 23:44:27
Nit: "var"
|
| + for (var event in events) event.addTo(controller); |
|
nweiz
2015/06/18 23:44:25
I like this style of loop, but unfortunately it's
Lasse Reichstein Nielsen
2015/06/30 10:34:13
Do they want me to write
events.forEach((even
|
| + controller.addStream(getRestStream(), cancelOnError: false) |
| + .whenComplete(controller.close); |
| + _completer.setSourceStream(controller.stream); |
| + } |
| + } |
| +} |
| + |
| +/// Request for a [StreamQueue.hasNext] call. |
| +/// |
| +/// Completes the [future] with `true` if it sees any event, |
| +/// but doesn't consume the event. |
| +/// If the request is closed without seeing an event, then |
| +/// the [future] is completed with `false`. |
| +class _HasNextRequest<T> implements _EventRequest { |
| + final Completer _completer = new Completer<bool>(); |
| + |
| + Future<bool> get future => _completer.future; |
| + |
| + bool addEvents(Queue<Result> events) { |
| + if (events.isNotEmpty) { |
| + _completer.complete(true); |
| + return true; |
| + } |
| + return false; |
| + } |
| + |
| + void close(_) { |
| + _completer.complete(false); |
| + } |
| +} |