| Index: lib/src/stream_queue.dart
|
| diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
|
| index 8482e0c65afb5b1dc2993689da335fc5a5d3c8d7..4ce9e1301e45c76cbcbdadb4fd40be3d7564a205 100644
|
| --- a/lib/src/stream_queue.dart
|
| +++ b/lib/src/stream_queue.dart
|
| @@ -94,7 +94,7 @@ abstract class StreamQueue<T> {
|
| final Queue<_EventRequest> _requestQueue = new Queue();
|
|
|
| /// Create a `StreamQueue` of the events of [source].
|
| - factory StreamQueue(Stream source) = _StreamQueue<T>;
|
| + factory StreamQueue(Stream<T> source) = _StreamQueue<T>;
|
|
|
| StreamQueue._();
|
|
|
| @@ -274,7 +274,7 @@ abstract class StreamQueue<T> {
|
| /// Can only be used by the very last request (the stream queue must
|
| /// be closed by that request).
|
| /// Only used by [rest].
|
| - Stream _extractStream();
|
| + Stream<T> _extractStream();
|
|
|
| /// Requests that the event source pauses events.
|
| ///
|
| @@ -342,13 +342,13 @@ abstract class StreamQueue<T> {
|
| /// to when a request needs events.
|
| class _StreamQueue<T> extends StreamQueue<T> {
|
| /// Source of events.
|
| - final Stream _sourceStream;
|
| + final Stream<T> _sourceStream;
|
|
|
| /// Subscription on [_sourceStream] while listening for events.
|
| ///
|
| /// Set to subscription when listening, and set to `null` when the
|
| /// subscription is done (and [_isDone] is set to true).
|
| - StreamSubscription _subscription;
|
| + StreamSubscription<T> _subscription;
|
|
|
| _StreamQueue(this._sourceStream) : super._();
|
|
|
| @@ -422,7 +422,7 @@ class _StreamQueue<T> extends StreamQueue<T> {
|
| ///
|
| /// The [close] method is also called immediately when the source stream
|
| /// is done.
|
| -abstract class _EventRequest {
|
| +abstract class _EventRequest<T> {
|
| /// Handle available events.
|
| ///
|
| /// The available events are provided as a queue. The `update` function
|
| @@ -443,22 +443,22 @@ abstract class _EventRequest {
|
| /// If the function returns `false` when the stream has already closed
|
| /// ([isDone] is true), then the request must call
|
| /// [StreamQueue._updateRequests] itself when it's ready to continue.
|
| - bool update(Queue<Result> events, bool isDone);
|
| + bool update(Queue<Result<T>> events, bool isDone);
|
| }
|
|
|
| /// Request for a [StreamQueue.next] call.
|
| ///
|
| /// Completes the returned future when receiving the first event,
|
| /// and is then complete.
|
| -class _NextRequest<T> implements _EventRequest {
|
| +class _NextRequest<T> implements _EventRequest<T> {
|
| /// Completer for the future returned by [StreamQueue.next].
|
| - final Completer _completer;
|
| + final _completer = new Completer<T>();
|
|
|
| - _NextRequest() : _completer = new Completer<T>();
|
| + _NextRequest();
|
|
|
| Future<T> get future => _completer.future;
|
|
|
| - bool update(Queue<Result> events, bool isDone) {
|
| + bool update(Queue<Result<T>> events, bool isDone) {
|
| if (events.isNotEmpty) {
|
| events.removeFirst().complete(_completer);
|
| return true;
|
| @@ -474,9 +474,9 @@ class _NextRequest<T> implements _EventRequest {
|
| }
|
|
|
| /// Request for a [StreamQueue.skip] call.
|
| -class _SkipRequest implements _EventRequest {
|
| +class _SkipRequest<T> implements _EventRequest<T> {
|
| /// Completer for the future returned by the skip call.
|
| - final Completer _completer = new Completer<int>();
|
| + final _completer = new Completer<int>();
|
|
|
| /// Number of remaining events to skip.
|
| ///
|
| @@ -489,9 +489,9 @@ class _SkipRequest implements _EventRequest {
|
| _SkipRequest(this._eventsToSkip);
|
|
|
| /// The future completed when the correct number of events have been skipped.
|
| - Future get future => _completer.future;
|
| + Future<int> get future => _completer.future;
|
|
|
| - bool update(Queue<Result> events, bool isDone) {
|
| + bool update(Queue<Result<T>> events, bool isDone) {
|
| while (_eventsToSkip > 0) {
|
| if (events.isEmpty) {
|
| if (isDone) break;
|
| @@ -501,7 +501,7 @@ class _SkipRequest implements _EventRequest {
|
|
|
| var event = events.removeFirst();
|
| if (event.isError) {
|
| - event.complete(_completer);
|
| + _completer.completeError(event.asError.error, event.asError.stackTrace);
|
| return true;
|
| }
|
| }
|
| @@ -511,12 +511,12 @@ class _SkipRequest implements _EventRequest {
|
| }
|
|
|
| /// Request for a [StreamQueue.take] call.
|
| -class _TakeRequest<T> implements _EventRequest {
|
| +class _TakeRequest<T> implements _EventRequest<T> {
|
| /// Completer for the future returned by the take call.
|
| - final Completer _completer;
|
| + final _completer = new Completer<List<T>>();
|
|
|
| /// List collecting events until enough have been seen.
|
| - final List _list = <T>[];
|
| + final _list = <T>[];
|
|
|
| /// Number of events to capture.
|
| ///
|
| @@ -524,24 +524,24 @@ class _TakeRequest<T> implements _EventRequest {
|
| /// this value.
|
| final int _eventsToTake;
|
|
|
| - _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
|
| + _TakeRequest(this._eventsToTake);
|
|
|
| /// The future completed when the correct number of events have been captured.
|
| - Future get future => _completer.future;
|
| + Future<List<T>> get future => _completer.future;
|
|
|
| - bool update(Queue<Result> events, bool isDone) {
|
| + bool update(Queue<Result<T>> events, bool isDone) {
|
| while (_list.length < _eventsToTake) {
|
| if (events.isEmpty) {
|
| if (isDone) break;
|
| return false;
|
| }
|
|
|
| - var result = events.removeFirst();
|
| - if (result.isError) {
|
| - result.complete(_completer);
|
| + var event = events.removeFirst();
|
| + if (event.isError) {
|
| + _completer.completeError(event.asError.error, event.asError.stackTrace);
|
| return true;
|
| }
|
| - _list.add(result.asValue.value);
|
| + _list.add(event.asValue.value);
|
| }
|
| _completer.complete(_list);
|
| return true;
|
| @@ -553,9 +553,9 @@ class _TakeRequest<T> implements _EventRequest {
|
| /// The request needs no events, it just waits in the request queue
|
| /// until all previous events are fulfilled, then it cancels the stream queue
|
| /// source subscription.
|
| -class _CancelRequest implements _EventRequest {
|
| +class _CancelRequest<T> implements _EventRequest<T> {
|
| /// Completer for the future returned by the `cancel` call.
|
| - final Completer _completer = new Completer();
|
| + final _completer = new Completer();
|
|
|
| /// The [StreamQueue] object that has this request queued.
|
| ///
|
| @@ -568,7 +568,7 @@ class _CancelRequest implements _EventRequest {
|
| /// The future completed when the cancel request is completed.
|
| Future get future => _completer.future;
|
|
|
| - bool update(Queue<Result> events, bool isDone) {
|
| + bool update(Queue<Result<T>> events, bool isDone) {
|
| if (_streamQueue._isDone) {
|
| _completer.complete();
|
| } else {
|
| @@ -584,22 +584,22 @@ class _CancelRequest implements _EventRequest {
|
| /// The request is always complete, it just waits in the request queue
|
| /// until all previous events are fulfilled, then it takes over the
|
| /// stream events subscription and creates a stream from it.
|
| -class _RestRequest<T> implements _EventRequest {
|
| +class _RestRequest<T> implements _EventRequest<T> {
|
| /// Completer for the stream returned by the `rest` call.
|
| - final StreamCompleter _completer = new StreamCompleter<T>();
|
| + final _completer = new StreamCompleter<T>();
|
|
|
| /// The [StreamQueue] object that has this request queued.
|
| ///
|
| /// When the event is completed, it needs to cancel the active subscription
|
| /// of the `StreamQueue` object, if any.
|
| - final StreamQueue _streamQueue;
|
| + final StreamQueue<T> _streamQueue;
|
|
|
| _RestRequest(this._streamQueue);
|
|
|
| /// The stream which will contain the remaining events of [_streamQueue].
|
| Stream<T> get stream => _completer.stream;
|
|
|
| - bool update(Queue<Result> events, bool isDone) {
|
| + bool update(Queue<Result<T>> events, bool isDone) {
|
| if (events.isEmpty) {
|
| if (_streamQueue._isDone) {
|
| _completer.setEmpty();
|
| @@ -627,12 +627,12 @@ class _RestRequest<T> implements _EventRequest {
|
| /// but doesn't consume the event.
|
| /// If the request is closed without seeing an event, then
|
| /// the [future] is completed with `false`.
|
| -class _HasNextRequest<T> implements _EventRequest {
|
| - final Completer _completer = new Completer<bool>();
|
| +class _HasNextRequest<T> implements _EventRequest<T> {
|
| + final _completer = new Completer<bool>();
|
|
|
| Future<bool> get future => _completer.future;
|
|
|
| - bool update(Queue<Result> events, bool isDone) {
|
| + bool update(Queue<Result<T>> events, bool isDone) {
|
| if (events.isNotEmpty) {
|
| _completer.complete(true);
|
| return true;
|
|
|