Index: lib/src/stream_queue.dart |
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart |
index 8482e0c65afb5b1dc2993689da335fc5a5d3c8d7..4ce9e1301e45c76cbcbdadb4fd40be3d7564a205 100644 |
--- a/lib/src/stream_queue.dart |
+++ b/lib/src/stream_queue.dart |
@@ -94,7 +94,7 @@ abstract class StreamQueue<T> { |
final Queue<_EventRequest> _requestQueue = new Queue(); |
/// Create a `StreamQueue` of the events of [source]. |
- factory StreamQueue(Stream source) = _StreamQueue<T>; |
+ factory StreamQueue(Stream<T> source) = _StreamQueue<T>; |
StreamQueue._(); |
@@ -274,7 +274,7 @@ abstract class StreamQueue<T> { |
/// Can only be used by the very last request (the stream queue must |
/// be closed by that request). |
/// Only used by [rest]. |
- Stream _extractStream(); |
+ Stream<T> _extractStream(); |
/// Requests that the event source pauses events. |
/// |
@@ -342,13 +342,13 @@ abstract class StreamQueue<T> { |
/// to when a request needs events. |
class _StreamQueue<T> extends StreamQueue<T> { |
/// Source of events. |
- final Stream _sourceStream; |
+ final Stream<T> _sourceStream; |
/// Subscription on [_sourceStream] while listening for events. |
/// |
/// Set to subscription when listening, and set to `null` when the |
/// subscription is done (and [_isDone] is set to true). |
- StreamSubscription _subscription; |
+ StreamSubscription<T> _subscription; |
_StreamQueue(this._sourceStream) : super._(); |
@@ -422,7 +422,7 @@ class _StreamQueue<T> extends StreamQueue<T> { |
/// |
/// The [close] method is also called immediately when the source stream |
/// is done. |
-abstract class _EventRequest { |
+abstract class _EventRequest<T> { |
/// Handle available events. |
/// |
/// The available events are provided as a queue. The `update` function |
@@ -443,22 +443,22 @@ abstract class _EventRequest { |
/// If the function returns `false` when the stream has already closed |
/// ([isDone] is true), then the request must call |
/// [StreamQueue._updateRequests] itself when it's ready to continue. |
- bool update(Queue<Result> events, bool isDone); |
+ bool update(Queue<Result<T>> events, bool isDone); |
} |
/// Request for a [StreamQueue.next] call. |
/// |
/// Completes the returned future when receiving the first event, |
/// and is then complete. |
-class _NextRequest<T> implements _EventRequest { |
+class _NextRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by [StreamQueue.next]. |
- final Completer _completer; |
+ final _completer = new Completer<T>(); |
- _NextRequest() : _completer = new Completer<T>(); |
+ _NextRequest(); |
Future<T> get future => _completer.future; |
- bool update(Queue<Result> events, bool isDone) { |
+ bool update(Queue<Result<T>> events, bool isDone) { |
if (events.isNotEmpty) { |
events.removeFirst().complete(_completer); |
return true; |
@@ -474,9 +474,9 @@ class _NextRequest<T> implements _EventRequest { |
} |
/// Request for a [StreamQueue.skip] call. |
-class _SkipRequest implements _EventRequest { |
+class _SkipRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by the skip call. |
- final Completer _completer = new Completer<int>(); |
+ final _completer = new Completer<int>(); |
/// Number of remaining events to skip. |
/// |
@@ -489,9 +489,9 @@ class _SkipRequest implements _EventRequest { |
_SkipRequest(this._eventsToSkip); |
/// The future completed when the correct number of events have been skipped. |
- Future get future => _completer.future; |
+ Future<int> get future => _completer.future; |
- bool update(Queue<Result> events, bool isDone) { |
+ bool update(Queue<Result<T>> events, bool isDone) { |
while (_eventsToSkip > 0) { |
if (events.isEmpty) { |
if (isDone) break; |
@@ -501,7 +501,7 @@ class _SkipRequest implements _EventRequest { |
var event = events.removeFirst(); |
if (event.isError) { |
- event.complete(_completer); |
+ _completer.completeError(event.asError.error, event.asError.stackTrace); |
return true; |
} |
} |
@@ -511,12 +511,12 @@ class _SkipRequest implements _EventRequest { |
} |
/// Request for a [StreamQueue.take] call. |
-class _TakeRequest<T> implements _EventRequest { |
+class _TakeRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by the take call. |
- final Completer _completer; |
+ final _completer = new Completer<List<T>>(); |
/// List collecting events until enough have been seen. |
- final List _list = <T>[]; |
+ final _list = <T>[]; |
/// Number of events to capture. |
/// |
@@ -524,24 +524,24 @@ class _TakeRequest<T> implements _EventRequest { |
/// this value. |
final int _eventsToTake; |
- _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
+ _TakeRequest(this._eventsToTake); |
/// The future completed when the correct number of events have been captured. |
- Future get future => _completer.future; |
+ Future<List<T>> get future => _completer.future; |
- bool update(Queue<Result> events, bool isDone) { |
+ bool update(Queue<Result<T>> events, bool isDone) { |
while (_list.length < _eventsToTake) { |
if (events.isEmpty) { |
if (isDone) break; |
return false; |
} |
- var result = events.removeFirst(); |
- if (result.isError) { |
- result.complete(_completer); |
+ var event = events.removeFirst(); |
+ if (event.isError) { |
+ _completer.completeError(event.asError.error, event.asError.stackTrace); |
return true; |
} |
- _list.add(result.asValue.value); |
+ _list.add(event.asValue.value); |
} |
_completer.complete(_list); |
return true; |
@@ -553,9 +553,9 @@ class _TakeRequest<T> implements _EventRequest { |
/// The request needs no events, it just waits in the request queue |
/// until all previous events are fulfilled, then it cancels the stream queue |
/// source subscription. |
-class _CancelRequest implements _EventRequest { |
+class _CancelRequest<T> implements _EventRequest<T> { |
/// Completer for the future returned by the `cancel` call. |
- final Completer _completer = new Completer(); |
+ final _completer = new Completer(); |
/// The [StreamQueue] object that has this request queued. |
/// |
@@ -568,7 +568,7 @@ class _CancelRequest implements _EventRequest { |
/// The future completed when the cancel request is completed. |
Future get future => _completer.future; |
- bool update(Queue<Result> events, bool isDone) { |
+ bool update(Queue<Result<T>> events, bool isDone) { |
if (_streamQueue._isDone) { |
_completer.complete(); |
} else { |
@@ -584,22 +584,22 @@ class _CancelRequest implements _EventRequest { |
/// The request is always complete, it just waits in the request queue |
/// until all previous events are fulfilled, then it takes over the |
/// stream events subscription and creates a stream from it. |
-class _RestRequest<T> implements _EventRequest { |
+class _RestRequest<T> implements _EventRequest<T> { |
/// Completer for the stream returned by the `rest` call. |
- final StreamCompleter _completer = new StreamCompleter<T>(); |
+ final _completer = new StreamCompleter<T>(); |
/// The [StreamQueue] object that has this request queued. |
/// |
/// When the event is completed, it needs to cancel the active subscription |
/// of the `StreamQueue` object, if any. |
- final StreamQueue _streamQueue; |
+ final StreamQueue<T> _streamQueue; |
_RestRequest(this._streamQueue); |
/// The stream which will contain the remaining events of [_streamQueue]. |
Stream<T> get stream => _completer.stream; |
- bool update(Queue<Result> events, bool isDone) { |
+ bool update(Queue<Result<T>> events, bool isDone) { |
if (events.isEmpty) { |
if (_streamQueue._isDone) { |
_completer.setEmpty(); |
@@ -627,12 +627,12 @@ class _RestRequest<T> implements _EventRequest { |
/// but doesn't consume the event. |
/// If the request is closed without seeing an event, then |
/// the [future] is completed with `false`. |
-class _HasNextRequest<T> implements _EventRequest { |
- final Completer _completer = new Completer<bool>(); |
+class _HasNextRequest<T> implements _EventRequest<T> { |
+ final _completer = new Completer<bool>(); |
Future<bool> get future => _completer.future; |
- bool update(Queue<Result> events, bool isDone) { |
+ bool update(Queue<Result<T>> events, bool isDone) { |
if (events.isNotEmpty) { |
_completer.complete(true); |
return true; |