Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(371)

Side by Side Diff: utils/pub/io.dart

Issue 12086110: Use the dart:async Stream API thoroughly in Pub. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « utils/pub/hosted_source.dart ('k') | utils/pub/log.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 /// Helper functionality to make working with IO easier. 5 /// Helper functionality to make working with IO easier.
6 library io; 6 library io;
7 7
8 import 'dart:async'; 8 import 'dart:async';
9 import 'dart:io'; 9 import 'dart:io';
10 import 'dart:isolate'; 10 import 'dart:isolate';
11 import 'dart:json'; 11 import 'dart:json';
12 import 'dart:uri'; 12 import 'dart:uri';
13 13
14 import '../../pkg/path/lib/path.dart' as path; 14 import '../../pkg/path/lib/path.dart' as path;
15 import '../../pkg/http/lib/http.dart' show ByteStream;
16 import 'error_group.dart';
17 import 'exit_codes.dart' as exit_codes;
15 import 'log.dart' as log; 18 import 'log.dart' as log;
16 import 'utils.dart'; 19 import 'utils.dart';
17 20
21 export '../../pkg/http/lib/http.dart' show ByteStream;
22
18 final NEWLINE_PATTERN = new RegExp("\r\n?|\n\r?"); 23 final NEWLINE_PATTERN = new RegExp("\r\n?|\n\r?");
19 24
20 /// Joins a number of path string parts into a single path. Handles 25 /// Joins a number of path string parts into a single path. Handles
21 /// platform-specific path separators. Parts can be [String], [Directory], or 26 /// platform-specific path separators. Parts can be [String], [Directory], or
22 /// [File] objects. 27 /// [File] objects.
23 String join(part1, [part2, part3, part4, part5, part6, part7, part8]) { 28 String join(part1, [part2, part3, part4, part5, part6, part7, part8]) {
24 var parts = [part1, part2, part3, part4, part5, part6, part7, part8] 29 var parts = [part1, part2, part3, part4, part5, part6, part7, part8]
25 .map((part) => part == null ? null : _getPath(part)).toList(); 30 .map((part) => part == null ? null : _getPath(part)).toList();
26 31
27 return path.join(parts[0], parts[1], parts[2], parts[3], parts[4], parts[5], 32 return path.join(parts[0], parts[1], parts[2], parts[3], parts[4], parts[5],
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after
137 /// a [Future] that completes when the deletion is done. 142 /// a [Future] that completes when the deletion is done.
138 Future<File> deleteFile(file) { 143 Future<File> deleteFile(file) {
139 var path = _getPath(file); 144 var path = _getPath(file);
140 return log.ioAsync("delete file $path", 145 return log.ioAsync("delete file $path",
141 new File(path).delete()); 146 new File(path).delete());
142 } 147 }
143 148
144 /// Writes [stream] to a new file at [path], which may be a [String] or a 149 /// Writes [stream] to a new file at [path], which may be a [String] or a
145 /// [File]. Will replace any file already at that path. Completes when the file 150 /// [File]. Will replace any file already at that path. Completes when the file
146 /// is done being written. 151 /// is done being written.
147 Future<File> createFileFromStream(InputStream stream, path) { 152 Future<File> createFileFromStream(Stream<List<int>> stream, path) {
148 path = _getPath(path); 153 path = _getPath(path);
149 154
150 log.io("Creating $path from stream."); 155 log.io("Creating $path from stream.");
151 156
152 var completer = new Completer<File>();
153 var completed = false;
154 var file = new File(path); 157 var file = new File(path);
155 var outputStream = file.openOutputStream(); 158 return stream.pipe(wrapOutputStream(file.openOutputStream())).then((_) {
156 stream.pipe(outputStream);
157
158 outputStream.onClosed = () {
159 log.fine("Created $path from stream."); 159 log.fine("Created $path from stream.");
160 completed = true; 160 });
161 completer.complete(file);
162 };
163
164 // TODO(nweiz): remove this when issue 4061 is fixed.
165 var stackTrace;
166 try {
167 throw "";
168 } catch (_, localStackTrace) {
169 stackTrace = localStackTrace;
170 }
171
172 completeError(error) {
173 if (!completed) {
174 completed = true;
175 completer.completeError(error, stackTrace);
176 } else {
177 log.fine("Got error after stream was closed: $error");
178 }
179 }
180
181 stream.onError = completeError;
182 outputStream.onError = completeError;
183
184 return completer.future;
185 } 161 }
186 162
187 /// Creates a directory [dir]. Returns a [Future] that completes when the 163 /// Creates a directory [dir]. Returns a [Future] that completes when the
188 /// directory is created. 164 /// directory is created.
189 Future<Directory> createDir(dir) { 165 Future<Directory> createDir(dir) {
190 dir = _getDirectory(dir); 166 dir = _getDirectory(dir);
191 return log.ioAsync("create directory ${dir.path}", 167 return log.ioAsync("create directory ${dir.path}",
192 dir.create()); 168 dir.create());
193 } 169 }
194 170
(...skipping 255 matching lines...) Expand 10 before | Expand all | Expand 10 after
450 // which also live under "utils", or from the SDK where pub is in "util". 426 // which also live under "utils", or from the SDK where pub is in "util".
451 var utilDir = dirname(scriptPath); 427 var utilDir = dirname(scriptPath);
452 while (basename(utilDir) != 'utils' && basename(utilDir) != 'util') { 428 while (basename(utilDir) != 'utils' && basename(utilDir) != 'util') {
453 if (basename(utilDir) == '') throw 'Could not find path to pub.'; 429 if (basename(utilDir) == '') throw 'Could not find path to pub.';
454 utilDir = dirname(utilDir); 430 utilDir = dirname(utilDir);
455 } 431 }
456 432
457 return path.normalize(join(utilDir, 'pub', target)); 433 return path.normalize(join(utilDir, 'pub', target));
458 } 434 }
459 435
460 /// A StringInputStream reading from stdin. 436 // TODO(nweiz): add a ByteSink wrapper to make writing strings to stdout/stderr
461 final _stringStdin = new StringInputStream(stdin); 437 // nicer.
438
439 /// A sink that writes to standard output. Errors piped to this stream will be
440 /// surfaced to the top-level error handler.
441 final StreamSink<List<int>> stdoutSink = _wrapStdio(stdout, "stdout");
442
443 /// A sink that writes to standard error. Errors piped to this stream will be
444 /// surfaced to the top-level error handler.
445 final StreamSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr");
446
447 /// Wrap the standard output or error [stream] in a [StreamSink]. Any errors are
448 /// logged, and then the program is terminated. [name] is used for debugging.
449 StreamSink<List<int>> _wrapStdio(OutputStream stream, String name) {
450 var pair = consumerToSink(wrapOutputStream(stream));
451 pair.last.catchError((e) {
452 // This log may or may not work, depending on how the stream failed. Not
453 // much we can do about that.
454 log.error("Error writing to $name: $e");
455 exit(exit_codes.IO);
456 });
457 return pair.first;
458 }
459
460 /// A line-by-line stream of standard input.
461 final Stream<String> stdinLines =
462 streamToLines(wrapInputStream(stdin).toStringStream());
462 463
463 /// Displays a message and reads a yes/no confirmation from the user. Returns 464 /// Displays a message and reads a yes/no confirmation from the user. Returns
464 /// a [Future] that completes to `true` if the user confirms or `false` if they 465 /// a [Future] that completes to `true` if the user confirms or `false` if they
465 /// do not. 466 /// do not.
466 /// 467 ///
467 /// This will automatically append " (y/n)?" to the message, so [message] 468 /// This will automatically append " (y/n)?" to the message, so [message]
468 /// should just be a fragment like, "Are you sure you want to proceed". 469 /// should just be a fragment like, "Are you sure you want to proceed".
469 Future<bool> confirm(String message) { 470 Future<bool> confirm(String message) {
470 log.fine('Showing confirm message: $message'); 471 log.fine('Showing confirm message: $message');
471 stdout.writeString("$message (y/n)? "); 472 stdoutSink.add("$message (y/n)? ".charCodes);
472 return readLine().then((line) => new RegExp(r"^[yY]").hasMatch(line)); 473 return streamFirst(stdinLines)
473 } 474 .then((line) => new RegExp(r"^[yY]").hasMatch(line));
474
475 /// Returns a single line read from a [StringInputStream]. By default, reads
476 /// from stdin.
477 ///
478 /// A [StringInputStream] passed to this should have no callbacks registered.
479 Future<String> readLine([StringInputStream stream]) {
480 if (stream == null) stream = _stringStdin;
481 if (stream.closed) return new Future.immediate('');
482 void removeCallbacks() {
483 stream.onClosed = null;
484 stream.onLine = null;
485 stream.onError = null;
486 }
487
488 // TODO(nweiz): remove this when issue 4061 is fixed.
489 var stackTrace;
490 try {
491 throw "";
492 } catch (_, localStackTrace) {
493 stackTrace = localStackTrace;
494 }
495
496 var completer = new Completer();
497 stream.onClosed = () {
498 removeCallbacks();
499 completer.complete('');
500 };
501
502 stream.onLine = () {
503 removeCallbacks();
504 var line = stream.readLine();
505 log.io('Read line: $line');
506 completer.complete(line);
507 };
508
509 stream.onError = (e) {
510 removeCallbacks();
511 completer.completeError(e, stackTrace);
512 };
513
514 return completer.future;
515 }
516
517 /// Takes all input from [source] and writes it to [sink].
518 ///
519 /// Returns a future that completes when [source] is closed.
520 Future pipeInputToInput(InputStream source, ListInputStream sink) {
521 var completer = new Completer();
522 source.onClosed = () {
523 sink.markEndOfStream();
524 completer.complete(null);
525 };
526 source.onData = () {
527 // Even if the sink is closed and we aren't going to do anything with more
528 // data, we still need to drain it from source to work around issue 7218.
529 var data = source.read();
530 try {
531 if (!sink.closed) sink.write(data);
532 } on StreamException catch (e, stackTrace) {
533 // Ignore an exception to work around issue 4222.
534 log.io("Writing to an unclosed ListInputStream caused exception $e\n"
535 "$stackTrace");
536 }
537 };
538 // TODO(nweiz): propagate this error to the sink. See issue 3657.
539 source.onError = (e) { throw e; };
540 return completer.future;
541 }
542
543 /// Buffers all input from an InputStream and returns it as a future.
544 Future<List<int>> consumeInputStream(InputStream stream) {
545 if (stream.closed) return new Future.immediate(<int>[]);
546
547 // TODO(nweiz): remove this when issue 4061 is fixed.
548 var stackTrace;
549 try {
550 throw "";
551 } catch (_, localStackTrace) {
552 stackTrace = localStackTrace;
553 }
554
555 var completer = new Completer<List<int>>();
556 var buffer = <int>[];
557 stream.onClosed = () => completer.complete(buffer);
558 stream.onData = () => buffer.addAll(stream.read());
559 stream.onError = (e) => completer.completeError(e, stackTrace);
560 return completer.future;
561 }
562
563 /// Buffers all input from a StringInputStream and returns it as a future.
564 Future<String> consumeStringInputStream(StringInputStream stream) {
565 if (stream.closed) return new Future.immediate('');
566
567 // TODO(nweiz): remove this when issue 4061 is fixed.
568 var stackTrace;
569 try {
570 throw "";
571 } catch (_, localStackTrace) {
572 stackTrace = localStackTrace;
573 }
574
575 var completer = new Completer<String>();
576 var buffer = new StringBuffer();
577 stream.onClosed = () => completer.complete(buffer.toString());
578 stream.onData = () => buffer.add(stream.read());
579 stream.onError = (e) => completer.completeError(e, stackTrace);
580 return completer.future;
581 } 475 }
582 476
583 /// Wraps [stream] in a single-subscription [Stream] that emits the same data. 477 /// Wraps [stream] in a single-subscription [Stream] that emits the same data.
584 Stream<List<int>> wrapInputStream(InputStream stream) { 478 ByteStream wrapInputStream(InputStream stream) {
585 var controller = new StreamController(); 479 var controller = new StreamController();
586 if (stream.closed) { 480 if (stream.closed) {
587 controller.close(); 481 controller.close();
588 return controller.stream; 482 return new ByteStream(controller.stream);
589 } 483 }
590 484
591 stream.onClosed = controller.close; 485 stream.onClosed = controller.close;
592 stream.onData = () => controller.add(stream.read()); 486 stream.onData = () => controller.add(stream.read());
593 stream.onError = (e) => controller.signalError(new AsyncError(e)); 487 stream.onError = (e) => controller.signalError(new AsyncError(e));
594 return controller.stream; 488 return new ByteStream(controller.stream);
595 }
596
597 // TODO(nweiz): remove this ASAP (issue 7807).
598 /// Wraps [stream] in an [InputStream].
599 InputStream streamToInputStream(Stream<List<int>> stream) {
600 var inputStream = new ListInputStream();
601 stream.listen((chunk) => inputStream.write(chunk),
602 onDone: inputStream.markEndOfStream);
603 return inputStream;
604 } 489 }
605 490
606 /// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it 491 /// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it
607 /// using [Stream.pipe]. 492 /// using [Stream.pipe]. Errors piped to the returned [StreamConsumer] will be
493 /// forwarded to the [Future] returned by [Stream.pipe].
608 StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => 494 StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) =>
609 new _OutputStreamConsumer(stream); 495 new _OutputStreamConsumer(stream);
610 496
611 /// A [StreamConsumer] that pipes data into an [OutputStream]. 497 /// A [StreamConsumer] that pipes data into an [OutputStream].
612 class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { 498 class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> {
613 final OutputStream _outputStream; 499 final OutputStream _outputStream;
614 500
615 _OutputStreamConsumer(this._outputStream); 501 _OutputStreamConsumer(this._outputStream);
616 502
617 Future consume(Stream<List<int>> stream) { 503 Future consume(Stream<List<int>> stream) {
618 // TODO(nweiz): we have to manually keep track of whether or not the 504 // TODO(nweiz): we have to manually keep track of whether or not the
619 // completer has completed since the output stream could signal an error 505 // completer has completed since the output stream could signal an error
620 // after close() has been called but before it has shut down internally. See 506 // after close() has been called but before it has shut down internally. See
621 // the following TODO. 507 // the following TODO.
622 var completed = false; 508 var completed = false;
623 var completer = new Completer(); 509 var completer = new Completer();
624 stream.listen((data) { 510 stream.listen((data) {
625 // Writing empty data to a closed stream can cause errors. 511 // Writing empty data to a closed stream can cause errors.
626 if (data.isEmpty) return; 512 if (data.isEmpty) return;
627 513
628 // TODO(nweiz): remove this try/catch when issue 7836 is fixed. 514 // TODO(nweiz): remove this try/catch when issue 7836 is fixed.
629 try { 515 try {
630 _outputStream.write(data); 516 _outputStream.write(data);
631 } catch (e, stack) { 517 } catch (e, stack) {
632 if (!completed) completer.completeError(e, stack); 518 if (!completed) completer.completeError(e, stack);
633 completed = true; 519 completed = true;
634 } 520 }
521 }, onError: (e) {
522 if (!completed) completer.completeError(e.error, e.stackTrace);
523 completed = true;
635 }, onDone: () => _outputStream.close()); 524 }, onDone: () => _outputStream.close());
636 525
637 _outputStream.onError = (e) { 526 _outputStream.onError = (e) {
638 if (!completed) completer.completeError(e); 527 if (!completed) completer.completeError(e);
639 completed = true; 528 completed = true;
640 }; 529 };
641 530
642 _outputStream.onClosed = () { 531 _outputStream.onClosed = () {
643 if (!completed) completer.complete(); 532 if (!completed) completer.complete();
644 completed = true; 533 completed = true;
645 }; 534 };
646 535
647 return completer.future; 536 return completer.future;
648 } 537 }
649 } 538 }
650 539
540 /// Returns a [StreamSink] that pipes all data to [consumer] and a [Future] that
541 /// will succeed when [StreamSink] is closed or fail with any errors that occur
542 /// while writing.
543 Pair<StreamSink, Future> consumerToSink(StreamConsumer consumer) {
544 var controller = new StreamController();
545 var done = controller.stream.pipe(consumer);
546 return new Pair<StreamSink, Future>(controller.sink, done);
547 }
548
549 // TODO(nweiz): remove this when issue 7786 is fixed.
550 /// Pipes all data and errors from [stream] into [sink]. When [stream] is done,
551 /// the returned [Future] is completed and [sink] is closed if [closeSink] is
552 /// true.
553 ///
554 /// When an error occurs on [stream], that error is passed to [sink]. If
555 /// [unsubscribeOnError] is true, [Future] will be completed successfully and no
556 /// more data or errors will be piped from [stream] to [sink]. If
557 /// [unsubscribeOnError] and [closeSink] are both true, [sink] will then be
558 /// closed.
559 Future store(Stream stream, StreamSink sink,
560 {bool unsubscribeOnError: true, closeSink: true}) {
561 var completer = new Completer();
562 stream.listen(sink.add,
563 onError: (e) {
564 sink.signalError(e);
565 if (unsubscribeOnError) {
566 completer.complete();
567 if (closeSink) sink.close();
568 }
569 },
570 onDone: () {
571 if (closeSink) sink.close();
572 completer.complete();
573 }, unsubscribeOnError: unsubscribeOnError);
574 return completer.future;
575 }
576
651 /// Spawns and runs the process located at [executable], passing in [args]. 577 /// Spawns and runs the process located at [executable], passing in [args].
652 /// Returns a [Future] that will complete with the results of the process after 578 /// Returns a [Future] that will complete with the results of the process after
653 /// it has ended. 579 /// it has ended.
654 /// 580 ///
655 /// The spawned process will inherit its parent's environment variables. If 581 /// The spawned process will inherit its parent's environment variables. If
656 /// [environment] is provided, that will be used to augment (not replace) the 582 /// [environment] is provided, that will be used to augment (not replace) the
657 /// the inherited variables. 583 /// the inherited variables.
658 Future<PubProcessResult> runProcess(String executable, List<String> args, 584 Future<PubProcessResult> runProcess(String executable, List<String> args,
659 {workingDir, Map<String, String> environment}) { 585 {workingDir, Map<String, String> environment}) {
660 return _doProcess(Process.run, executable, args, workingDir, environment) 586 return _doProcess(Process.run, executable, args, workingDir, environment)
(...skipping 13 matching lines...) Expand all
674 return pubResult; 600 return pubResult;
675 }); 601 });
676 } 602 }
677 603
678 /// Spawns the process located at [executable], passing in [args]. Returns a 604 /// Spawns the process located at [executable], passing in [args]. Returns a
679 /// [Future] that will complete with the [Process] once it's been started. 605 /// [Future] that will complete with the [Process] once it's been started.
680 /// 606 ///
681 /// The spawned process will inherit its parent's environment variables. If 607 /// The spawned process will inherit its parent's environment variables. If
682 /// [environment] is provided, that will be used to augment (not replace) the 608 /// [environment] is provided, that will be used to augment (not replace) the
683 /// the inherited variables. 609 /// the inherited variables.
684 Future<Process> startProcess(String executable, List<String> args, 610 Future<PubProcess> startProcess(String executable, List<String> args,
685 {workingDir, Map<String, String> environment}) => 611 {workingDir, Map<String, String> environment}) =>
686 _doProcess(Process.start, executable, args, workingDir, environment) 612 _doProcess(Process.start, executable, args, workingDir, environment)
687 .then((process) => new _WrappedProcess(process)); 613 .then((process) => new PubProcess(process));
688 614
689 /// A wrapper around [Process] that buffers the stdout and stderr to avoid 615 /// A wrapper around [Process] that exposes `dart:async`-style APIs.
690 /// running into issue 7218. 616 class PubProcess {
691 class _WrappedProcess implements Process { 617 /// The underlying `dart:io` [Process].
692 final Process _process; 618 final Process _process;
693 final InputStream stderr;
694 final InputStream stdout;
695 619
696 OutputStream get stdin => _process.stdin; 620 /// The mutable field for [stdin].
621 StreamSink<List<int>> _stdin;
697 622
698 void set onExit(void callback(int exitCode)) { 623 /// The mutable field for [stdinClosed].
699 _process.onExit = callback; 624 Future _stdinClosed;
625
626 /// The mutable field for [stdout].
627 ByteStream _stdout;
628
629 /// The mutable field for [stderr].
630 ByteStream _stderr;
631
632 /// The mutable field for [exitCode].
633 Future<int> _exitCode;
634
635 /// The sink used for passing data to the process's standard input stream.
636 /// Errors on this stream are surfaced through [stdinClosed], [stdout],
637 /// [stderr], and [exitCode], which are all members of an [ErrorGroup].
638 StreamSink<List<int>> get stdin => _stdin;
639
640 // TODO(nweiz): write some more sophisticated Future machinery so that this
641 // doesn't surface errors from the other streams/futures, but still passes its
642 // unhandled errors to them. Right now it's impossible to recover from a stdin
643 // error and continue interacting with the process.
644 /// A [Future] that completes when [stdin] is closed, either by the user or by
645 /// the process itself.
646 ///
647 /// This is in an [ErrorGroup] with [stdout], [stderr], and [exitCode], so any
648 /// error in process will be passed to it, but won't reach the top-level error
649 /// handler unless nothing has handled it.
650 Future get stdinClosed => _stdinClosed;
651
652 /// The process's standard output stream.
653 ///
654 /// This is in an [ErrorGroup] with [stdinClosed], [stderr], and [exitCode],
655 /// so any error in process will be passed to it, but won't reach the
656 /// top-level error handler unless nothing has handled it.
657 ByteStream get stdout => _stdout;
658
659 /// The process's standard error stream.
660 ///
661 /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [exitCode],
662 /// so any error in process will be passed to it, but won't reach the
663 /// top-level error handler unless nothing has handled it.
664 ByteStream get stderr => _stderr;
665
666 /// A [Future] that will complete to the process's exit code once the process
667 /// has finished running.
668 ///
669 /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [stderr], so
670 /// any error in process will be passed to it, but won't reach the top-level
671 /// error handler unless nothing has handled it.
672 Future<int> get exitCode => _exitCode;
673
674 /// Creates a new [PubProcess] wrapping [process].
675 PubProcess(Process process)
676 : _process = process {
677 var errorGroup = new ErrorGroup();
678
679 var pair = consumerToSink(wrapOutputStream(process.stdin));
680 _stdin = pair.first;
681 _stdinClosed = errorGroup.registerFuture(pair.last);
682
683 _stdout = new ByteStream(
684 errorGroup.registerStream(wrapInputStream(process.stdout)));
685 _stderr = new ByteStream(
686 errorGroup.registerStream(wrapInputStream(process.stderr)));
687
688 var exitCodeCompleter = new Completer();
689 _exitCode = errorGroup.registerFuture(exitCodeCompleter.future);
690 _process.onExit = (code) => exitCodeCompleter.complete(code);
700 } 691 }
701 692
702 _WrappedProcess(Process process) 693 /// Sends [signal] to the underlying process.
703 : _process = process,
704 stderr = _wrapInputStream(process.stderr),
705 stdout = _wrapInputStream(process.stdout);
706
707 bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) => 694 bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) =>
708 _process.kill(signal); 695 _process.kill(signal);
709
710 /// Wrap an InputStream in a ListInputStream. This eagerly drains the [source]
711 /// input stream. This is useful for spawned processes which will not exit
712 /// until their output streams have been drained. TODO(rnystrom): We should
713 /// use this logic anywhere we spawn a process.
714 static InputStream _wrapInputStream(InputStream source) {
715 var sink = new ListInputStream();
716 pipeInputToInput(source, sink);
717 return sink;
718 }
719 } 696 }
720 697
721 /// Calls [fn] with appropriately modified arguments. [fn] should have the same 698 /// Calls [fn] with appropriately modified arguments. [fn] should have the same
722 /// signature as [Process.start], except that the returned [Future] may have a 699 /// signature as [Process.start], except that the returned [Future] may have a
723 /// type other than [Process]. 700 /// type other than [Process].
724 Future _doProcess(Function fn, String executable, List<String> args, workingDir, 701 Future _doProcess(Function fn, String executable, List<String> args, workingDir,
725 Map<String, String> environment) { 702 Map<String, String> environment) {
726 // TODO(rnystrom): Should dart:io just handle this? 703 // TODO(rnystrom): Should dart:io just handle this?
727 // Spawning a process on Windows will not look for the executable in the 704 // Spawning a process on Windows will not look for the executable in the
728 // system path. So, if executable looks like it needs that (i.e. it doesn't 705 // system path. So, if executable looks like it needs that (i.e. it doesn't
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
785 tempDir = dir; 762 tempDir = dir;
786 return fn(tempDir.path); 763 return fn(tempDir.path);
787 }).whenComplete(() { 764 }).whenComplete(() {
788 log.fine('Cleaning up temp directory ${tempDir.path}.'); 765 log.fine('Cleaning up temp directory ${tempDir.path}.');
789 return deleteDir(tempDir); 766 return deleteDir(tempDir);
790 }); 767 });
791 } 768 }
792 769
793 /// Extracts a `.tar.gz` file from [stream] to [destination], which can be a 770 /// Extracts a `.tar.gz` file from [stream] to [destination], which can be a
794 /// directory or a path. Returns whether or not the extraction was successful. 771 /// directory or a path. Returns whether or not the extraction was successful.
795 Future<bool> extractTarGz(InputStream stream, destination) { 772 Future<bool> extractTarGz(Stream<List<int>> stream, destination) {
796 destination = _getPath(destination); 773 destination = _getPath(destination);
797 774
798 log.fine("Extracting .tar.gz stream to $destination."); 775 log.fine("Extracting .tar.gz stream to $destination.");
799 776
800 if (Platform.operatingSystem == "windows") { 777 if (Platform.operatingSystem == "windows") {
801 return _extractTarGzWindows(stream, destination); 778 return _extractTarGzWindows(stream, destination);
802 } 779 }
803 780
804 var completer = new Completer<int>(); 781 return startProcess("tar",
805 var processFuture = startProcess("tar", 782 ["--extract", "--gunzip", "--directory", destination]).then((process) {
806 ["--extract", "--gunzip", "--directory", destination]); 783 // Ignore errors on process.std{out,err}. They'll be passed to
807 processFuture.then((process) { 784 // process.exitCode, and we don't want them being top-levelled by
808 process.onExit = (exitCode) => completer.complete(exitCode); 785 // std{out,err}Sink.
809 stream.pipe(process.stdin); 786 store(process.stdout.handleError((_) {}), stdoutSink, closeSink: false);
810 process.stdout.pipe(stdout, close: false); 787 store(process.stderr.handleError((_) {}), stderrSink, closeSink: false);
811 process.stderr.pipe(stderr, close: false); 788 return Future.wait([
812 }).catchError((e) { 789 store(stream, process.stdin),
813 completer.completeError(e.error, e.stackTrace); 790 process.exitCode
814 }); 791 ]);
815 792 }).then((results) {
816 return completer.future.then((exitCode) { 793 var exitCode = results[1];
794 if (exitCode != 0) {
795 throw "Failed to extract .tar.gz stream to $destination (exit code "
796 "$exitCode).";
797 }
817 log.fine("Extracted .tar.gz stream to $destination. Exit code $exitCode."); 798 log.fine("Extracted .tar.gz stream to $destination. Exit code $exitCode.");
818 // TODO(rnystrom): Does anything check this result value? If not, it should
819 // throw on a bad exit code.
820 return exitCode == 0;
821 }); 799 });
822 } 800 }
823 801
824 Future<bool> _extractTarGzWindows(InputStream stream, String destination) { 802 Future<bool> _extractTarGzWindows(Stream<List<int>> stream,
803 String destination) {
825 // TODO(rnystrom): In the repo's history, there is an older implementation of 804 // TODO(rnystrom): In the repo's history, there is an older implementation of
826 // this that does everything in memory by piping streams directly together 805 // this that does everything in memory by piping streams directly together
827 // instead of writing out temp files. The code is simpler, but unfortunately, 806 // instead of writing out temp files. The code is simpler, but unfortunately,
828 // 7zip seems to periodically fail when we invoke it from Dart and tell it to 807 // 7zip seems to periodically fail when we invoke it from Dart and tell it to
829 // read from stdin instead of a file. Consider resurrecting that version if 808 // read from stdin instead of a file. Consider resurrecting that version if
830 // we can figure out why it fails. 809 // we can figure out why it fails.
831 810
832 // Note: This line of code gets munged by create_sdk.py to be the correct 811 // Note: This line of code gets munged by create_sdk.py to be the correct
833 // relative path to 7zip in the SDK. 812 // relative path to 7zip in the SDK.
834 var pathTo7zip = '../../third_party/7zip/7za.exe'; 813 var pathTo7zip = '../../third_party/7zip/7za.exe';
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
877 856
878 log.fine('Clean up 7zip temp directory ${tempDir.path}.'); 857 log.fine('Clean up 7zip temp directory ${tempDir.path}.');
879 // TODO(rnystrom): Should also delete this if anything fails. 858 // TODO(rnystrom): Should also delete this if anything fails.
880 return deleteDir(tempDir); 859 return deleteDir(tempDir);
881 }).then((_) => true); 860 }).then((_) => true);
882 } 861 }
883 862
884 /// Create a .tar.gz archive from a list of entries. Each entry can be a 863 /// Create a .tar.gz archive from a list of entries. Each entry can be a
885 /// [String], [Directory], or [File] object. The root of the archive is 864 /// [String], [Directory], or [File] object. The root of the archive is
886 /// considered to be [baseDir], which defaults to the current working directory. 865 /// considered to be [baseDir], which defaults to the current working directory.
887 /// Returns an [InputStream] that will emit the contents of the archive. 866 /// Returns a [ByteStream] that will emit the contents of the archive.
888 InputStream createTarGz(List contents, {baseDir}) { 867 ByteStream createTarGz(List contents, {baseDir}) {
889 var buffer = new StringBuffer(); 868 var buffer = new StringBuffer();
890 buffer.add('Creating .tag.gz stream containing:\n'); 869 buffer.add('Creating .tag.gz stream containing:\n');
891 contents.forEach((file) => buffer.add('$file\n')); 870 contents.forEach((file) => buffer.add('$file\n'));
892 log.fine(buffer.toString()); 871 log.fine(buffer.toString());
893 872
894 // TODO(nweiz): Propagate errors to the returned stream (including non-zero 873 // TODO(nweiz): Propagate errors to the returned stream (including non-zero
895 // exit codes). See issue 3657. 874 // exit codes). See issue 3657.
896 var stream = new ListInputStream(); 875 var controller = new StreamController<List<int>>();
897 876
898 if (baseDir == null) baseDir = path.current; 877 if (baseDir == null) baseDir = path.current;
899 baseDir = getFullPath(baseDir); 878 baseDir = getFullPath(baseDir);
900 contents = contents.map((entry) { 879 contents = contents.map((entry) {
901 entry = getFullPath(entry); 880 entry = getFullPath(entry);
902 if (!isBeneath(entry, baseDir)) { 881 if (!isBeneath(entry, baseDir)) {
903 throw 'Entry $entry is not inside $baseDir.'; 882 throw 'Entry $entry is not inside $baseDir.';
904 } 883 }
905 return relativeTo(entry, baseDir); 884 return relativeTo(entry, baseDir);
906 }).toList(); 885 }).toList();
907 886
908 if (Platform.operatingSystem != "windows") { 887 if (Platform.operatingSystem != "windows") {
909 var args = ["--create", "--gzip", "--directory", baseDir]; 888 var args = ["--create", "--gzip", "--directory", baseDir];
910 args.addAll(contents.map(_getPath)); 889 args.addAll(contents.map(_getPath));
911 // TODO(nweiz): It's possible that enough command-line arguments will make 890 // TODO(nweiz): It's possible that enough command-line arguments will make
912 // the process choke, so at some point we should save the arguments to a 891 // the process choke, so at some point we should save the arguments to a
913 // file and pass them in via --files-from for tar and -i@filename for 7zip. 892 // file and pass them in via --files-from for tar and -i@filename for 7zip.
914 startProcess("tar", args).then((process) { 893 startProcess("tar", args).then((process) {
915 pipeInputToInput(process.stdout, stream); 894 store(process.stdout, controller);
916 895 }).catchError((e) {
917 // Drain and discard 7zip's stderr. 7zip writes its normal output to 896 // We don't have to worry about double-signaling here, since the store()
918 // stderr. We don't want to show that since it's meaningless. 897 // above will only be reached if startProcess succeeds.
919 // TODO(rnystrom): Should log this and display it if an actual error 898 controller.signalError(e.error, e.stackTrace);
920 // occurs. 899 controller.close();
921 consumeInputStream(process.stderr);
922 }); 900 });
923 return stream; 901 return new ByteStream(controller.stream);
924 } 902 }
925 903
926 withTempDir((tempDir) { 904 withTempDir((tempDir) {
927 // Create the tar file. 905 // Create the tar file.
928 var tarFile = join(tempDir, "intermediate.tar"); 906 var tarFile = join(tempDir, "intermediate.tar");
929 var args = ["a", "-w$baseDir", tarFile]; 907 var args = ["a", "-w$baseDir", tarFile];
930 args.addAll(contents.map((entry) => '-i!"$entry"')); 908 args.addAll(contents.map((entry) => '-i!"$entry"'));
931 909
932 // Note: This line of code gets munged by create_sdk.py to be the correct 910 // Note: This line of code gets munged by create_sdk.py to be the correct
933 // relative path to 7zip in the SDK. 911 // relative path to 7zip in the SDK.
934 var pathTo7zip = '../../third_party/7zip/7za.exe'; 912 var pathTo7zip = '../../third_party/7zip/7za.exe';
935 var command = relativeToPub(pathTo7zip); 913 var command = relativeToPub(pathTo7zip);
936 914
937 // We're passing 'baseDir' both as '-w' and setting it as the working 915 // We're passing 'baseDir' both as '-w' and setting it as the working
938 // directory explicitly here intentionally. The former ensures that the 916 // directory explicitly here intentionally. The former ensures that the
939 // files added to the archive have the correct relative path in the archive. 917 // files added to the archive have the correct relative path in the archive.
940 // The latter enables relative paths in the "-i" args to be resolved. 918 // The latter enables relative paths in the "-i" args to be resolved.
941 return runProcess(command, args, workingDir: baseDir).then((_) { 919 return runProcess(command, args, workingDir: baseDir).then((_) {
942 // GZIP it. 7zip doesn't support doing both as a single operation. Send 920 // GZIP it. 7zip doesn't support doing both as a single operation. Send
943 // the output to stdout. 921 // the output to stdout.
944 args = ["a", "unused", "-tgzip", "-so", tarFile]; 922 args = ["a", "unused", "-tgzip", "-so", tarFile];
945 return startProcess(command, args); 923 return startProcess(command, args);
946 }).then((process) { 924 }).then((process) {
947 // Drain and discard 7zip's stderr. 7zip writes its normal output to 925 // Ignore 7zip's stderr. 7zip writes its normal output to stderr. We don't
948 // stderr. We don't want to show that since it's meaningless. 926 // want to show that since it's meaningless.
949 // TODO(rnystrom): Should log this and display it if an actual error 927 //
928 // TODO(rnystrom): Should log the stderr and display it if an actual error
950 // occurs. 929 // occurs.
951 consumeInputStream(process.stderr); 930 store(process.stdout, controller);
952 return pipeInputToInput(process.stdout, stream);
953 }); 931 });
932 }).catchError((e) {
933 // We don't have to worry about double-signaling here, since the store()
934 // above will only be reached if everything succeeds.
935 controller.signalError(e.error, e.stackTrace);
936 controller.close();
954 }); 937 });
955 return stream; 938 return new ByteStream(controller.stream);
956 } 939 }
957 940
958 /// Exception thrown when an operation times out. 941 /// Exception thrown when an operation times out.
959 class TimeoutException implements Exception { 942 class TimeoutException implements Exception {
960 final String message; 943 final String message;
961 944
962 const TimeoutException(this.message); 945 const TimeoutException(this.message);
963 946
964 String toString() => message; 947 String toString() => message;
965 } 948 }
(...skipping 24 matching lines...) Expand all
990 Directory _getDirectory(entry) { 973 Directory _getDirectory(entry) {
991 if (entry is Directory) return entry; 974 if (entry is Directory) return entry;
992 return new Directory(entry); 975 return new Directory(entry);
993 } 976 }
994 977
995 /// Gets a [Uri] for [uri], which can either already be one, or be a [String]. 978 /// Gets a [Uri] for [uri], which can either already be one, or be a [String].
996 Uri _getUri(uri) { 979 Uri _getUri(uri) {
997 if (uri is Uri) return uri; 980 if (uri is Uri) return uri;
998 return Uri.parse(uri); 981 return Uri.parse(uri);
999 } 982 }
OLDNEW
« no previous file with comments | « utils/pub/hosted_source.dart ('k') | utils/pub/log.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698