Index: lib/src/util/stream_queue.dart |
diff --git a/lib/src/util/stream_queue.dart b/lib/src/util/stream_queue.dart |
index edbd4183e1b1502964a1232fda01d0720a416359..1c2e3b2252ac854e65a0376343c1c5101af94e8a 100644 |
--- a/lib/src/util/stream_queue.dart |
+++ b/lib/src/util/stream_queue.dart |
@@ -78,13 +78,13 @@ class StreamQueue<T> { |
// by the content of the fifth event. |
/// Source of events. |
- final ForkableStream _sourceStream; |
+ final ForkableStream<T> _sourceStream; |
/// Subscription on [_sourceStream] while listening for events. |
/// |
/// Set to subscription when listening, and set to `null` when the |
/// subscription is done (and [_isDone] is set to true). |
- StreamSubscription _subscription; |
+ StreamSubscription<T> _subscription; |
/// Whether we have listened on [_sourceStream] and the subscription is done. |
bool _isDone = false; |
@@ -103,7 +103,7 @@ class StreamQueue<T> { |
final Queue<_EventRequest> _requestQueue = new Queue(); |
/// Create a `StreamQueue` of the events of [source]. |
- StreamQueue(Stream source) |
+ StreamQueue(Stream<T> source) |
: _sourceStream = source is ForkableStream |
? source |
: new ForkableStream(source); |
@@ -361,7 +361,7 @@ class StreamQueue<T> { |
/// Extracts the subscription and makes this stream queue unusable. |
/// |
/// Can only be used by the very last request. |
- StreamSubscription _dispose() { |
+ StreamSubscription<T> _dispose() { |
assert(_isClosed); |
var subscription = _subscription; |
_subscription = null; |
@@ -426,9 +426,9 @@ abstract class _EventRequest { |
/// and is then complete. |
class _NextRequest<T> implements _EventRequest { |
/// Completer for the future returned by [StreamQueue.next]. |
- final Completer _completer; |
+ final _completer = new Completer<T>(); |
- _NextRequest() : _completer = new Completer<T>(); |
+ _NextRequest(); |
Future<T> get future => _completer.future; |
@@ -448,7 +448,7 @@ class _NextRequest<T> implements _EventRequest { |
/// Request for a [StreamQueue.skip] call. |
class _SkipRequest implements _EventRequest { |
/// Completer for the future returned by the skip call. |
- final Completer _completer = new Completer<int>(); |
+ final _completer = new Completer<int>(); |
/// Number of remaining events to skip. |
/// |
@@ -461,7 +461,7 @@ class _SkipRequest implements _EventRequest { |
_SkipRequest(this._eventsToSkip); |
/// The future completed when the correct number of events have been skipped. |
- Future get future => _completer.future; |
+ Future<int> get future => _completer.future; |
bool addEvents(Queue<Result> events) { |
while (_eventsToSkip > 0) { |
@@ -485,7 +485,7 @@ class _SkipRequest implements _EventRequest { |
/// Request for a [StreamQueue.take] call. |
class _TakeRequest<T> implements _EventRequest { |
/// Completer for the future returned by the take call. |
- final Completer _completer; |
+ final _completer = new Completer<List<T>>(); |
/// List collecting events until enough have been seen. |
final List _list = <T>[]; |
@@ -496,10 +496,10 @@ class _TakeRequest<T> implements _EventRequest { |
/// this value. |
final int _eventsToTake; |
- _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); |
+ _TakeRequest(this._eventsToTake); |
/// The future completed when the correct number of events have been captured. |
- Future get future => _completer.future; |
+ Future<List<T>> get future => _completer.future; |
bool addEvents(Queue<Result> events) { |
while (_list.length < _eventsToTake) { |
@@ -566,13 +566,13 @@ class _CancelRequest implements _EventRequest { |
/// stream events subscription and creates a stream from it. |
class _RestRequest<T> implements _EventRequest { |
/// Completer for the stream returned by the `rest` call. |
- final StreamCompleter _completer = new StreamCompleter<T>(); |
+ final _completer = new StreamCompleter<T>(); |
/// The [StreamQueue] object that has this request queued. |
/// |
/// When the event is completed, it needs to cancel the active subscription |
/// of the `StreamQueue` object, if any. |
- final StreamQueue _streamQueue; |
+ final StreamQueue<T> _streamQueue; |
_RestRequest(this._streamQueue); |
@@ -609,7 +609,7 @@ class _RestRequest<T> implements _EventRequest { |
} |
/// Create a stream from the rest of [_streamQueue]'s subscription. |
- Stream _getRestStream() { |
+ Stream<T> _getRestStream() { |
if (_streamQueue._isDone) { |
var controller = new StreamController<T>()..close(); |
return controller.stream; |
@@ -632,7 +632,7 @@ class _RestRequest<T> implements _EventRequest { |
/// If the request is closed without seeing an event, then |
/// the [future] is completed with `false`. |
class _HasNextRequest<T> implements _EventRequest { |
- final Completer _completer = new Completer<bool>(); |
+ final _completer = new Completer<bool>(); |
Future<bool> get future => _completer.future; |
@@ -652,16 +652,16 @@ class _HasNextRequest<T> implements _EventRequest { |
/// Request for a [StreamQueue.fork] call. |
class _ForkRequest<T> implements _EventRequest { |
/// Completer for the stream used by the queue by the `fork` call. |
- StreamCompleter _completer; |
+ StreamCompleter<T> _completer; |
StreamQueue<T> queue; |
/// The [StreamQueue] object that has this request queued. |
- final StreamQueue _streamQueue; |
+ final StreamQueue<T> _streamQueue; |
_ForkRequest(this._streamQueue) { |
- _completer = new StreamCompleter<T>(); |
- queue = new StreamQueue<T>(_completer.stream); |
+ _completer = new StreamCompleter(); |
+ queue = new StreamQueue(_completer.stream); |
} |
bool addEvents(Queue<Result> events) { |