Index: lib/src/utils.dart |
diff --git a/lib/src/utils.dart b/lib/src/utils.dart |
index 7793d3084deaa94f9493ecd1f7b64039ea32b181..68f2de02b1a2219f54eafb80334ccf86d1956fb3 100644 |
--- a/lib/src/utils.dart |
+++ b/lib/src/utils.dart |
@@ -8,13 +8,13 @@ import 'dart:async'; |
import 'dart:convert'; |
import 'dart:math' as math; |
+import 'package:async/async.dart' hide StreamQueue; |
import 'package:crypto/crypto.dart'; |
import 'package:path/path.dart' as p; |
import 'package:shelf/shelf.dart' as shelf; |
import 'package:stack_trace/stack_trace.dart'; |
import 'backend/operating_system.dart'; |
-import 'util/cancelable_future.dart'; |
import 'util/path_handler.dart'; |
import 'util/stream_queue.dart'; |
@@ -323,40 +323,39 @@ Future maybeFirst(Stream stream) { |
return completer.future; |
} |
-/// Returns a [CancelableFuture] that returns the next value of [queue] unless |
-/// it's canceled. |
+/// Returns a [CancelableOperation] that returns the next value of [queue] |
+/// unless it's canceled. |
/// |
-/// If the future is canceled, [queue] is not moved forward at all. Note that |
-/// it's not safe to call further methods on [queue] until this future has |
+/// If the operation is canceled, [queue] is not moved forward at all. Note that |
+/// it's not safe to call further methods on [queue] until this operation has |
/// either completed or been canceled. |
-CancelableFuture cancelableNext(StreamQueue queue) { |
+CancelableOperation cancelableNext(StreamQueue queue) { |
var fork = queue.fork(); |
- var completer = new CancelableCompleter(() => fork.cancel(immediate: true)); |
+ var completer = new CancelableCompleter( |
+ onCancel: () => fork.cancel(immediate: true)); |
completer.complete(fork.next.then((_) { |
fork.cancel(); |
return queue.next; |
})); |
- return completer.future; |
+ return completer.operation; |
} |
-/// Returns a single-subscription stream that emits the results of [futures] in |
-/// the order they complete. |
+/// Returns a single-subscription stream that emits the results of [operations] |
+/// in the order they complete. |
/// |
-/// If any futures in [futures] are [CancelableFuture]s, this will cancel them |
-/// if the subscription is canceled. |
-Stream inCompletionOrder(Iterable<Future> futures) { |
- var futureSet = futures.toSet(); |
+/// If the subscription is canceled, any pending operations are canceled as |
+/// well. |
+Stream inCompletionOrder(Iterable<CancelableOperation> operations) { |
+ var operationSet = operations.toSet(); |
var controller = new StreamController(sync: true, onCancel: () { |
- return Future.wait(futureSet.map((future) { |
- return future is CancelableFuture ? future.cancel() : null; |
- }).where((future) => future != null)); |
+ return Future.wait(operationSet.map((operation) => operation.cancel())); |
}); |
- for (var future in futureSet) { |
- future.then(controller.add).catchError(controller.addError) |
+ for (var operation in operationSet) { |
+ operation.value.then(controller.add).catchError(controller.addError) |
.whenComplete(() { |
- futureSet.remove(future); |
- if (futureSet.isEmpty) controller.close(); |
+ operationSet.remove(operation); |
+ if (operationSet.isEmpty) controller.close(); |
}); |
} |