Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(478)

Unified Diff: pkg/async/lib/stream_zip.dart

Issue 118783004: Add missing files from previous commit. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | pkg/async/test/stream_zip_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: pkg/async/lib/stream_zip.dart
diff --git a/pkg/async/lib/stream_zip.dart b/pkg/async/lib/stream_zip.dart
new file mode 100644
index 0000000000000000000000000000000000000000..055489dbd38cd5dd6eb9dc7015887ee5a8df1f12
--- /dev/null
+++ b/pkg/async/lib/stream_zip.dart
@@ -0,0 +1,119 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * Help for combining multiple streams into a single stream.
+ */
+library dart.pkg.async.stream_zip;
+
+import "dart:async";
+
+/**
+ * A stream that combines the values of other streams.
+ */
+class StreamZip extends Stream<List> {
+ final Iterable<Stream> _streams;
+ StreamZip(Iterable<Stream> streams) : _streams = streams;
+
+ StreamSubscription<List> listen(void onData(List data), {
+ Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ cancelOnError = identical(true, cancelOnError);
+ List<StreamSubscription> subscriptions = <StreamSubscription>[];
+ StreamController controller;
+ List current;
+ int dataCount = 0;
+
+ /// Called for each data from a subscription in [subscriptions].
+ void handleData(int index, data) {
+ current[index] = data;
+ dataCount++;
+ if (dataCount == subscriptions.length) {
+ List data = current;
+ current = new List(subscriptions.length);
+ dataCount = 0;
+ for (int i = 0; i < subscriptions.length; i++) {
+ if (i != index) subscriptions[i].resume();
+ }
+ controller.add(data);
+ } else {
+ subscriptions[index].pause();
+ }
+ }
+
+ /// Called for each error from a subscription in [subscriptions].
+ /// Except if [cancelOnError] is true, in which case the function below
+ /// is used instead.
+ void handleError(Object error, StackTrace stackTrace) {
+ controller.addError(error, stackTrace);
+ }
+
+ /// Called when a subscription has an error and [cancelOnError] is true.
+ ///
+ /// Prematurely cancels all subscriptions since we know that we won't
+ /// be needing any more values.
+ void handleErrorCancel(Object error, StackTrace stackTrace) {
+ for (int i = 0; i < subscriptions.length; i++) {
+ subscriptions[i].cancel();
+ }
+ controller.addError(error, stackTrace);
+ }
+
+ void handleDone() {
+ for (int i = 0; i < subscriptions.length; i++) {
+ subscriptions[i].cancel();
+ }
+ controller.close();
+ }
+
+ try {
+ for (Stream stream in _streams) {
+ int index = subscriptions.length;
+ subscriptions.add(stream.listen(
+ (data) { handleData(index, data); },
+ onError: cancelOnError ? handleError : handleErrorCancel,
+ onDone: handleDone,
+ cancelOnError: cancelOnError));
+ }
+ } catch (e) {
+ for (int i = subscriptions.length - 1; i >= 0; i--) {
+ subscriptions[i].cancel();
+ }
+ rethrow;
+ }
+
+ current = new List(subscriptions.length);
+
+ controller = new StreamController<List>(
+ onPause: () {
+ for (int i = 0; i < subscriptions.length; i++) {
+ // This may pause some subscriptions more than once.
+ // These will not be resumed by onResume below, but must wait for the
+ // next round.
+ subscriptions[i].pause();
+ }
+ },
+ onResume: () {
+ for (int i = 0; i < subscriptions.length; i++) {
+ subscriptions[i].resume();
+ }
+ },
+ onCancel: () {
+ for (int i = 0; i < subscriptions.length; i++) {
+ // Canceling more than once is safe.
+ subscriptions[i].cancel();
+ }
+ }
+ );
+
+ if (subscriptions.isEmpty) {
+ controller.close();
+ }
+ return controller.stream.listen(onData,
+ onError: onError,
+ onDone: onDone,
+ cancelOnError: cancelOnError);
+ }
+}
« no previous file with comments | « no previous file | pkg/async/test/stream_zip_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698