Index: packages/async/lib/src/stream_group.dart |
diff --git a/packages/async/lib/src/stream_group.dart b/packages/async/lib/src/stream_group.dart |
index d99f5151467855dc97fb3f92d804b53c21c78a6a..6361a5cb96f78bd4341dce75415d05a84c6f33e3 100644 |
--- a/packages/async/lib/src/stream_group.dart |
+++ b/packages/async/lib/src/stream_group.dart |
@@ -2,8 +2,6 @@ |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-library async.stream_group; |
- |
import 'dart:async'; |
/// A collection of streams whose events are unified and sent through a central |
@@ -14,8 +12,8 @@ import 'dart:async'; |
/// this means that events emitted by broadcast streams will be dropped until |
/// [stream] has a listener.** |
/// |
-/// If the `StreamGroup` is constructed using [new StreamGroup], [stream] will be |
-/// single-subscription. In this case, if [stream] is paused or canceled, all |
+/// If the `StreamGroup` is constructed using [new StreamGroup], [stream] will |
+/// be single-subscription. In this case, if [stream] is paused or canceled, all |
/// streams in the group will likewise be paused or canceled, respectively. |
/// |
/// If the `StreamGroup` is constructed using [new StreamGroup.broadcast], |
@@ -55,8 +53,8 @@ class StreamGroup<T> implements Sink<Stream<T>> { |
/// |
/// This is equivalent to adding [streams] to a group, closing that group, and |
/// returning its stream. |
- static Stream merge(Iterable<Stream> streams) { |
- var group = new StreamGroup(); |
+ static Stream<T> merge<T>(Iterable<Stream<T>> streams) { |
+ var group = new StreamGroup<T>(); |
streams.forEach(group.add); |
group.close(); |
return group.stream; |
@@ -75,9 +73,7 @@ class StreamGroup<T> implements Sink<Stream<T>> { |
/// Creates a new stream group where [stream] is a broadcast stream. |
StreamGroup.broadcast() { |
_controller = new StreamController<T>.broadcast( |
- onListen: _onListen, |
- onCancel: _onCancelBroadcast, |
- sync: true); |
+ onListen: _onListen, onCancel: _onCancelBroadcast, sync: true); |
} |
/// Adds [stream] as a member of this group. |
@@ -194,11 +190,9 @@ class StreamGroup<T> implements Sink<Stream<T>> { |
/// Starts actively forwarding events from [stream] to [_controller]. |
/// |
/// This will pause the resulting subscription if [this] is paused. |
- StreamSubscription _listenToStream(Stream stream) { |
- var subscription = stream.listen( |
- _controller.add, |
- onError: _controller.addError, |
- onDone: () => remove(stream)); |
+ StreamSubscription<T> _listenToStream(Stream<T> stream) { |
+ var subscription = stream.listen(_controller.add, |
+ onError: _controller.addError, onDone: () => remove(stream)); |
if (_state == _StreamGroupState.paused) subscription.pause(); |
return subscription; |
} |