Index: sdk/lib/async/stream.dart |
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
index e3f3aa471074b4548a5f545f89b65e5c7a791cfb..c2b6c68984ad5dde9097cc28cd5247060e89d2a4 100644 |
--- a/sdk/lib/async/stream.dart |
+++ b/sdk/lib/async/stream.dart |
@@ -113,6 +113,47 @@ abstract class Stream<T> { |
} |
/** |
+ * Create a stream from a group of futures. |
+ * |
+ * The stream reports the results of the futures on the stream in the order |
+ * in which the futures complete. |
+ * |
+ * If some futures have completed before calling `Stream.fromFutures`, |
+ * their result will be output on the created stream in some unspecified |
+ * order. |
+ * |
+ * When all futures have completed, the stream is closed. |
+ * |
+ * If no future is passed, the stream closes as soon as possible. |
+ */ |
+ factory Stream.fromFutures(Iterable<Future<T>> futures) { |
+ var controller = new StreamController<T>(sync: true); |
+ int count = 0; |
+ var onValue = (value) { |
+ if (!controller.isClosed) { |
+ controller._add(value); |
+ if (--count == 0) controller._closeUnchecked(); |
+ } |
+ }; |
+ var onError = (error, stack) { |
+ if (!controller.isClosed) { |
+ controller._addError(error, stack); |
+ if (--count == 0) controller._closeUnchecked(); |
+ } |
+ }; |
+ // The futures are already running, so start listening to them immediately |
+ // (instead of waiting for the stream to be listened on). |
+ // If we wait, we might not catch errors in the futures in time. |
+ for (var future in futures) { |
+ count++; |
+ future.then(onValue, onError: onError); |
+ } |
+ // Use schedule microtask since controller is sync. |
+ if (count == 0) scheduleMicrotask(controller.close); |
+ return controller.stream; |
+ } |
+ |
+ /** |
* Creates a single-subscription stream that gets its data from [data]. |
* |
* The iterable is iterated when the stream receives a listener, and stops |