| Index: lib/src/utils.dart
|
| diff --git a/lib/src/utils.dart b/lib/src/utils.dart
|
| index 7793d3084deaa94f9493ecd1f7b64039ea32b181..68f2de02b1a2219f54eafb80334ccf86d1956fb3 100644
|
| --- a/lib/src/utils.dart
|
| +++ b/lib/src/utils.dart
|
| @@ -8,13 +8,13 @@ import 'dart:async';
|
| import 'dart:convert';
|
| import 'dart:math' as math;
|
|
|
| +import 'package:async/async.dart' hide StreamQueue;
|
| import 'package:crypto/crypto.dart';
|
| import 'package:path/path.dart' as p;
|
| import 'package:shelf/shelf.dart' as shelf;
|
| import 'package:stack_trace/stack_trace.dart';
|
|
|
| import 'backend/operating_system.dart';
|
| -import 'util/cancelable_future.dart';
|
| import 'util/path_handler.dart';
|
| import 'util/stream_queue.dart';
|
|
|
| @@ -323,40 +323,39 @@ Future maybeFirst(Stream stream) {
|
| return completer.future;
|
| }
|
|
|
| -/// Returns a [CancelableFuture] that returns the next value of [queue] unless
|
| -/// it's canceled.
|
| +/// Returns a [CancelableOperation] that returns the next value of [queue]
|
| +/// unless it's canceled.
|
| ///
|
| -/// If the future is canceled, [queue] is not moved forward at all. Note that
|
| -/// it's not safe to call further methods on [queue] until this future has
|
| +/// If the operation is canceled, [queue] is not moved forward at all. Note that
|
| +/// it's not safe to call further methods on [queue] until this operation has
|
| /// either completed or been canceled.
|
| -CancelableFuture cancelableNext(StreamQueue queue) {
|
| +CancelableOperation cancelableNext(StreamQueue queue) {
|
| var fork = queue.fork();
|
| - var completer = new CancelableCompleter(() => fork.cancel(immediate: true));
|
| + var completer = new CancelableCompleter(
|
| + onCancel: () => fork.cancel(immediate: true));
|
| completer.complete(fork.next.then((_) {
|
| fork.cancel();
|
| return queue.next;
|
| }));
|
| - return completer.future;
|
| + return completer.operation;
|
| }
|
|
|
| -/// Returns a single-subscription stream that emits the results of [futures] in
|
| -/// the order they complete.
|
| +/// Returns a single-subscription stream that emits the results of [operations]
|
| +/// in the order they complete.
|
| ///
|
| -/// If any futures in [futures] are [CancelableFuture]s, this will cancel them
|
| -/// if the subscription is canceled.
|
| -Stream inCompletionOrder(Iterable<Future> futures) {
|
| - var futureSet = futures.toSet();
|
| +/// If the subscription is canceled, any pending operations are canceled as
|
| +/// well.
|
| +Stream inCompletionOrder(Iterable<CancelableOperation> operations) {
|
| + var operationSet = operations.toSet();
|
| var controller = new StreamController(sync: true, onCancel: () {
|
| - return Future.wait(futureSet.map((future) {
|
| - return future is CancelableFuture ? future.cancel() : null;
|
| - }).where((future) => future != null));
|
| + return Future.wait(operationSet.map((operation) => operation.cancel()));
|
| });
|
|
|
| - for (var future in futureSet) {
|
| - future.then(controller.add).catchError(controller.addError)
|
| + for (var operation in operationSet) {
|
| + operation.value.then(controller.add).catchError(controller.addError)
|
| .whenComplete(() {
|
| - futureSet.remove(future);
|
| - if (futureSet.isEmpty) controller.close();
|
| + operationSet.remove(operation);
|
| + if (operationSet.isEmpty) controller.close();
|
| });
|
| }
|
|
|
|
|