Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3167)

Unified Diff: sdk/lib/async/stream.dart

Issue 1563223002: Add Future.any and Stream.fromFutures. (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Address comment Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sdk/lib/async/future.dart ('k') | tests/lib/async/future_test.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sdk/lib/async/stream.dart
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index e3f3aa471074b4548a5f545f89b65e5c7a791cfb..c2b6c68984ad5dde9097cc28cd5247060e89d2a4 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -113,6 +113,47 @@ abstract class Stream<T> {
}
/**
+ * Create a stream from a group of futures.
+ *
+ * The stream reports the results of the futures on the stream in the order
+ * in which the futures complete.
+ *
+ * If some futures have completed before calling `Stream.fromFutures`,
+ * their result will be output on the created stream in some unspecified
+ * order.
+ *
+ * When all futures have completed, the stream is closed.
+ *
+ * If no future is passed, the stream closes as soon as possible.
+ */
+ factory Stream.fromFutures(Iterable<Future<T>> futures) {
+ var controller = new StreamController<T>(sync: true);
+ int count = 0;
+ var onValue = (value) {
+ if (!controller.isClosed) {
+ controller._add(value);
+ if (--count == 0) controller._closeUnchecked();
+ }
+ };
+ var onError = (error, stack) {
+ if (!controller.isClosed) {
+ controller._addError(error, stack);
+ if (--count == 0) controller._closeUnchecked();
+ }
+ };
+ // The futures are already running, so start listening to them immediately
+ // (instead of waiting for the stream to be listened on).
+ // If we wait, we might not catch errors in the futures in time.
+ for (var future in futures) {
+ count++;
+ future.then(onValue, onError: onError);
+ }
+ // Use schedule microtask since controller is sync.
+ if (count == 0) scheduleMicrotask(controller.close);
+ return controller.stream;
+ }
+
+ /**
* Creates a single-subscription stream that gets its data from [data].
*
* The iterable is iterated when the stream receives a listener, and stops
« no previous file with comments | « sdk/lib/async/future.dart ('k') | tests/lib/async/future_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698