Index: packages/async/lib/stream_zip.dart |
diff --git a/packages/async/lib/stream_zip.dart b/packages/async/lib/stream_zip.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..055489dbd38cd5dd6eb9dc7015887ee5a8df1f12 |
--- /dev/null |
+++ b/packages/async/lib/stream_zip.dart |
@@ -0,0 +1,119 @@ |
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+/** |
+ * Help for combining multiple streams into a single stream. |
+ */ |
+library dart.pkg.async.stream_zip; |
+ |
+import "dart:async"; |
+ |
+/** |
+ * A stream that combines the values of other streams. |
+ */ |
+class StreamZip extends Stream<List> { |
+ final Iterable<Stream> _streams; |
+ StreamZip(Iterable<Stream> streams) : _streams = streams; |
+ |
+ StreamSubscription<List> listen(void onData(List data), { |
+ Function onError, |
+ void onDone(), |
+ bool cancelOnError}) { |
+ cancelOnError = identical(true, cancelOnError); |
+ List<StreamSubscription> subscriptions = <StreamSubscription>[]; |
+ StreamController controller; |
+ List current; |
+ int dataCount = 0; |
+ |
+ /// Called for each data from a subscription in [subscriptions]. |
+ void handleData(int index, data) { |
+ current[index] = data; |
+ dataCount++; |
+ if (dataCount == subscriptions.length) { |
+ List data = current; |
+ current = new List(subscriptions.length); |
+ dataCount = 0; |
+ for (int i = 0; i < subscriptions.length; i++) { |
+ if (i != index) subscriptions[i].resume(); |
+ } |
+ controller.add(data); |
+ } else { |
+ subscriptions[index].pause(); |
+ } |
+ } |
+ |
+ /// Called for each error from a subscription in [subscriptions]. |
+ /// Except if [cancelOnError] is true, in which case the function below |
+ /// is used instead. |
+ void handleError(Object error, StackTrace stackTrace) { |
+ controller.addError(error, stackTrace); |
+ } |
+ |
+ /// Called when a subscription has an error and [cancelOnError] is true. |
+ /// |
+ /// Prematurely cancels all subscriptions since we know that we won't |
+ /// be needing any more values. |
+ void handleErrorCancel(Object error, StackTrace stackTrace) { |
+ for (int i = 0; i < subscriptions.length; i++) { |
+ subscriptions[i].cancel(); |
+ } |
+ controller.addError(error, stackTrace); |
+ } |
+ |
+ void handleDone() { |
+ for (int i = 0; i < subscriptions.length; i++) { |
+ subscriptions[i].cancel(); |
+ } |
+ controller.close(); |
+ } |
+ |
+ try { |
+ for (Stream stream in _streams) { |
+ int index = subscriptions.length; |
+ subscriptions.add(stream.listen( |
+ (data) { handleData(index, data); }, |
+ onError: cancelOnError ? handleError : handleErrorCancel, |
+ onDone: handleDone, |
+ cancelOnError: cancelOnError)); |
+ } |
+ } catch (e) { |
+ for (int i = subscriptions.length - 1; i >= 0; i--) { |
+ subscriptions[i].cancel(); |
+ } |
+ rethrow; |
+ } |
+ |
+ current = new List(subscriptions.length); |
+ |
+ controller = new StreamController<List>( |
+ onPause: () { |
+ for (int i = 0; i < subscriptions.length; i++) { |
+ // This may pause some subscriptions more than once. |
+ // These will not be resumed by onResume below, but must wait for the |
+ // next round. |
+ subscriptions[i].pause(); |
+ } |
+ }, |
+ onResume: () { |
+ for (int i = 0; i < subscriptions.length; i++) { |
+ subscriptions[i].resume(); |
+ } |
+ }, |
+ onCancel: () { |
+ for (int i = 0; i < subscriptions.length; i++) { |
+ // Canceling more than once is safe. |
+ subscriptions[i].cancel(); |
+ } |
+ } |
+ ); |
+ |
+ if (subscriptions.isEmpty) { |
+ controller.close(); |
+ } |
+ return controller.stream.listen(onData, |
+ onError: onError, |
+ onDone: onDone, |
+ cancelOnError: cancelOnError); |
+ } |
+} |