Index: packages/async/lib/src/stream_splitter.dart |
diff --git a/packages/async/lib/src/stream_splitter.dart b/packages/async/lib/src/stream_splitter.dart |
index 6ec440f5dd8e5c2c83754e73020f8f34a3e1852d..6cec98c7d4ae4bb991c4871583d96be904b95583 100644 |
--- a/packages/async/lib/src/stream_splitter.dart |
+++ b/packages/async/lib/src/stream_splitter.dart |
@@ -2,13 +2,10 @@ |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-library async.stream_splitter; |
- |
import 'dart:async'; |
-import 'dart:collection'; |
-import '../result.dart'; |
import 'future_group.dart'; |
+import 'result.dart'; |
/// A class that splits a single source stream into an arbitrary number of |
/// (single-subscription) streams (called "branch") that emit the same events. |
@@ -60,10 +57,10 @@ class StreamSplitter<T> { |
/// |
/// [count] defaults to 2. This is the same as creating [count] branches and |
/// then closing the [StreamSplitter]. |
- static List<Stream> splitFrom(Stream stream, [int count]) { |
+ static List<Stream<T>> splitFrom<T>(Stream<T> stream, [int count]) { |
if (count == null) count = 2; |
- var splitter = new StreamSplitter(stream); |
- var streams = new List.generate(count, (_) => splitter.split()); |
+ var splitter = new StreamSplitter<T>(stream); |
+ var streams = new List<Stream>.generate(count, (_) => splitter.split()); |
splitter.close(); |
return streams; |
} |
@@ -78,12 +75,9 @@ class StreamSplitter<T> { |
throw new StateError("Can't call split() on a closed StreamSplitter."); |
} |
- var controller; |
- controller = new StreamController<T>( |
- onListen: _onListen, |
- onPause: _onPause, |
- onResume: _onResume, |
- onCancel: () => _onCancel(controller)); |
+ var controller = new StreamController<T>( |
+ onListen: _onListen, onPause: _onPause, onResume: _onResume); |
+ controller.onCancel = () => _onCancel(controller); |
for (var result in _buffer) { |
result.addTo(controller); |
@@ -150,8 +144,8 @@ class StreamSplitter<T> { |
// wasn't paused, this will be a no-op. |
_subscription.resume(); |
} else { |
- _subscription = _stream.listen( |
- _onData, onError: _onError, onDone: _onDone); |
+ _subscription = |
+ _stream.listen(_onData, onError: _onError, onDone: _onDone); |
} |
} |