Index: packages/async/lib/src/stream_zip.dart |
diff --git a/packages/async/lib/stream_zip.dart b/packages/async/lib/src/stream_zip.dart |
similarity index 54% |
copy from packages/async/lib/stream_zip.dart |
copy to packages/async/lib/src/stream_zip.dart |
index 055489dbd38cd5dd6eb9dc7015887ee5a8df1f12..3d5a8113bcdc56e9c904a1b6abaa950eeb82a5a7 100644 |
--- a/packages/async/lib/stream_zip.dart |
+++ b/packages/async/lib/src/stream_zip.dart |
@@ -1,37 +1,36 @@ |
-// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
// for details. All rights reserved. Use of this source code is governed by a |
// BSD-style license that can be found in the LICENSE file. |
-/** |
- * Help for combining multiple streams into a single stream. |
- */ |
-library dart.pkg.async.stream_zip; |
- |
import "dart:async"; |
-/** |
- * A stream that combines the values of other streams. |
- */ |
-class StreamZip extends Stream<List> { |
- final Iterable<Stream> _streams; |
- StreamZip(Iterable<Stream> streams) : _streams = streams; |
+/// A stream that combines the values of other streams. |
+/// |
+/// This emits lists of collected values from each input stream. The first list |
+/// contains the first value emitted by each stream, the second contains the |
+/// second value, and so on. The lists have the same ordering as the iterable |
+/// passed to [new StreamZip]. |
+/// |
+/// Any errors from any of the streams are forwarded directly to this stream. |
+class StreamZip<T> extends Stream<List<T>> { |
+ final Iterable<Stream<T>> _streams; |
+ |
+ StreamZip(Iterable<Stream<T>> streams) : _streams = streams; |
- StreamSubscription<List> listen(void onData(List data), { |
- Function onError, |
- void onDone(), |
- bool cancelOnError}) { |
+ StreamSubscription<List<T>> listen(void onData(List<T> data), |
+ {Function onError, void onDone(), bool cancelOnError}) { |
cancelOnError = identical(true, cancelOnError); |
- List<StreamSubscription> subscriptions = <StreamSubscription>[]; |
- StreamController controller; |
- List current; |
+ var subscriptions = <StreamSubscription<T>>[]; |
+ StreamController<List<T>> controller; |
+ List<T> current; |
int dataCount = 0; |
/// Called for each data from a subscription in [subscriptions]. |
- void handleData(int index, data) { |
+ void handleData(int index, T data) { |
current[index] = data; |
dataCount++; |
if (dataCount == subscriptions.length) { |
- List data = current; |
+ var data = current; |
current = new List(subscriptions.length); |
dataCount = 0; |
for (int i = 0; i < subscriptions.length; i++) { |
@@ -69,10 +68,11 @@ class StreamZip extends Stream<List> { |
} |
try { |
- for (Stream stream in _streams) { |
+ for (var stream in _streams) { |
int index = subscriptions.length; |
- subscriptions.add(stream.listen( |
- (data) { handleData(index, data); }, |
+ subscriptions.add(stream.listen((data) { |
+ handleData(index, data); |
+ }, |
onError: cancelOnError ? handleError : handleErrorCancel, |
onDone: handleDone, |
cancelOnError: cancelOnError)); |
@@ -86,34 +86,28 @@ class StreamZip extends Stream<List> { |
current = new List(subscriptions.length); |
- controller = new StreamController<List>( |
- onPause: () { |
- for (int i = 0; i < subscriptions.length; i++) { |
- // This may pause some subscriptions more than once. |
- // These will not be resumed by onResume below, but must wait for the |
- // next round. |
- subscriptions[i].pause(); |
- } |
- }, |
- onResume: () { |
- for (int i = 0; i < subscriptions.length; i++) { |
- subscriptions[i].resume(); |
- } |
- }, |
- onCancel: () { |
- for (int i = 0; i < subscriptions.length; i++) { |
- // Canceling more than once is safe. |
- subscriptions[i].cancel(); |
- } |
+ controller = new StreamController<List<T>>(onPause: () { |
+ for (int i = 0; i < subscriptions.length; i++) { |
+ // This may pause some subscriptions more than once. |
+ // These will not be resumed by onResume below, but must wait for the |
+ // next round. |
+ subscriptions[i].pause(); |
+ } |
+ }, onResume: () { |
+ for (int i = 0; i < subscriptions.length; i++) { |
+ subscriptions[i].resume(); |
+ } |
+ }, onCancel: () { |
+ for (int i = 0; i < subscriptions.length; i++) { |
+ // Canceling more than once is safe. |
+ subscriptions[i].cancel(); |
} |
- ); |
+ }); |
if (subscriptions.isEmpty) { |
controller.close(); |
} |
return controller.stream.listen(onData, |
- onError: onError, |
- onDone: onDone, |
- cancelOnError: cancelOnError); |
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
} |
} |