Index: lib/src/util/forkable_stream.dart |
diff --git a/lib/src/util/forkable_stream.dart b/lib/src/util/forkable_stream.dart |
index 012fa5829dd1c239e79bb81a0764b5e12646f3e3..6a8694dc027cf2dc7c0c19c48535a8b62f45be06 100644 |
--- a/lib/src/util/forkable_stream.dart |
+++ b/lib/src/util/forkable_stream.dart |
@@ -23,7 +23,7 @@ import 'package:async/async.dart' hide ForkableStream; |
/// then canceled. |
class ForkableStream<T> extends StreamView<T> { |
/// The underlying stream. |
- final Stream _sourceStream; |
+ final Stream<T> _sourceStream; |
/// The subscription to [_sourceStream]. |
/// |
@@ -39,13 +39,13 @@ class ForkableStream<T> extends StreamView<T> { |
final _controllers = new Set<StreamController<T>>(); |
/// Creates a new forkable stream wrapping [sourceStream]. |
- ForkableStream(Stream sourceStream) |
+ ForkableStream(Stream<T> sourceStream) |
// Use a completer here so that we can provide its stream to the |
// superclass constructor while also adding the stream controller to |
// [_controllers]. |
- : this._(sourceStream, new StreamCompleter()); |
+ : this._(sourceStream, new StreamCompleter<T>()); |
- ForkableStream._(this._sourceStream, StreamCompleter completer) |
+ ForkableStream._(this._sourceStream, StreamCompleter<T> completer) |
: super(completer.stream) { |
completer.setSourceStream(_fork(primary: true)); |
} |
@@ -72,7 +72,7 @@ class ForkableStream<T> extends StreamView<T> { |
return controller.stream; |
} |
- var controller; |
+ StreamController<T> controller; |
controller = new StreamController<T>( |
onListen: () => _onListenOrResume(controller), |
onCancel: () => _onCancel(controller, primary: primary), |
@@ -134,7 +134,7 @@ class ForkableStream<T> extends StreamView<T> { |
} |
/// Forwards data events to all branches. |
- void _onData(value) { |
+ void _onData(T value) { |
// Don't iterate directly over the set because [controller.add] might cause |
// it to be modified synchronously. |
for (var controller in _controllers.toList()) { |