| Index: lib/src/util/stream_queue.dart
|
| diff --git a/lib/src/util/stream_queue.dart b/lib/src/util/stream_queue.dart
|
| index edbd4183e1b1502964a1232fda01d0720a416359..1c2e3b2252ac854e65a0376343c1c5101af94e8a 100644
|
| --- a/lib/src/util/stream_queue.dart
|
| +++ b/lib/src/util/stream_queue.dart
|
| @@ -78,13 +78,13 @@ class StreamQueue<T> {
|
| // by the content of the fifth event.
|
|
|
| /// Source of events.
|
| - final ForkableStream _sourceStream;
|
| + final ForkableStream<T> _sourceStream;
|
|
|
| /// Subscription on [_sourceStream] while listening for events.
|
| ///
|
| /// Set to subscription when listening, and set to `null` when the
|
| /// subscription is done (and [_isDone] is set to true).
|
| - StreamSubscription _subscription;
|
| + StreamSubscription<T> _subscription;
|
|
|
| /// Whether we have listened on [_sourceStream] and the subscription is done.
|
| bool _isDone = false;
|
| @@ -103,7 +103,7 @@ class StreamQueue<T> {
|
| final Queue<_EventRequest> _requestQueue = new Queue();
|
|
|
| /// Create a `StreamQueue` of the events of [source].
|
| - StreamQueue(Stream source)
|
| + StreamQueue(Stream<T> source)
|
| : _sourceStream = source is ForkableStream
|
| ? source
|
| : new ForkableStream(source);
|
| @@ -361,7 +361,7 @@ class StreamQueue<T> {
|
| /// Extracts the subscription and makes this stream queue unusable.
|
| ///
|
| /// Can only be used by the very last request.
|
| - StreamSubscription _dispose() {
|
| + StreamSubscription<T> _dispose() {
|
| assert(_isClosed);
|
| var subscription = _subscription;
|
| _subscription = null;
|
| @@ -426,9 +426,9 @@ abstract class _EventRequest {
|
| /// and is then complete.
|
| class _NextRequest<T> implements _EventRequest {
|
| /// Completer for the future returned by [StreamQueue.next].
|
| - final Completer _completer;
|
| + final _completer = new Completer<T>();
|
|
|
| - _NextRequest() : _completer = new Completer<T>();
|
| + _NextRequest();
|
|
|
| Future<T> get future => _completer.future;
|
|
|
| @@ -448,7 +448,7 @@ class _NextRequest<T> implements _EventRequest {
|
| /// Request for a [StreamQueue.skip] call.
|
| class _SkipRequest implements _EventRequest {
|
| /// Completer for the future returned by the skip call.
|
| - final Completer _completer = new Completer<int>();
|
| + final _completer = new Completer<int>();
|
|
|
| /// Number of remaining events to skip.
|
| ///
|
| @@ -461,7 +461,7 @@ class _SkipRequest implements _EventRequest {
|
| _SkipRequest(this._eventsToSkip);
|
|
|
| /// The future completed when the correct number of events have been skipped.
|
| - Future get future => _completer.future;
|
| + Future<int> get future => _completer.future;
|
|
|
| bool addEvents(Queue<Result> events) {
|
| while (_eventsToSkip > 0) {
|
| @@ -485,7 +485,7 @@ class _SkipRequest implements _EventRequest {
|
| /// Request for a [StreamQueue.take] call.
|
| class _TakeRequest<T> implements _EventRequest {
|
| /// Completer for the future returned by the take call.
|
| - final Completer _completer;
|
| + final _completer = new Completer<List<T>>();
|
|
|
| /// List collecting events until enough have been seen.
|
| final List _list = <T>[];
|
| @@ -496,10 +496,10 @@ class _TakeRequest<T> implements _EventRequest {
|
| /// this value.
|
| final int _eventsToTake;
|
|
|
| - _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
|
| + _TakeRequest(this._eventsToTake);
|
|
|
| /// The future completed when the correct number of events have been captured.
|
| - Future get future => _completer.future;
|
| + Future<List<T>> get future => _completer.future;
|
|
|
| bool addEvents(Queue<Result> events) {
|
| while (_list.length < _eventsToTake) {
|
| @@ -566,13 +566,13 @@ class _CancelRequest implements _EventRequest {
|
| /// stream events subscription and creates a stream from it.
|
| class _RestRequest<T> implements _EventRequest {
|
| /// Completer for the stream returned by the `rest` call.
|
| - final StreamCompleter _completer = new StreamCompleter<T>();
|
| + final _completer = new StreamCompleter<T>();
|
|
|
| /// The [StreamQueue] object that has this request queued.
|
| ///
|
| /// When the event is completed, it needs to cancel the active subscription
|
| /// of the `StreamQueue` object, if any.
|
| - final StreamQueue _streamQueue;
|
| + final StreamQueue<T> _streamQueue;
|
|
|
| _RestRequest(this._streamQueue);
|
|
|
| @@ -609,7 +609,7 @@ class _RestRequest<T> implements _EventRequest {
|
| }
|
|
|
| /// Create a stream from the rest of [_streamQueue]'s subscription.
|
| - Stream _getRestStream() {
|
| + Stream<T> _getRestStream() {
|
| if (_streamQueue._isDone) {
|
| var controller = new StreamController<T>()..close();
|
| return controller.stream;
|
| @@ -632,7 +632,7 @@ class _RestRequest<T> implements _EventRequest {
|
| /// If the request is closed without seeing an event, then
|
| /// the [future] is completed with `false`.
|
| class _HasNextRequest<T> implements _EventRequest {
|
| - final Completer _completer = new Completer<bool>();
|
| + final _completer = new Completer<bool>();
|
|
|
| Future<bool> get future => _completer.future;
|
|
|
| @@ -652,16 +652,16 @@ class _HasNextRequest<T> implements _EventRequest {
|
| /// Request for a [StreamQueue.fork] call.
|
| class _ForkRequest<T> implements _EventRequest {
|
| /// Completer for the stream used by the queue by the `fork` call.
|
| - StreamCompleter _completer;
|
| + StreamCompleter<T> _completer;
|
|
|
| StreamQueue<T> queue;
|
|
|
| /// The [StreamQueue] object that has this request queued.
|
| - final StreamQueue _streamQueue;
|
| + final StreamQueue<T> _streamQueue;
|
|
|
| _ForkRequest(this._streamQueue) {
|
| - _completer = new StreamCompleter<T>();
|
| - queue = new StreamQueue<T>(_completer.stream);
|
| + _completer = new StreamCompleter();
|
| + queue = new StreamQueue(_completer.stream);
|
| }
|
|
|
| bool addEvents(Queue<Result> events) {
|
|
|