OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 /// Helper functionality to make working with IO easier. | 5 /// Helper functionality to make working with IO easier. |
6 library io; | 6 library io; |
7 | 7 |
8 import 'dart:async'; | 8 import 'dart:async'; |
9 import 'dart:io'; | 9 import 'dart:io'; |
10 import 'dart:isolate'; | 10 import 'dart:isolate'; |
11 import 'dart:json'; | 11 import 'dart:json'; |
12 import 'dart:uri'; | 12 import 'dart:uri'; |
13 | 13 |
14 import '../../pkg/path/lib/path.dart' as path; | 14 import '../../pkg/path/lib/path.dart' as path; |
| 15 import '../../pkg/http/lib/http.dart' show ByteStream; |
| 16 import 'error_group.dart'; |
| 17 import 'exit_codes.dart' as exit_codes; |
15 import 'log.dart' as log; | 18 import 'log.dart' as log; |
16 import 'utils.dart'; | 19 import 'utils.dart'; |
17 | 20 |
| 21 export '../../pkg/http/lib/http.dart' show ByteStream; |
| 22 |
18 final NEWLINE_PATTERN = new RegExp("\r\n?|\n\r?"); | 23 final NEWLINE_PATTERN = new RegExp("\r\n?|\n\r?"); |
19 | 24 |
20 /// Joins a number of path string parts into a single path. Handles | 25 /// Joins a number of path string parts into a single path. Handles |
21 /// platform-specific path separators. Parts can be [String], [Directory], or | 26 /// platform-specific path separators. Parts can be [String], [Directory], or |
22 /// [File] objects. | 27 /// [File] objects. |
23 String join(part1, [part2, part3, part4, part5, part6, part7, part8]) { | 28 String join(part1, [part2, part3, part4, part5, part6, part7, part8]) { |
24 var parts = [part1, part2, part3, part4, part5, part6, part7, part8] | 29 var parts = [part1, part2, part3, part4, part5, part6, part7, part8] |
25 .map((part) => part == null ? null : _getPath(part)).toList(); | 30 .map((part) => part == null ? null : _getPath(part)).toList(); |
26 | 31 |
27 return path.join(parts[0], parts[1], parts[2], parts[3], parts[4], parts[5], | 32 return path.join(parts[0], parts[1], parts[2], parts[3], parts[4], parts[5], |
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
137 /// a [Future] that completes when the deletion is done. | 142 /// a [Future] that completes when the deletion is done. |
138 Future<File> deleteFile(file) { | 143 Future<File> deleteFile(file) { |
139 var path = _getPath(file); | 144 var path = _getPath(file); |
140 return log.ioAsync("delete file $path", | 145 return log.ioAsync("delete file $path", |
141 new File(path).delete()); | 146 new File(path).delete()); |
142 } | 147 } |
143 | 148 |
144 /// Writes [stream] to a new file at [path], which may be a [String] or a | 149 /// Writes [stream] to a new file at [path], which may be a [String] or a |
145 /// [File]. Will replace any file already at that path. Completes when the file | 150 /// [File]. Will replace any file already at that path. Completes when the file |
146 /// is done being written. | 151 /// is done being written. |
147 Future<File> createFileFromStream(InputStream stream, path) { | 152 Future<File> createFileFromStream(Stream<List<int>> stream, path) { |
148 path = _getPath(path); | 153 path = _getPath(path); |
149 | 154 |
150 log.io("Creating $path from stream."); | 155 log.io("Creating $path from stream."); |
151 | 156 |
152 var completer = new Completer<File>(); | |
153 var completed = false; | |
154 var file = new File(path); | 157 var file = new File(path); |
155 var outputStream = file.openOutputStream(); | 158 return stream.pipe(wrapOutputStream(file.openOutputStream())).then((_) { |
156 stream.pipe(outputStream); | |
157 | |
158 outputStream.onClosed = () { | |
159 log.fine("Created $path from stream."); | 159 log.fine("Created $path from stream."); |
160 completed = true; | 160 }); |
161 completer.complete(file); | |
162 }; | |
163 | |
164 // TODO(nweiz): remove this when issue 4061 is fixed. | |
165 var stackTrace; | |
166 try { | |
167 throw ""; | |
168 } catch (_, localStackTrace) { | |
169 stackTrace = localStackTrace; | |
170 } | |
171 | |
172 completeError(error) { | |
173 if (!completed) { | |
174 completed = true; | |
175 completer.completeError(error, stackTrace); | |
176 } else { | |
177 log.fine("Got error after stream was closed: $error"); | |
178 } | |
179 } | |
180 | |
181 stream.onError = completeError; | |
182 outputStream.onError = completeError; | |
183 | |
184 return completer.future; | |
185 } | 161 } |
186 | 162 |
187 /// Creates a directory [dir]. Returns a [Future] that completes when the | 163 /// Creates a directory [dir]. Returns a [Future] that completes when the |
188 /// directory is created. | 164 /// directory is created. |
189 Future<Directory> createDir(dir) { | 165 Future<Directory> createDir(dir) { |
190 dir = _getDirectory(dir); | 166 dir = _getDirectory(dir); |
191 return log.ioAsync("create directory ${dir.path}", | 167 return log.ioAsync("create directory ${dir.path}", |
192 dir.create()); | 168 dir.create()); |
193 } | 169 } |
194 | 170 |
(...skipping 255 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
450 // which also live under "utils", or from the SDK where pub is in "util". | 426 // which also live under "utils", or from the SDK where pub is in "util". |
451 var utilDir = dirname(scriptPath); | 427 var utilDir = dirname(scriptPath); |
452 while (basename(utilDir) != 'utils' && basename(utilDir) != 'util') { | 428 while (basename(utilDir) != 'utils' && basename(utilDir) != 'util') { |
453 if (basename(utilDir) == '') throw 'Could not find path to pub.'; | 429 if (basename(utilDir) == '') throw 'Could not find path to pub.'; |
454 utilDir = dirname(utilDir); | 430 utilDir = dirname(utilDir); |
455 } | 431 } |
456 | 432 |
457 return path.normalize(join(utilDir, 'pub', target)); | 433 return path.normalize(join(utilDir, 'pub', target)); |
458 } | 434 } |
459 | 435 |
460 /// A StringInputStream reading from stdin. | 436 // TODO(nweiz): add a ByteSink wrapper to make writing strings to stdout/stderr |
461 final _stringStdin = new StringInputStream(stdin); | 437 // nicer. |
| 438 |
| 439 /// A sink that writes to standard output. Errors piped to this stream will be |
| 440 /// surfaced to the top-level error handler. |
| 441 final StreamSink<List<int>> stdoutSink = _wrapStdio(stdout, "stdout"); |
| 442 |
| 443 /// A sink that writes to standard error. Errors piped to this stream will be |
| 444 /// surfaced to the top-level error handler. |
| 445 final StreamSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr"); |
| 446 |
| 447 /// Wrap the standard output or error [stream] in a [StreamSink]. Any errors are |
| 448 /// logged, and then the program is terminated. [name] is used for debugging. |
| 449 StreamSink<List<int>> _wrapStdio(OutputStream stream, String name) { |
| 450 var pair = consumerToSink(wrapOutputStream(stream)); |
| 451 pair.last.catchError((e) { |
| 452 // This log may or may not work, depending on how the stream failed. Not |
| 453 // much we can do about that. |
| 454 log.error("Error writing to $name: $e"); |
| 455 exit(exit_codes.IO); |
| 456 }); |
| 457 return pair.first; |
| 458 } |
| 459 |
| 460 /// A line-by-line stream of standard input. |
| 461 final Stream<String> stdinLines = |
| 462 streamToLines(wrapInputStream(stdin).toStringStream()); |
462 | 463 |
463 /// Displays a message and reads a yes/no confirmation from the user. Returns | 464 /// Displays a message and reads a yes/no confirmation from the user. Returns |
464 /// a [Future] that completes to `true` if the user confirms or `false` if they | 465 /// a [Future] that completes to `true` if the user confirms or `false` if they |
465 /// do not. | 466 /// do not. |
466 /// | 467 /// |
467 /// This will automatically append " (y/n)?" to the message, so [message] | 468 /// This will automatically append " (y/n)?" to the message, so [message] |
468 /// should just be a fragment like, "Are you sure you want to proceed". | 469 /// should just be a fragment like, "Are you sure you want to proceed". |
469 Future<bool> confirm(String message) { | 470 Future<bool> confirm(String message) { |
470 log.fine('Showing confirm message: $message'); | 471 log.fine('Showing confirm message: $message'); |
471 stdout.writeString("$message (y/n)? "); | 472 stdoutSink.add("$message (y/n)? ".charCodes); |
472 return readLine().then((line) => new RegExp(r"^[yY]").hasMatch(line)); | 473 return streamFirst(stdinLines) |
473 } | 474 .then((line) => new RegExp(r"^[yY]").hasMatch(line)); |
474 | |
475 /// Returns a single line read from a [StringInputStream]. By default, reads | |
476 /// from stdin. | |
477 /// | |
478 /// A [StringInputStream] passed to this should have no callbacks registered. | |
479 Future<String> readLine([StringInputStream stream]) { | |
480 if (stream == null) stream = _stringStdin; | |
481 if (stream.closed) return new Future.immediate(''); | |
482 void removeCallbacks() { | |
483 stream.onClosed = null; | |
484 stream.onLine = null; | |
485 stream.onError = null; | |
486 } | |
487 | |
488 // TODO(nweiz): remove this when issue 4061 is fixed. | |
489 var stackTrace; | |
490 try { | |
491 throw ""; | |
492 } catch (_, localStackTrace) { | |
493 stackTrace = localStackTrace; | |
494 } | |
495 | |
496 var completer = new Completer(); | |
497 stream.onClosed = () { | |
498 removeCallbacks(); | |
499 completer.complete(''); | |
500 }; | |
501 | |
502 stream.onLine = () { | |
503 removeCallbacks(); | |
504 var line = stream.readLine(); | |
505 log.io('Read line: $line'); | |
506 completer.complete(line); | |
507 }; | |
508 | |
509 stream.onError = (e) { | |
510 removeCallbacks(); | |
511 completer.completeError(e, stackTrace); | |
512 }; | |
513 | |
514 return completer.future; | |
515 } | |
516 | |
517 /// Takes all input from [source] and writes it to [sink]. | |
518 /// | |
519 /// Returns a future that completes when [source] is closed. | |
520 Future pipeInputToInput(InputStream source, ListInputStream sink) { | |
521 var completer = new Completer(); | |
522 source.onClosed = () { | |
523 sink.markEndOfStream(); | |
524 completer.complete(null); | |
525 }; | |
526 source.onData = () { | |
527 // Even if the sink is closed and we aren't going to do anything with more | |
528 // data, we still need to drain it from source to work around issue 7218. | |
529 var data = source.read(); | |
530 try { | |
531 if (!sink.closed) sink.write(data); | |
532 } on StreamException catch (e, stackTrace) { | |
533 // Ignore an exception to work around issue 4222. | |
534 log.io("Writing to an unclosed ListInputStream caused exception $e\n" | |
535 "$stackTrace"); | |
536 } | |
537 }; | |
538 // TODO(nweiz): propagate this error to the sink. See issue 3657. | |
539 source.onError = (e) { throw e; }; | |
540 return completer.future; | |
541 } | |
542 | |
543 /// Buffers all input from an InputStream and returns it as a future. | |
544 Future<List<int>> consumeInputStream(InputStream stream) { | |
545 if (stream.closed) return new Future.immediate(<int>[]); | |
546 | |
547 // TODO(nweiz): remove this when issue 4061 is fixed. | |
548 var stackTrace; | |
549 try { | |
550 throw ""; | |
551 } catch (_, localStackTrace) { | |
552 stackTrace = localStackTrace; | |
553 } | |
554 | |
555 var completer = new Completer<List<int>>(); | |
556 var buffer = <int>[]; | |
557 stream.onClosed = () => completer.complete(buffer); | |
558 stream.onData = () => buffer.addAll(stream.read()); | |
559 stream.onError = (e) => completer.completeError(e, stackTrace); | |
560 return completer.future; | |
561 } | |
562 | |
563 /// Buffers all input from a StringInputStream and returns it as a future. | |
564 Future<String> consumeStringInputStream(StringInputStream stream) { | |
565 if (stream.closed) return new Future.immediate(''); | |
566 | |
567 // TODO(nweiz): remove this when issue 4061 is fixed. | |
568 var stackTrace; | |
569 try { | |
570 throw ""; | |
571 } catch (_, localStackTrace) { | |
572 stackTrace = localStackTrace; | |
573 } | |
574 | |
575 var completer = new Completer<String>(); | |
576 var buffer = new StringBuffer(); | |
577 stream.onClosed = () => completer.complete(buffer.toString()); | |
578 stream.onData = () => buffer.add(stream.read()); | |
579 stream.onError = (e) => completer.completeError(e, stackTrace); | |
580 return completer.future; | |
581 } | 475 } |
582 | 476 |
583 /// Wraps [stream] in a single-subscription [Stream] that emits the same data. | 477 /// Wraps [stream] in a single-subscription [Stream] that emits the same data. |
584 Stream<List<int>> wrapInputStream(InputStream stream) { | 478 ByteStream wrapInputStream(InputStream stream) { |
585 var controller = new StreamController(); | 479 var controller = new StreamController(); |
586 if (stream.closed) { | 480 if (stream.closed) { |
587 controller.close(); | 481 controller.close(); |
588 return controller.stream; | 482 return new ByteStream(controller.stream); |
589 } | 483 } |
590 | 484 |
591 stream.onClosed = controller.close; | 485 stream.onClosed = controller.close; |
592 stream.onData = () => controller.add(stream.read()); | 486 stream.onData = () => controller.add(stream.read()); |
593 stream.onError = (e) => controller.signalError(new AsyncError(e)); | 487 stream.onError = (e) => controller.signalError(new AsyncError(e)); |
594 return controller.stream; | 488 return new ByteStream(controller.stream); |
595 } | |
596 | |
597 // TODO(nweiz): remove this ASAP (issue 7807). | |
598 /// Wraps [stream] in an [InputStream]. | |
599 InputStream streamToInputStream(Stream<List<int>> stream) { | |
600 var inputStream = new ListInputStream(); | |
601 stream.listen((chunk) => inputStream.write(chunk), | |
602 onDone: inputStream.markEndOfStream); | |
603 return inputStream; | |
604 } | 489 } |
605 | 490 |
606 /// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it | 491 /// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it |
607 /// using [Stream.pipe]. | 492 /// using [Stream.pipe]. Errors piped to the returned [StreamConsumer] will be |
| 493 /// forwarded to the [Future] returned by [Stream.pipe]. |
608 StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => | 494 StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => |
609 new _OutputStreamConsumer(stream); | 495 new _OutputStreamConsumer(stream); |
610 | 496 |
611 /// A [StreamConsumer] that pipes data into an [OutputStream]. | 497 /// A [StreamConsumer] that pipes data into an [OutputStream]. |
612 class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { | 498 class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { |
613 final OutputStream _outputStream; | 499 final OutputStream _outputStream; |
614 | 500 |
615 _OutputStreamConsumer(this._outputStream); | 501 _OutputStreamConsumer(this._outputStream); |
616 | 502 |
617 Future consume(Stream<List<int>> stream) { | 503 Future consume(Stream<List<int>> stream) { |
618 // TODO(nweiz): we have to manually keep track of whether or not the | 504 // TODO(nweiz): we have to manually keep track of whether or not the |
619 // completer has completed since the output stream could signal an error | 505 // completer has completed since the output stream could signal an error |
620 // after close() has been called but before it has shut down internally. See | 506 // after close() has been called but before it has shut down internally. See |
621 // the following TODO. | 507 // the following TODO. |
622 var completed = false; | 508 var completed = false; |
623 var completer = new Completer(); | 509 var completer = new Completer(); |
624 stream.listen((data) { | 510 stream.listen((data) { |
625 // Writing empty data to a closed stream can cause errors. | 511 // Writing empty data to a closed stream can cause errors. |
626 if (data.isEmpty) return; | 512 if (data.isEmpty) return; |
627 | 513 |
628 // TODO(nweiz): remove this try/catch when issue 7836 is fixed. | 514 // TODO(nweiz): remove this try/catch when issue 7836 is fixed. |
629 try { | 515 try { |
630 _outputStream.write(data); | 516 _outputStream.write(data); |
631 } catch (e, stack) { | 517 } catch (e, stack) { |
632 if (!completed) completer.completeError(e, stack); | 518 if (!completed) completer.completeError(e, stack); |
633 completed = true; | 519 completed = true; |
634 } | 520 } |
| 521 }, onError: (e) { |
| 522 if (!completed) completer.completeError(e.error, e.stackTrace); |
| 523 completed = true; |
635 }, onDone: () => _outputStream.close()); | 524 }, onDone: () => _outputStream.close()); |
636 | 525 |
637 _outputStream.onError = (e) { | 526 _outputStream.onError = (e) { |
638 if (!completed) completer.completeError(e); | 527 if (!completed) completer.completeError(e); |
639 completed = true; | 528 completed = true; |
640 }; | 529 }; |
641 | 530 |
642 _outputStream.onClosed = () { | 531 _outputStream.onClosed = () { |
643 if (!completed) completer.complete(); | 532 if (!completed) completer.complete(); |
644 completed = true; | 533 completed = true; |
645 }; | 534 }; |
646 | 535 |
647 return completer.future; | 536 return completer.future; |
648 } | 537 } |
649 } | 538 } |
650 | 539 |
| 540 /// Returns a [StreamSink] that pipes all data to [consumer] and a [Future] that |
| 541 /// will succeed when [StreamSink] is closed or fail with any errors that occur |
| 542 /// while writing. |
| 543 Pair<StreamSink, Future> consumerToSink(StreamConsumer consumer) { |
| 544 var controller = new StreamController(); |
| 545 var done = controller.stream.pipe(consumer); |
| 546 return new Pair<StreamSink, Future>(controller.sink, done); |
| 547 } |
| 548 |
| 549 // TODO(nweiz): remove this when issue 7786 is fixed. |
| 550 /// Pipes all data and errors from [stream] into [sink]. When [stream] is done, |
| 551 /// the returned [Future] is completed and [sink] is closed if [closeSink] is |
| 552 /// true. |
| 553 /// |
| 554 /// When an error occurs on [stream], that error is passed to [sink]. If |
| 555 /// [unsubscribeOnError] is true, [Future] will be completed successfully and no |
| 556 /// more data or errors will be piped from [stream] to [sink]. If |
| 557 /// [unsubscribeOnError] and [closeSink] are both true, [sink] will then be |
| 558 /// closed. |
| 559 Future store(Stream stream, StreamSink sink, |
| 560 {bool unsubscribeOnError: true, closeSink: true}) { |
| 561 var completer = new Completer(); |
| 562 stream.listen(sink.add, |
| 563 onError: (e) { |
| 564 sink.signalError(e); |
| 565 if (unsubscribeOnError) { |
| 566 completer.complete(); |
| 567 if (closeSink) sink.close(); |
| 568 } |
| 569 }, |
| 570 onDone: () { |
| 571 if (closeSink) sink.close(); |
| 572 completer.complete(); |
| 573 }, unsubscribeOnError: unsubscribeOnError); |
| 574 return completer.future; |
| 575 } |
| 576 |
651 /// Spawns and runs the process located at [executable], passing in [args]. | 577 /// Spawns and runs the process located at [executable], passing in [args]. |
652 /// Returns a [Future] that will complete with the results of the process after | 578 /// Returns a [Future] that will complete with the results of the process after |
653 /// it has ended. | 579 /// it has ended. |
654 /// | 580 /// |
655 /// The spawned process will inherit its parent's environment variables. If | 581 /// The spawned process will inherit its parent's environment variables. If |
656 /// [environment] is provided, that will be used to augment (not replace) the | 582 /// [environment] is provided, that will be used to augment (not replace) the |
657 /// the inherited variables. | 583 /// the inherited variables. |
658 Future<PubProcessResult> runProcess(String executable, List<String> args, | 584 Future<PubProcessResult> runProcess(String executable, List<String> args, |
659 {workingDir, Map<String, String> environment}) { | 585 {workingDir, Map<String, String> environment}) { |
660 return _doProcess(Process.run, executable, args, workingDir, environment) | 586 return _doProcess(Process.run, executable, args, workingDir, environment) |
(...skipping 13 matching lines...) Expand all Loading... |
674 return pubResult; | 600 return pubResult; |
675 }); | 601 }); |
676 } | 602 } |
677 | 603 |
678 /// Spawns the process located at [executable], passing in [args]. Returns a | 604 /// Spawns the process located at [executable], passing in [args]. Returns a |
679 /// [Future] that will complete with the [Process] once it's been started. | 605 /// [Future] that will complete with the [Process] once it's been started. |
680 /// | 606 /// |
681 /// The spawned process will inherit its parent's environment variables. If | 607 /// The spawned process will inherit its parent's environment variables. If |
682 /// [environment] is provided, that will be used to augment (not replace) the | 608 /// [environment] is provided, that will be used to augment (not replace) the |
683 /// the inherited variables. | 609 /// the inherited variables. |
684 Future<Process> startProcess(String executable, List<String> args, | 610 Future<PubProcess> startProcess(String executable, List<String> args, |
685 {workingDir, Map<String, String> environment}) => | 611 {workingDir, Map<String, String> environment}) => |
686 _doProcess(Process.start, executable, args, workingDir, environment) | 612 _doProcess(Process.start, executable, args, workingDir, environment) |
687 .then((process) => new _WrappedProcess(process)); | 613 .then((process) => new PubProcess(process)); |
688 | 614 |
689 /// A wrapper around [Process] that buffers the stdout and stderr to avoid | 615 /// A wrapper around [Process] that exposes `dart:async`-style APIs. |
690 /// running into issue 7218. | 616 class PubProcess { |
691 class _WrappedProcess implements Process { | 617 /// The underlying `dart:io` [Process]. |
692 final Process _process; | 618 final Process _process; |
693 final InputStream stderr; | |
694 final InputStream stdout; | |
695 | 619 |
696 OutputStream get stdin => _process.stdin; | 620 /// The mutable field for [stdin]. |
| 621 StreamSink<List<int>> _stdin; |
697 | 622 |
698 void set onExit(void callback(int exitCode)) { | 623 /// The mutable field for [stdinClosed]. |
699 _process.onExit = callback; | 624 Future _stdinClosed; |
| 625 |
| 626 /// The mutable field for [stdout]. |
| 627 ByteStream _stdout; |
| 628 |
| 629 /// The mutable field for [stderr]. |
| 630 ByteStream _stderr; |
| 631 |
| 632 /// The mutable field for [exitCode]. |
| 633 Future<int> _exitCode; |
| 634 |
| 635 /// The sink used for passing data to the process's standard input stream. |
| 636 /// Errors on this stream are surfaced through [stdinClosed], [stdout], |
| 637 /// [stderr], and [exitCode], which are all members of an [ErrorGroup]. |
| 638 StreamSink<List<int>> get stdin => _stdin; |
| 639 |
| 640 // TODO(nweiz): write some more sophisticated Future machinery so that this |
| 641 // doesn't surface errors from the other streams/futures, but still passes its |
| 642 // unhandled errors to them. Right now it's impossible to recover from a stdin |
| 643 // error and continue interacting with the process. |
| 644 /// A [Future] that completes when [stdin] is closed, either by the user or by |
| 645 /// the process itself. |
| 646 /// |
| 647 /// This is in an [ErrorGroup] with [stdout], [stderr], and [exitCode], so any |
| 648 /// error in process will be passed to it, but won't reach the top-level error |
| 649 /// handler unless nothing has handled it. |
| 650 Future get stdinClosed => _stdinClosed; |
| 651 |
| 652 /// The process's standard output stream. |
| 653 /// |
| 654 /// This is in an [ErrorGroup] with [stdinClosed], [stderr], and [exitCode], |
| 655 /// so any error in process will be passed to it, but won't reach the |
| 656 /// top-level error handler unless nothing has handled it. |
| 657 ByteStream get stdout => _stdout; |
| 658 |
| 659 /// The process's standard error stream. |
| 660 /// |
| 661 /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [exitCode], |
| 662 /// so any error in process will be passed to it, but won't reach the |
| 663 /// top-level error handler unless nothing has handled it. |
| 664 ByteStream get stderr => _stderr; |
| 665 |
| 666 /// A [Future] that will complete to the process's exit code once the process |
| 667 /// has finished running. |
| 668 /// |
| 669 /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [stderr], so |
| 670 /// any error in process will be passed to it, but won't reach the top-level |
| 671 /// error handler unless nothing has handled it. |
| 672 Future<int> get exitCode => _exitCode; |
| 673 |
| 674 /// Creates a new [PubProcess] wrapping [process]. |
| 675 PubProcess(Process process) |
| 676 : _process = process { |
| 677 var errorGroup = new ErrorGroup(); |
| 678 |
| 679 var pair = consumerToSink(wrapOutputStream(process.stdin)); |
| 680 _stdin = pair.first; |
| 681 _stdinClosed = errorGroup.registerFuture(pair.last); |
| 682 |
| 683 _stdout = new ByteStream( |
| 684 errorGroup.registerStream(wrapInputStream(process.stdout))); |
| 685 _stderr = new ByteStream( |
| 686 errorGroup.registerStream(wrapInputStream(process.stderr))); |
| 687 |
| 688 var exitCodeCompleter = new Completer(); |
| 689 _exitCode = errorGroup.registerFuture(exitCodeCompleter.future); |
| 690 _process.onExit = (code) => exitCodeCompleter.complete(code); |
700 } | 691 } |
701 | 692 |
702 _WrappedProcess(Process process) | 693 /// Sends [signal] to the underlying process. |
703 : _process = process, | |
704 stderr = _wrapInputStream(process.stderr), | |
705 stdout = _wrapInputStream(process.stdout); | |
706 | |
707 bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) => | 694 bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) => |
708 _process.kill(signal); | 695 _process.kill(signal); |
709 | |
710 /// Wrap an InputStream in a ListInputStream. This eagerly drains the [source] | |
711 /// input stream. This is useful for spawned processes which will not exit | |
712 /// until their output streams have been drained. TODO(rnystrom): We should | |
713 /// use this logic anywhere we spawn a process. | |
714 static InputStream _wrapInputStream(InputStream source) { | |
715 var sink = new ListInputStream(); | |
716 pipeInputToInput(source, sink); | |
717 return sink; | |
718 } | |
719 } | 696 } |
720 | 697 |
721 /// Calls [fn] with appropriately modified arguments. [fn] should have the same | 698 /// Calls [fn] with appropriately modified arguments. [fn] should have the same |
722 /// signature as [Process.start], except that the returned [Future] may have a | 699 /// signature as [Process.start], except that the returned [Future] may have a |
723 /// type other than [Process]. | 700 /// type other than [Process]. |
724 Future _doProcess(Function fn, String executable, List<String> args, workingDir, | 701 Future _doProcess(Function fn, String executable, List<String> args, workingDir, |
725 Map<String, String> environment) { | 702 Map<String, String> environment) { |
726 // TODO(rnystrom): Should dart:io just handle this? | 703 // TODO(rnystrom): Should dart:io just handle this? |
727 // Spawning a process on Windows will not look for the executable in the | 704 // Spawning a process on Windows will not look for the executable in the |
728 // system path. So, if executable looks like it needs that (i.e. it doesn't | 705 // system path. So, if executable looks like it needs that (i.e. it doesn't |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
785 tempDir = dir; | 762 tempDir = dir; |
786 return fn(tempDir.path); | 763 return fn(tempDir.path); |
787 }).whenComplete(() { | 764 }).whenComplete(() { |
788 log.fine('Cleaning up temp directory ${tempDir.path}.'); | 765 log.fine('Cleaning up temp directory ${tempDir.path}.'); |
789 return deleteDir(tempDir); | 766 return deleteDir(tempDir); |
790 }); | 767 }); |
791 } | 768 } |
792 | 769 |
793 /// Extracts a `.tar.gz` file from [stream] to [destination], which can be a | 770 /// Extracts a `.tar.gz` file from [stream] to [destination], which can be a |
794 /// directory or a path. Returns whether or not the extraction was successful. | 771 /// directory or a path. Returns whether or not the extraction was successful. |
795 Future<bool> extractTarGz(InputStream stream, destination) { | 772 Future<bool> extractTarGz(Stream<List<int>> stream, destination) { |
796 destination = _getPath(destination); | 773 destination = _getPath(destination); |
797 | 774 |
798 log.fine("Extracting .tar.gz stream to $destination."); | 775 log.fine("Extracting .tar.gz stream to $destination."); |
799 | 776 |
800 if (Platform.operatingSystem == "windows") { | 777 if (Platform.operatingSystem == "windows") { |
801 return _extractTarGzWindows(stream, destination); | 778 return _extractTarGzWindows(stream, destination); |
802 } | 779 } |
803 | 780 |
804 var completer = new Completer<int>(); | 781 return startProcess("tar", |
805 var processFuture = startProcess("tar", | 782 ["--extract", "--gunzip", "--directory", destination]).then((process) { |
806 ["--extract", "--gunzip", "--directory", destination]); | 783 // Ignore errors on process.std{out,err}. They'll be passed to |
807 processFuture.then((process) { | 784 // process.exitCode, and we don't want them being top-levelled by |
808 process.onExit = (exitCode) => completer.complete(exitCode); | 785 // std{out,err}Sink. |
809 stream.pipe(process.stdin); | 786 store(process.stdout.handleError((_) {}), stdoutSink, closeSink: false); |
810 process.stdout.pipe(stdout, close: false); | 787 store(process.stderr.handleError((_) {}), stderrSink, closeSink: false); |
811 process.stderr.pipe(stderr, close: false); | 788 return Future.wait([ |
812 }).catchError((e) { | 789 store(stream, process.stdin), |
813 completer.completeError(e.error, e.stackTrace); | 790 process.exitCode |
814 }); | 791 ]); |
815 | 792 }).then((results) { |
816 return completer.future.then((exitCode) { | 793 var exitCode = results[1]; |
| 794 if (exitCode != 0) { |
| 795 throw "Failed to extract .tar.gz stream to $destination (exit code " |
| 796 "$exitCode)."; |
| 797 } |
817 log.fine("Extracted .tar.gz stream to $destination. Exit code $exitCode."); | 798 log.fine("Extracted .tar.gz stream to $destination. Exit code $exitCode."); |
818 // TODO(rnystrom): Does anything check this result value? If not, it should | |
819 // throw on a bad exit code. | |
820 return exitCode == 0; | |
821 }); | 799 }); |
822 } | 800 } |
823 | 801 |
824 Future<bool> _extractTarGzWindows(InputStream stream, String destination) { | 802 Future<bool> _extractTarGzWindows(Stream<List<int>> stream, |
| 803 String destination) { |
825 // TODO(rnystrom): In the repo's history, there is an older implementation of | 804 // TODO(rnystrom): In the repo's history, there is an older implementation of |
826 // this that does everything in memory by piping streams directly together | 805 // this that does everything in memory by piping streams directly together |
827 // instead of writing out temp files. The code is simpler, but unfortunately, | 806 // instead of writing out temp files. The code is simpler, but unfortunately, |
828 // 7zip seems to periodically fail when we invoke it from Dart and tell it to | 807 // 7zip seems to periodically fail when we invoke it from Dart and tell it to |
829 // read from stdin instead of a file. Consider resurrecting that version if | 808 // read from stdin instead of a file. Consider resurrecting that version if |
830 // we can figure out why it fails. | 809 // we can figure out why it fails. |
831 | 810 |
832 // Note: This line of code gets munged by create_sdk.py to be the correct | 811 // Note: This line of code gets munged by create_sdk.py to be the correct |
833 // relative path to 7zip in the SDK. | 812 // relative path to 7zip in the SDK. |
834 var pathTo7zip = '../../third_party/7zip/7za.exe'; | 813 var pathTo7zip = '../../third_party/7zip/7za.exe'; |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
877 | 856 |
878 log.fine('Clean up 7zip temp directory ${tempDir.path}.'); | 857 log.fine('Clean up 7zip temp directory ${tempDir.path}.'); |
879 // TODO(rnystrom): Should also delete this if anything fails. | 858 // TODO(rnystrom): Should also delete this if anything fails. |
880 return deleteDir(tempDir); | 859 return deleteDir(tempDir); |
881 }).then((_) => true); | 860 }).then((_) => true); |
882 } | 861 } |
883 | 862 |
884 /// Create a .tar.gz archive from a list of entries. Each entry can be a | 863 /// Create a .tar.gz archive from a list of entries. Each entry can be a |
885 /// [String], [Directory], or [File] object. The root of the archive is | 864 /// [String], [Directory], or [File] object. The root of the archive is |
886 /// considered to be [baseDir], which defaults to the current working directory. | 865 /// considered to be [baseDir], which defaults to the current working directory. |
887 /// Returns an [InputStream] that will emit the contents of the archive. | 866 /// Returns a [ByteStream] that will emit the contents of the archive. |
888 InputStream createTarGz(List contents, {baseDir}) { | 867 ByteStream createTarGz(List contents, {baseDir}) { |
889 var buffer = new StringBuffer(); | 868 var buffer = new StringBuffer(); |
890 buffer.add('Creating .tag.gz stream containing:\n'); | 869 buffer.add('Creating .tag.gz stream containing:\n'); |
891 contents.forEach((file) => buffer.add('$file\n')); | 870 contents.forEach((file) => buffer.add('$file\n')); |
892 log.fine(buffer.toString()); | 871 log.fine(buffer.toString()); |
893 | 872 |
894 // TODO(nweiz): Propagate errors to the returned stream (including non-zero | 873 // TODO(nweiz): Propagate errors to the returned stream (including non-zero |
895 // exit codes). See issue 3657. | 874 // exit codes). See issue 3657. |
896 var stream = new ListInputStream(); | 875 var controller = new StreamController<List<int>>(); |
897 | 876 |
898 if (baseDir == null) baseDir = path.current; | 877 if (baseDir == null) baseDir = path.current; |
899 baseDir = getFullPath(baseDir); | 878 baseDir = getFullPath(baseDir); |
900 contents = contents.map((entry) { | 879 contents = contents.map((entry) { |
901 entry = getFullPath(entry); | 880 entry = getFullPath(entry); |
902 if (!isBeneath(entry, baseDir)) { | 881 if (!isBeneath(entry, baseDir)) { |
903 throw 'Entry $entry is not inside $baseDir.'; | 882 throw 'Entry $entry is not inside $baseDir.'; |
904 } | 883 } |
905 return relativeTo(entry, baseDir); | 884 return relativeTo(entry, baseDir); |
906 }).toList(); | 885 }).toList(); |
907 | 886 |
908 if (Platform.operatingSystem != "windows") { | 887 if (Platform.operatingSystem != "windows") { |
909 var args = ["--create", "--gzip", "--directory", baseDir]; | 888 var args = ["--create", "--gzip", "--directory", baseDir]; |
910 args.addAll(contents.map(_getPath)); | 889 args.addAll(contents.map(_getPath)); |
911 // TODO(nweiz): It's possible that enough command-line arguments will make | 890 // TODO(nweiz): It's possible that enough command-line arguments will make |
912 // the process choke, so at some point we should save the arguments to a | 891 // the process choke, so at some point we should save the arguments to a |
913 // file and pass them in via --files-from for tar and -i@filename for 7zip. | 892 // file and pass them in via --files-from for tar and -i@filename for 7zip. |
914 startProcess("tar", args).then((process) { | 893 startProcess("tar", args).then((process) { |
915 pipeInputToInput(process.stdout, stream); | 894 store(process.stdout, controller); |
916 | 895 }).catchError((e) { |
917 // Drain and discard 7zip's stderr. 7zip writes its normal output to | 896 // We don't have to worry about double-signaling here, since the store() |
918 // stderr. We don't want to show that since it's meaningless. | 897 // above will only be reached if startProcess succeeds. |
919 // TODO(rnystrom): Should log this and display it if an actual error | 898 controller.signalError(e.error, e.stackTrace); |
920 // occurs. | 899 controller.close(); |
921 consumeInputStream(process.stderr); | |
922 }); | 900 }); |
923 return stream; | 901 return new ByteStream(controller.stream); |
924 } | 902 } |
925 | 903 |
926 withTempDir((tempDir) { | 904 withTempDir((tempDir) { |
927 // Create the tar file. | 905 // Create the tar file. |
928 var tarFile = join(tempDir, "intermediate.tar"); | 906 var tarFile = join(tempDir, "intermediate.tar"); |
929 var args = ["a", "-w$baseDir", tarFile]; | 907 var args = ["a", "-w$baseDir", tarFile]; |
930 args.addAll(contents.map((entry) => '-i!"$entry"')); | 908 args.addAll(contents.map((entry) => '-i!"$entry"')); |
931 | 909 |
932 // Note: This line of code gets munged by create_sdk.py to be the correct | 910 // Note: This line of code gets munged by create_sdk.py to be the correct |
933 // relative path to 7zip in the SDK. | 911 // relative path to 7zip in the SDK. |
934 var pathTo7zip = '../../third_party/7zip/7za.exe'; | 912 var pathTo7zip = '../../third_party/7zip/7za.exe'; |
935 var command = relativeToPub(pathTo7zip); | 913 var command = relativeToPub(pathTo7zip); |
936 | 914 |
937 // We're passing 'baseDir' both as '-w' and setting it as the working | 915 // We're passing 'baseDir' both as '-w' and setting it as the working |
938 // directory explicitly here intentionally. The former ensures that the | 916 // directory explicitly here intentionally. The former ensures that the |
939 // files added to the archive have the correct relative path in the archive. | 917 // files added to the archive have the correct relative path in the archive. |
940 // The latter enables relative paths in the "-i" args to be resolved. | 918 // The latter enables relative paths in the "-i" args to be resolved. |
941 return runProcess(command, args, workingDir: baseDir).then((_) { | 919 return runProcess(command, args, workingDir: baseDir).then((_) { |
942 // GZIP it. 7zip doesn't support doing both as a single operation. Send | 920 // GZIP it. 7zip doesn't support doing both as a single operation. Send |
943 // the output to stdout. | 921 // the output to stdout. |
944 args = ["a", "unused", "-tgzip", "-so", tarFile]; | 922 args = ["a", "unused", "-tgzip", "-so", tarFile]; |
945 return startProcess(command, args); | 923 return startProcess(command, args); |
946 }).then((process) { | 924 }).then((process) { |
947 // Drain and discard 7zip's stderr. 7zip writes its normal output to | 925 // Ignore 7zip's stderr. 7zip writes its normal output to stderr. We don't |
948 // stderr. We don't want to show that since it's meaningless. | 926 // want to show that since it's meaningless. |
949 // TODO(rnystrom): Should log this and display it if an actual error | 927 // |
| 928 // TODO(rnystrom): Should log the stderr and display it if an actual error |
950 // occurs. | 929 // occurs. |
951 consumeInputStream(process.stderr); | 930 store(process.stdout, controller); |
952 return pipeInputToInput(process.stdout, stream); | |
953 }); | 931 }); |
| 932 }).catchError((e) { |
| 933 // We don't have to worry about double-signaling here, since the store() |
| 934 // above will only be reached if everything succeeds. |
| 935 controller.signalError(e.error, e.stackTrace); |
| 936 controller.close(); |
954 }); | 937 }); |
955 return stream; | 938 return new ByteStream(controller.stream); |
956 } | 939 } |
957 | 940 |
958 /// Exception thrown when an operation times out. | 941 /// Exception thrown when an operation times out. |
959 class TimeoutException implements Exception { | 942 class TimeoutException implements Exception { |
960 final String message; | 943 final String message; |
961 | 944 |
962 const TimeoutException(this.message); | 945 const TimeoutException(this.message); |
963 | 946 |
964 String toString() => message; | 947 String toString() => message; |
965 } | 948 } |
(...skipping 24 matching lines...) Expand all Loading... |
990 Directory _getDirectory(entry) { | 973 Directory _getDirectory(entry) { |
991 if (entry is Directory) return entry; | 974 if (entry is Directory) return entry; |
992 return new Directory(entry); | 975 return new Directory(entry); |
993 } | 976 } |
994 | 977 |
995 /// Gets a [Uri] for [uri], which can either already be one, or be a [String]. | 978 /// Gets a [Uri] for [uri], which can either already be one, or be a [String]. |
996 Uri _getUri(uri) { | 979 Uri _getUri(uri) { |
997 if (uri is Uri) return uri; | 980 if (uri is Uri) return uri; |
998 return Uri.parse(uri); | 981 return Uri.parse(uri); |
999 } | 982 } |
OLD | NEW |