Chromium Code Reviews| Index: lib/src/stream_queue.dart |
| diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart |
| index 8482e0c65afb5b1dc2993689da335fc5a5d3c8d7..4ce9e1301e45c76cbcbdadb4fd40be3d7564a205 100644 |
| --- a/lib/src/stream_queue.dart |
| +++ b/lib/src/stream_queue.dart |
| @@ -94,7 +94,7 @@ abstract class StreamQueue<T> { |
| final Queue<_EventRequest> _requestQueue = new Queue(); |
| /// Create a `StreamQueue` of the events of [source]. |
| - factory StreamQueue(Stream source) = _StreamQueue<T>; |
| + factory StreamQueue(Stream<T> source) = _StreamQueue<T>; |
| StreamQueue._(); |
| @@ -274,7 +274,7 @@ abstract class StreamQueue<T> { |
| /// Can only be used by the very last request (the stream queue must |
| /// be closed by that request). |
| /// Only used by [rest]. |
| - Stream _extractStream(); |
| + Stream<T> _extractStream(); |
| /// Requests that the event source pauses events. |
| /// |
| @@ -342,13 +342,13 @@ abstract class StreamQueue<T> { |
| /// to when a request needs events. |
| class _StreamQueue<T> extends StreamQueue<T> { |
| /// Source of events. |
| - final Stream _sourceStream; |
| + final Stream<T> _sourceStream; |
| /// Subscription on [_sourceStream] while listening for events. |
| /// |
| /// Set to subscription when listening, and set to `null` when the |
| /// subscription is done (and [_isDone] is set to true). |
| - StreamSubscription _subscription; |
| + StreamSubscription<T> _subscription; |
| _StreamQueue(this._sourceStream) : super._(); |
| @@ -422,7 +422,7 @@ class _StreamQueue<T> extends StreamQueue<T> { |
| /// |
| /// The [close] method is also called immediately when the source stream |
| /// is done. |
| -abstract class _EventRequest { |
| +abstract class _EventRequest<T> { |
| /// Handle available events. |
| /// |
| /// The available events are provided as a queue. The `update` function |
| @@ -443,22 +443,22 @@ abstract class _EventRequest { |
| /// If the function returns `false` when the stream has already closed |
| /// ([isDone] is true), then the request must call |
| /// [StreamQueue._updateRequests] itself when it's ready to continue. |
| - bool update(Queue<Result> events, bool isDone); |
| + bool update(Queue<Result<T>> events, bool isDone); |
| } |
| /// Request for a [StreamQueue.next] call. |
| /// |
| /// Completes the returned future when receiving the first event, |
| /// and is then complete. |
| -class _NextRequest<T> implements _EventRequest { |
| +class _NextRequest<T> implements _EventRequest<T> { |
| /// Completer for the future returned by [StreamQueue.next]. |
| - final Completer _completer; |
| + final _completer = new Completer<T>(); |
| - _NextRequest() : _completer = new Completer<T>(); |
| + _NextRequest(); |
| Future<T> get future => _completer.future; |
| - bool update(Queue<Result> events, bool isDone) { |
| + bool update(Queue<Result<T>> events, bool isDone) { |
| if (events.isNotEmpty) { |
| events.removeFirst().complete(_completer); |
| return true; |
| @@ -474,9 +474,9 @@ class _NextRequest<T> implements _EventRequest { |
| } |
| /// Request for a [StreamQueue.skip] call. |
| -class _SkipRequest implements _EventRequest { |
| +class _SkipRequest<T> implements _EventRequest<T> { |
| /// Completer for the future returned by the skip call. |
| - final Completer _completer = new Completer<int>(); |
| + final _completer = new Completer<int>(); |
| /// Number of remaining events to skip. |
| /// |
| @@ -489,9 +489,9 @@ class _SkipRequest implements _EventRequest { |
| _SkipRequest(this._eventsToSkip); |
| /// The future completed when the correct number of events have been skipped. |
| - Future get future => _completer.future; |
| + Future<int> get future => _completer.future; |
| - bool update(Queue<Result> events, bool isDone) { |
| + bool update(Queue<Result<T>> events, bool isDone) { |
| while (_eventsToSkip > 0) { |
| if (events.isEmpty) { |
| if (isDone) break; |
| @@ -501,7 +501,7 @@ class _SkipRequest implements _EventRequest { |
| var event = events.removeFirst(); |
| if (event.isError) { |
| - event.complete(_completer); |
| + _completer.completeError(event.asError.error, event.asError.stackTrace); |
|
Lasse Reichstein Nielsen
2016/03/29 21:53:59
Why not event.complete(_completer)?
Is it because
nweiz
2016/03/30 00:57:19
Yes, that's why.
|
| return true; |
| } |
| } |
| @@ -511,12 +511,12 @@ class _SkipRequest implements _EventRequest { |
| } |
| /// Request for a [StreamQueue.take] call. |
| -class _TakeRequest<T> implements _EventRequest { |
| +class _TakeRequest<T> implements _EventRequest<T> { |
| /// Completer for the future returned by the take call. |
| - final Completer _completer; |
| + final _completer = new Completer<List<T>>(); |
| /// List collecting events until enough have been seen. |
| - final List _list = <T>[]; |
| + final _list = <T>[]; |
| /// Number of events to capture. |
| /// |
| @@ -524,24 +524,24 @@ class _TakeRequest<T> implements _EventRequest { |
| /// this value. |
| final int _eventsToTake; |
| - _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
| + _TakeRequest(this._eventsToTake); |
| /// The future completed when the correct number of events have been captured. |
| - Future get future => _completer.future; |
| + Future<List<T>> get future => _completer.future; |
| - bool update(Queue<Result> events, bool isDone) { |
| + bool update(Queue<Result<T>> events, bool isDone) { |
| while (_list.length < _eventsToTake) { |
| if (events.isEmpty) { |
| if (isDone) break; |
| return false; |
| } |
| - var result = events.removeFirst(); |
| - if (result.isError) { |
| - result.complete(_completer); |
| + var event = events.removeFirst(); |
| + if (event.isError) { |
| + _completer.completeError(event.asError.error, event.asError.stackTrace); |
| return true; |
| } |
| - _list.add(result.asValue.value); |
| + _list.add(event.asValue.value); |
| } |
| _completer.complete(_list); |
| return true; |
| @@ -553,9 +553,9 @@ class _TakeRequest<T> implements _EventRequest { |
| /// The request needs no events, it just waits in the request queue |
| /// until all previous events are fulfilled, then it cancels the stream queue |
| /// source subscription. |
| -class _CancelRequest implements _EventRequest { |
| +class _CancelRequest<T> implements _EventRequest<T> { |
| /// Completer for the future returned by the `cancel` call. |
| - final Completer _completer = new Completer(); |
| + final _completer = new Completer(); |
| /// The [StreamQueue] object that has this request queued. |
| /// |
| @@ -568,7 +568,7 @@ class _CancelRequest implements _EventRequest { |
| /// The future completed when the cancel request is completed. |
| Future get future => _completer.future; |
| - bool update(Queue<Result> events, bool isDone) { |
| + bool update(Queue<Result<T>> events, bool isDone) { |
| if (_streamQueue._isDone) { |
| _completer.complete(); |
| } else { |
| @@ -584,22 +584,22 @@ class _CancelRequest implements _EventRequest { |
| /// The request is always complete, it just waits in the request queue |
| /// until all previous events are fulfilled, then it takes over the |
| /// stream events subscription and creates a stream from it. |
| -class _RestRequest<T> implements _EventRequest { |
| +class _RestRequest<T> implements _EventRequest<T> { |
| /// Completer for the stream returned by the `rest` call. |
| - final StreamCompleter _completer = new StreamCompleter<T>(); |
| + final _completer = new StreamCompleter<T>(); |
| /// The [StreamQueue] object that has this request queued. |
| /// |
| /// When the event is completed, it needs to cancel the active subscription |
| /// of the `StreamQueue` object, if any. |
| - final StreamQueue _streamQueue; |
| + final StreamQueue<T> _streamQueue; |
| _RestRequest(this._streamQueue); |
| /// The stream which will contain the remaining events of [_streamQueue]. |
| Stream<T> get stream => _completer.stream; |
| - bool update(Queue<Result> events, bool isDone) { |
| + bool update(Queue<Result<T>> events, bool isDone) { |
| if (events.isEmpty) { |
| if (_streamQueue._isDone) { |
| _completer.setEmpty(); |
| @@ -627,12 +627,12 @@ class _RestRequest<T> implements _EventRequest { |
| /// but doesn't consume the event. |
| /// If the request is closed without seeing an event, then |
| /// the [future] is completed with `false`. |
| -class _HasNextRequest<T> implements _EventRequest { |
| - final Completer _completer = new Completer<bool>(); |
| +class _HasNextRequest<T> implements _EventRequest<T> { |
| + final _completer = new Completer<bool>(); |
| Future<bool> get future => _completer.future; |
| - bool update(Queue<Result> events, bool isDone) { |
| + bool update(Queue<Result<T>> events, bool isDone) { |
| if (events.isNotEmpty) { |
| _completer.complete(true); |
| return true; |