Index: lib/src/utils.dart |
diff --git a/lib/src/utils.dart b/lib/src/utils.dart |
index 1e9ca1f2eaee9e5360466a3ed1cd70f63a793b99..2f4ff61b7a978770558fa92e422dba3f706013f2 100644 |
--- a/lib/src/utils.dart |
+++ b/lib/src/utils.dart |
@@ -14,7 +14,9 @@ import 'package:shelf/shelf.dart' as shelf; |
import 'package:stack_trace/stack_trace.dart'; |
import 'backend/operating_system.dart'; |
+import 'util/cancelable_future.dart'; |
import 'util/path_handler.dart'; |
+import 'util/stream_queue.dart'; |
/// The maximum console line length. |
const _lineLength = 100; |
@@ -321,6 +323,33 @@ Future maybeFirst(Stream stream) { |
return completer.future; |
} |
+CancelableFuture cancelableNext(StreamQueue queue) { |
+ var fork = queue.fork(); |
+ var completer = new CancelableCompleter(() => fork.cancel(immediate: true)); |
+ completer.complete(fork.next.then((_) { |
+ fork.cancel(); |
+ return queue.next; |
+ })); |
+ return completer.future; |
+} |
+ |
+Future race(Iterable<CancelableFuture> futures) { |
+ var completer = new Completer.sync(); |
+ for (var future in futures) { |
+ future.then((value) { |
+ if (!completer.isCompleted) completer.complete(value); |
+ }).catchError((error, stackTrace) { |
+ if (!completer.isCompleted) completer.completeError(error, stackTrace); |
+ }); |
+ } |
+ |
+ return completer.future.whenComplete(() { |
+ for (var future in futures) { |
+ future.cancel(); |
+ } |
+ }); |
+} |
+ |
/// Returns a stream that emits [error] and [stackTrace], then closes. |
/// |
/// This is useful for adding errors to streams defined via `async*`. |