Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(353)

Side by Side Diff: utils/pub/io.dart

Issue 12610006: Renamed StreamSink to EventSink. Renamed signalError to addError. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Changed inheritance back! Now create StreamSink instead of EventSink where we create them. Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 /// Helper functionality to make working with IO easier. 5 /// Helper functionality to make working with IO easier.
6 library io; 6 library io;
7 7
8 import 'dart:async'; 8 import 'dart:async';
9 import 'dart:io'; 9 import 'dart:io';
10 import 'dart:isolate'; 10 import 'dart:isolate';
(...skipping 334 matching lines...) Expand 10 before | Expand all | Expand 10 after
345 } 345 }
346 346
347 return path.normalize(path.join(utilDir, 'pub', target)); 347 return path.normalize(path.join(utilDir, 'pub', target));
348 } 348 }
349 349
350 // TODO(nweiz): add a ByteSink wrapper to make writing strings to stdout/stderr 350 // TODO(nweiz): add a ByteSink wrapper to make writing strings to stdout/stderr
351 // nicer. 351 // nicer.
352 352
353 /// A sink that writes to standard output. Errors piped to this stream will be 353 /// A sink that writes to standard output. Errors piped to this stream will be
354 /// surfaced to the top-level error handler. 354 /// surfaced to the top-level error handler.
355 final StreamSink<List<int>> stdoutSink = _wrapStdio(stdout, "stdout"); 355 final EventSink<List<int>> stdoutSink = _wrapStdio(stdout, "stdout");
356 356
357 /// A sink that writes to standard error. Errors piped to this stream will be 357 /// A sink that writes to standard error. Errors piped to this stream will be
358 /// surfaced to the top-level error handler. 358 /// surfaced to the top-level error handler.
359 final StreamSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr"); 359 final EventSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr");
360 360
361 /// Wrap the standard output or error [stream] in a [StreamSink]. Any errors are 361 /// Wrap the standard output or error [stream] in a [EventSink]. Any errors are
362 /// logged, and then the program is terminated. [name] is used for debugging. 362 /// logged, and then the program is terminated. [name] is used for debugging.
363 StreamSink<List<int>> _wrapStdio(IOSink sink, String name) { 363 EventSink<List<int>> _wrapStdio(IOSink sink, String name) {
364 var pair = consumerToSink(sink); 364 var pair = consumerToSink(sink);
365 pair.last.catchError((e) { 365 pair.last.catchError((e) {
366 // This log may or may not work, depending on how the stream failed. Not 366 // This log may or may not work, depending on how the stream failed. Not
367 // much we can do about that. 367 // much we can do about that.
368 log.error("Error writing to $name: $e"); 368 log.error("Error writing to $name: $e");
369 exit(exit_codes.IO); 369 exit(exit_codes.IO);
370 }); 370 });
371 return pair.first; 371 return pair.first;
372 } 372 }
373 373
(...skipping 13 matching lines...) Expand all
387 return streamFirst(stdinLines) 387 return streamFirst(stdinLines)
388 .then((line) => new RegExp(r"^[yY]").hasMatch(line)); 388 .then((line) => new RegExp(r"^[yY]").hasMatch(line));
389 } 389 }
390 390
391 /// Reads and discards all output from [stream]. Returns a [Future] that 391 /// Reads and discards all output from [stream]. Returns a [Future] that
392 /// completes when the stream is closed. 392 /// completes when the stream is closed.
393 Future drainStream(Stream stream) { 393 Future drainStream(Stream stream) {
394 return stream.reduce(null, (x, y) {}); 394 return stream.reduce(null, (x, y) {});
395 } 395 }
396 396
397 /// Returns a [StreamSink] that pipes all data to [consumer] and a [Future] that 397 /// Returns a [EventSink] that pipes all data to [consumer] and a [Future] that
398 /// will succeed when [StreamSink] is closed or fail with any errors that occur 398 /// will succeed when [EventSink] is closed or fail with any errors that occur
399 /// while writing. 399 /// while writing.
400 Pair<StreamSink, Future> consumerToSink(StreamConsumer consumer) { 400 Pair<EventSink, Future> consumerToSink(StreamConsumer consumer) {
401 var controller = new StreamController(); 401 var controller = new StreamController();
402 var done = controller.stream.pipe(consumer); 402 var done = controller.stream.pipe(consumer);
403 return new Pair<StreamSink, Future>(controller.sink, done); 403 return new Pair<EventSink, Future>(controller.sink, done);
404 } 404 }
405 405
406 // TODO(nweiz): remove this when issue 7786 is fixed. 406 // TODO(nweiz): remove this when issue 7786 is fixed.
407 /// Pipes all data and errors from [stream] into [sink]. When [stream] is done, 407 /// Pipes all data and errors from [stream] into [sink]. When [stream] is done,
408 /// the returned [Future] is completed and [sink] is closed if [closeSink] is 408 /// the returned [Future] is completed and [sink] is closed if [closeSink] is
409 /// true. 409 /// true.
410 /// 410 ///
411 /// When an error occurs on [stream], that error is passed to [sink]. If 411 /// When an error occurs on [stream], that error is passed to [sink]. If
412 /// [unsubscribeOnError] is true, [Future] will be completed successfully and no 412 /// [unsubscribeOnError] is true, [Future] will be completed successfully and no
413 /// more data or errors will be piped from [stream] to [sink]. If 413 /// more data or errors will be piped from [stream] to [sink]. If
414 /// [unsubscribeOnError] and [closeSink] are both true, [sink] will then be 414 /// [unsubscribeOnError] and [closeSink] are both true, [sink] will then be
415 /// closed. 415 /// closed.
416 Future store(Stream stream, StreamSink sink, 416 Future store(Stream stream, EventSink sink,
417 {bool unsubscribeOnError: true, closeSink: true}) { 417 {bool unsubscribeOnError: true, closeSink: true}) {
418 var completer = new Completer(); 418 var completer = new Completer();
419 stream.listen(sink.add, 419 stream.listen(sink.add,
420 onError: (e) { 420 onError: (e) {
421 sink.signalError(e); 421 sink.addError(e);
422 if (unsubscribeOnError) { 422 if (unsubscribeOnError) {
423 completer.complete(); 423 completer.complete();
424 if (closeSink) sink.close(); 424 if (closeSink) sink.close();
425 } 425 }
426 }, 426 },
427 onDone: () { 427 onDone: () {
428 if (closeSink) sink.close(); 428 if (closeSink) sink.close();
429 completer.complete(); 429 completer.complete();
430 }, unsubscribeOnError: unsubscribeOnError); 430 }, unsubscribeOnError: unsubscribeOnError);
431 return completer.future; 431 return completer.future;
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
468 {workingDir, Map<String, String> environment}) => 468 {workingDir, Map<String, String> environment}) =>
469 _doProcess(Process.start, executable, args, workingDir, environment) 469 _doProcess(Process.start, executable, args, workingDir, environment)
470 .then((process) => new PubProcess(process)); 470 .then((process) => new PubProcess(process));
471 471
472 /// A wrapper around [Process] that exposes `dart:async`-style APIs. 472 /// A wrapper around [Process] that exposes `dart:async`-style APIs.
473 class PubProcess { 473 class PubProcess {
474 /// The underlying `dart:io` [Process]. 474 /// The underlying `dart:io` [Process].
475 final Process _process; 475 final Process _process;
476 476
477 /// The mutable field for [stdin]. 477 /// The mutable field for [stdin].
478 StreamSink<List<int>> _stdin; 478 EventSink<List<int>> _stdin;
479 479
480 /// The mutable field for [stdinClosed]. 480 /// The mutable field for [stdinClosed].
481 Future _stdinClosed; 481 Future _stdinClosed;
482 482
483 /// The mutable field for [stdout]. 483 /// The mutable field for [stdout].
484 ByteStream _stdout; 484 ByteStream _stdout;
485 485
486 /// The mutable field for [stderr]. 486 /// The mutable field for [stderr].
487 ByteStream _stderr; 487 ByteStream _stderr;
488 488
489 /// The mutable field for [exitCode]. 489 /// The mutable field for [exitCode].
490 Future<int> _exitCode; 490 Future<int> _exitCode;
491 491
492 /// The sink used for passing data to the process's standard input stream. 492 /// The sink used for passing data to the process's standard input stream.
493 /// Errors on this stream are surfaced through [stdinClosed], [stdout], 493 /// Errors on this stream are surfaced through [stdinClosed], [stdout],
494 /// [stderr], and [exitCode], which are all members of an [ErrorGroup]. 494 /// [stderr], and [exitCode], which are all members of an [ErrorGroup].
495 StreamSink<List<int>> get stdin => _stdin; 495 EventSink<List<int>> get stdin => _stdin;
496 496
497 // TODO(nweiz): write some more sophisticated Future machinery so that this 497 // TODO(nweiz): write some more sophisticated Future machinery so that this
498 // doesn't surface errors from the other streams/futures, but still passes its 498 // doesn't surface errors from the other streams/futures, but still passes its
499 // unhandled errors to them. Right now it's impossible to recover from a stdin 499 // unhandled errors to them. Right now it's impossible to recover from a stdin
500 // error and continue interacting with the process. 500 // error and continue interacting with the process.
501 /// A [Future] that completes when [stdin] is closed, either by the user or by 501 /// A [Future] that completes when [stdin] is closed, either by the user or by
502 /// the process itself. 502 /// the process itself.
503 /// 503 ///
504 /// This is in an [ErrorGroup] with [stdout], [stderr], and [exitCode], so any 504 /// This is in an [ErrorGroup] with [stdout], [stderr], and [exitCode], so any
505 /// error in process will be passed to it, but won't reach the top-level error 505 /// error in process will be passed to it, but won't reach the top-level error
(...skipping 228 matching lines...) Expand 10 before | Expand all | Expand 10 after
734 var args = ["--create", "--gzip", "--directory", baseDir]; 734 var args = ["--create", "--gzip", "--directory", baseDir];
735 args.addAll(contents); 735 args.addAll(contents);
736 // TODO(nweiz): It's possible that enough command-line arguments will make 736 // TODO(nweiz): It's possible that enough command-line arguments will make
737 // the process choke, so at some point we should save the arguments to a 737 // the process choke, so at some point we should save the arguments to a
738 // file and pass them in via --files-from for tar and -i@filename for 7zip. 738 // file and pass them in via --files-from for tar and -i@filename for 7zip.
739 startProcess("tar", args).then((process) { 739 startProcess("tar", args).then((process) {
740 store(process.stdout, controller); 740 store(process.stdout, controller);
741 }).catchError((e) { 741 }).catchError((e) {
742 // We don't have to worry about double-signaling here, since the store() 742 // We don't have to worry about double-signaling here, since the store()
743 // above will only be reached if startProcess succeeds. 743 // above will only be reached if startProcess succeeds.
744 controller.signalError(e.error, e.stackTrace); 744 controller.addError(e.error, e.stackTrace);
745 controller.close(); 745 controller.close();
746 }); 746 });
747 return new ByteStream(controller.stream); 747 return new ByteStream(controller.stream);
748 } 748 }
749 749
750 withTempDir((tempDir) { 750 withTempDir((tempDir) {
751 // Create the tar file. 751 // Create the tar file.
752 var tarFile = path.join(tempDir, "intermediate.tar"); 752 var tarFile = path.join(tempDir, "intermediate.tar");
753 var args = ["a", "-w$baseDir", tarFile]; 753 var args = ["a", "-w$baseDir", tarFile];
754 args.addAll(contents.map((entry) => '-i!"$entry"')); 754 args.addAll(contents.map((entry) => '-i!"$entry"'));
(...skipping 16 matching lines...) Expand all
771 // Ignore 7zip's stderr. 7zip writes its normal output to stderr. We don't 771 // Ignore 7zip's stderr. 7zip writes its normal output to stderr. We don't
772 // want to show that since it's meaningless. 772 // want to show that since it's meaningless.
773 // 773 //
774 // TODO(rnystrom): Should log the stderr and display it if an actual error 774 // TODO(rnystrom): Should log the stderr and display it if an actual error
775 // occurs. 775 // occurs.
776 return store(process.stdout, controller); 776 return store(process.stdout, controller);
777 }); 777 });
778 }).catchError((e) { 778 }).catchError((e) {
779 // We don't have to worry about double-signaling here, since the store() 779 // We don't have to worry about double-signaling here, since the store()
780 // above will only be reached if everything succeeds. 780 // above will only be reached if everything succeeds.
781 controller.signalError(e.error, e.stackTrace); 781 controller.addError(e.error, e.stackTrace);
782 controller.close(); 782 controller.close();
783 }); 783 });
784 return new ByteStream(controller.stream); 784 return new ByteStream(controller.stream);
785 } 785 }
786 786
787 /// Exception thrown when an operation times out. 787 /// Exception thrown when an operation times out.
788 class TimeoutException implements Exception { 788 class TimeoutException implements Exception {
789 final String message; 789 final String message;
790 790
791 const TimeoutException(this.message); 791 const TimeoutException(this.message);
(...skipping 10 matching lines...) Expand all
802 const PubProcessResult(this.stdout, this.stderr, this.exitCode); 802 const PubProcessResult(this.stdout, this.stderr, this.exitCode);
803 803
804 bool get success => exitCode == 0; 804 bool get success => exitCode == 0;
805 } 805 }
806 806
807 /// Gets a [Uri] for [uri], which can either already be one, or be a [String]. 807 /// Gets a [Uri] for [uri], which can either already be one, or be a [String].
808 Uri _getUri(uri) { 808 Uri _getUri(uri) {
809 if (uri is Uri) return uri; 809 if (uri is Uri) return uri;
810 return Uri.parse(uri); 810 return Uri.parse(uri);
811 } 811 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698