Index: lib/src/utils.dart |
diff --git a/lib/src/utils.dart b/lib/src/utils.dart |
index e44e304d7435c43255ffc5d24111fd1cd94f210f..41211f335aff7ea90945d06b46c7e354799965c5 100644 |
--- a/lib/src/utils.dart |
+++ b/lib/src/utils.dart |
@@ -339,23 +339,28 @@ CancelableFuture cancelableNext(StreamQueue queue) { |
return completer.future; |
} |
-/// Returns the result of whichever of [futures] completes first, and cancels |
-/// the others. |
-Future race(Iterable<CancelableFuture> futures) { |
- var completer = new Completer.sync(); |
- for (var future in futures) { |
- future.then((value) { |
- if (!completer.isCompleted) completer.complete(value); |
- }).catchError((error, stackTrace) { |
- if (!completer.isCompleted) completer.completeError(error, stackTrace); |
+/// Returns a single-subscription stream that emits the results of [futures] in |
+/// the order they complete. |
+/// |
+/// If any futures in [futures] are [CancelableFuture]s, this will cancel them |
+/// if the subscription is canceled. |
+Stream inCompletionOrder(Iterable<Future> futures) { |
+ var futureSet = futures.toSet(); |
+ var controller = new StreamController(sync: true, onCancel: () { |
+ return Future.wait(futureSet.map((future) { |
+ return future is CancelableFuture ? future.cancel() : null; |
+ }).where((future) => future != null)); |
+ }); |
+ |
+ for (var future in futureSet) { |
+ future.then(controller.add).catchError(controller.addError) |
+ .whenComplete(() { |
+ futureSet.remove(future); |
+ if (futureSet.isEmpty) controller.close(); |
}); |
} |
- return completer.future.whenComplete(() { |
- for (var future in futures) { |
- future.cancel(); |
- } |
- }); |
+ return controller.stream; |
} |
/// Returns a stream that emits [error] and [stackTrace], then closes. |