| Index: lib/src/utils.dart
|
| diff --git a/lib/src/utils.dart b/lib/src/utils.dart
|
| index e44e304d7435c43255ffc5d24111fd1cd94f210f..41211f335aff7ea90945d06b46c7e354799965c5 100644
|
| --- a/lib/src/utils.dart
|
| +++ b/lib/src/utils.dart
|
| @@ -339,23 +339,28 @@ CancelableFuture cancelableNext(StreamQueue queue) {
|
| return completer.future;
|
| }
|
|
|
| -/// Returns the result of whichever of [futures] completes first, and cancels
|
| -/// the others.
|
| -Future race(Iterable<CancelableFuture> futures) {
|
| - var completer = new Completer.sync();
|
| - for (var future in futures) {
|
| - future.then((value) {
|
| - if (!completer.isCompleted) completer.complete(value);
|
| - }).catchError((error, stackTrace) {
|
| - if (!completer.isCompleted) completer.completeError(error, stackTrace);
|
| +/// Returns a single-subscription stream that emits the results of [futures] in
|
| +/// the order they complete.
|
| +///
|
| +/// If any futures in [futures] are [CancelableFuture]s, this will cancel them
|
| +/// if the subscription is canceled.
|
| +Stream inCompletionOrder(Iterable<Future> futures) {
|
| + var futureSet = futures.toSet();
|
| + var controller = new StreamController(sync: true, onCancel: () {
|
| + return Future.wait(futureSet.map((future) {
|
| + return future is CancelableFuture ? future.cancel() : null;
|
| + }).where((future) => future != null));
|
| + });
|
| +
|
| + for (var future in futureSet) {
|
| + future.then(controller.add).catchError(controller.addError)
|
| + .whenComplete(() {
|
| + futureSet.remove(future);
|
| + if (futureSet.isEmpty) controller.close();
|
| });
|
| }
|
|
|
| - return completer.future.whenComplete(() {
|
| - for (var future in futures) {
|
| - future.cancel();
|
| - }
|
| - });
|
| + return controller.stream;
|
| }
|
|
|
| /// Returns a stream that emits [error] and [stackTrace], then closes.
|
|
|