Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1682)

Side by Side Diff: lib/src/utils.dart

Issue 1414793015: Use the async package's CancelableOperation. (Closed) Base URL: git@github.com:dart-lang/test@master
Patch Set: Created 5 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/util/cancelable_future.dart ('k') | pubspec.yaml » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library test.utils; 5 library test.utils;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:convert'; 8 import 'dart:convert';
9 import 'dart:math' as math; 9 import 'dart:math' as math;
10 10
11 import 'package:async/async.dart' hide StreamQueue;
11 import 'package:crypto/crypto.dart'; 12 import 'package:crypto/crypto.dart';
12 import 'package:path/path.dart' as p; 13 import 'package:path/path.dart' as p;
13 import 'package:shelf/shelf.dart' as shelf; 14 import 'package:shelf/shelf.dart' as shelf;
14 import 'package:stack_trace/stack_trace.dart'; 15 import 'package:stack_trace/stack_trace.dart';
15 16
16 import 'backend/operating_system.dart'; 17 import 'backend/operating_system.dart';
17 import 'util/cancelable_future.dart';
18 import 'util/path_handler.dart'; 18 import 'util/path_handler.dart';
19 import 'util/stream_queue.dart'; 19 import 'util/stream_queue.dart';
20 20
21 /// The maximum console line length. 21 /// The maximum console line length.
22 const _lineLength = 100; 22 const _lineLength = 100;
23 23
24 /// A typedef for a possibly-asynchronous function. 24 /// A typedef for a possibly-asynchronous function.
25 /// 25 ///
26 /// The return type should only ever by [Future] or void. 26 /// The return type should only ever by [Future] or void.
27 typedef AsyncFunction(); 27 typedef AsyncFunction();
(...skipping 288 matching lines...) Expand 10 before | Expand all | Expand 10 after
316 }, onError: (error, stackTrace) { 316 }, onError: (error, stackTrace) {
317 completer.completeError(error, stackTrace); 317 completer.completeError(error, stackTrace);
318 subscription.cancel(); 318 subscription.cancel();
319 }, onDone: () { 319 }, onDone: () {
320 completer.complete(); 320 completer.complete();
321 }); 321 });
322 322
323 return completer.future; 323 return completer.future;
324 } 324 }
325 325
326 /// Returns a [CancelableFuture] that returns the next value of [queue] unless 326 /// Returns a [CancelableOperation] that returns the next value of [queue]
327 /// it's canceled. 327 /// unless it's canceled.
328 /// 328 ///
329 /// If the future is canceled, [queue] is not moved forward at all. Note that 329 /// If the operation is canceled, [queue] is not moved forward at all. Note that
330 /// it's not safe to call further methods on [queue] until this future has 330 /// it's not safe to call further methods on [queue] until this operation has
331 /// either completed or been canceled. 331 /// either completed or been canceled.
332 CancelableFuture cancelableNext(StreamQueue queue) { 332 CancelableOperation cancelableNext(StreamQueue queue) {
333 var fork = queue.fork(); 333 var fork = queue.fork();
334 var completer = new CancelableCompleter(() => fork.cancel(immediate: true)); 334 var completer = new CancelableCompleter(
335 onCancel: () => fork.cancel(immediate: true));
335 completer.complete(fork.next.then((_) { 336 completer.complete(fork.next.then((_) {
336 fork.cancel(); 337 fork.cancel();
337 return queue.next; 338 return queue.next;
338 })); 339 }));
339 return completer.future; 340 return completer.operation;
340 } 341 }
341 342
342 /// Returns a single-subscription stream that emits the results of [futures] in 343 /// Returns a single-subscription stream that emits the results of [operations]
343 /// the order they complete. 344 /// in the order they complete.
344 /// 345 ///
345 /// If any futures in [futures] are [CancelableFuture]s, this will cancel them 346 /// If the subscription is canceled, any pending operations are canceled as
346 /// if the subscription is canceled. 347 /// well.
347 Stream inCompletionOrder(Iterable<Future> futures) { 348 Stream inCompletionOrder(Iterable<CancelableOperation> operations) {
348 var futureSet = futures.toSet(); 349 var operationSet = operations.toSet();
349 var controller = new StreamController(sync: true, onCancel: () { 350 var controller = new StreamController(sync: true, onCancel: () {
350 return Future.wait(futureSet.map((future) { 351 return Future.wait(operationSet.map((operation) => operation.cancel()));
351 return future is CancelableFuture ? future.cancel() : null;
352 }).where((future) => future != null));
353 }); 352 });
354 353
355 for (var future in futureSet) { 354 for (var operation in operationSet) {
356 future.then(controller.add).catchError(controller.addError) 355 operation.value.then(controller.add).catchError(controller.addError)
357 .whenComplete(() { 356 .whenComplete(() {
358 futureSet.remove(future); 357 operationSet.remove(operation);
359 if (futureSet.isEmpty) controller.close(); 358 if (operationSet.isEmpty) controller.close();
360 }); 359 });
361 } 360 }
362 361
363 return controller.stream; 362 return controller.stream;
364 } 363 }
365 364
366 /// Returns a stream that emits [error] and [stackTrace], then closes. 365 /// Returns a stream that emits [error] and [stackTrace], then closes.
367 /// 366 ///
368 /// This is useful for adding errors to streams defined via `async*`. 367 /// This is useful for adding errors to streams defined via `async*`.
369 Stream errorStream(error, StackTrace stackTrace) { 368 Stream errorStream(error, StackTrace stackTrace) {
(...skipping 26 matching lines...) Expand all
396 urlSafe: urlSafe, addLineSeparator: addLineSeparator); 395 urlSafe: urlSafe, addLineSeparator: addLineSeparator);
397 } 396 }
398 397
399 /// Returns middleware that nests all requests beneath the URL prefix [beneath]. 398 /// Returns middleware that nests all requests beneath the URL prefix [beneath].
400 shelf.Middleware nestingMiddleware(String beneath) { 399 shelf.Middleware nestingMiddleware(String beneath) {
401 return (handler) { 400 return (handler) {
402 var pathHandler = new PathHandler()..add(beneath, handler); 401 var pathHandler = new PathHandler()..add(beneath, handler);
403 return pathHandler.handler; 402 return pathHandler.handler;
404 }; 403 };
405 } 404 }
OLDNEW
« no previous file with comments | « lib/src/util/cancelable_future.dart ('k') | pubspec.yaml » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698