Chromium Code Reviews| Index: sdk/lib/async/stream.dart |
| diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart |
| index e3f3aa471074b4548a5f545f89b65e5c7a791cfb..7ced1511146ee9564a3ae8b9410f8e7a3adb10cd 100644 |
| --- a/sdk/lib/async/stream.dart |
| +++ b/sdk/lib/async/stream.dart |
| @@ -113,6 +113,48 @@ abstract class Stream<T> { |
| } |
| /** |
| + * Create a stream from a group of futures. |
| + * |
| + * The stream reports the results of the futures on the stream in the order |
| + * in which the futures complete. |
| + * |
| + * If some futures have completed before calling `Stream.fromFutures`, |
| + * their result will be output on the created stream in some unspecified |
| + * order. |
| + * |
| + * When all futures have completed, the stream is closed. |
| + * |
| + * If no future is passed, the stream closes as soon as possible. |
| + */ |
| + factory Stream.fromFutures(Iterable<Future<T>> futures) { |
| + var controller; |
| + controller = new StreamController<T>(sync: true); |
|
floitsch
2016/01/07 13:57:13
Merge these two lines?
Lasse Reichstein Nielsen
2016/01/08 09:18:48
Done.
|
| + int count = 0; |
| + var onValue = (value) { |
| + if (!controller.isClosed) { |
| + controller._add(value); |
| + if (--count == 0) controller._closeUnchecked(); |
| + } |
| + }; |
| + var onError = (error, stack) { |
| + if (!controller.isClosed) { |
| + controller._addError(error, stack); |
| + if (--count == 0) controller._closeUnchecked(); |
| + } |
| + }; |
| + // The futures are already running, so start listening to them immediately |
| + // (instead of waiting for the stream to be listened on). |
| + // If we wait, we might not catch errors in the futures in time. |
| + for (var future in futures) { |
| + count++; |
| + future.then(onValue, onError: onError); |
| + } |
| + // Use schedule microtask since controller is sync. |
| + if (count == 0) scheduleMicrotask(controller.close); |
| + return controller.stream; |
| + } |
| + |
| + /** |
| * Creates a single-subscription stream that gets its data from [data]. |
| * |
| * The iterable is iterated when the stream receives a listener, and stops |