Index: utils/pub/io.dart |
diff --git a/utils/pub/io.dart b/utils/pub/io.dart |
index b227378a28f552f3c541967d640d76d219dd0229..4b3ea23b4fb301b8aba5aa9f4cbd204cff41c12b 100644 |
--- a/utils/pub/io.dart |
+++ b/utils/pub/io.dart |
@@ -12,9 +12,14 @@ import 'dart:json'; |
import 'dart:uri'; |
import '../../pkg/path/lib/path.dart' as path; |
+import '../../pkg/http/lib/http.dart' show ByteStream; |
+import 'error_group.dart'; |
+import 'exit_codes.dart' as exit_codes; |
import 'log.dart' as log; |
import 'utils.dart'; |
+export '../../pkg/http/lib/http.dart' show ByteStream; |
+ |
bool _isGitInstalledCache; |
/// The cached Git command. |
@@ -125,44 +130,15 @@ Future<File> deleteFile(file) { |
/// Writes [stream] to a new file at [path], which may be a [String] or a |
/// [File]. Will replace any file already at that path. Completes when the file |
/// is done being written. |
-Future<File> createFileFromStream(InputStream stream, path) { |
+Future<File> createFileFromStream(Stream<List<int>> stream, path) { |
path = _getPath(path); |
log.io("Creating $path from stream."); |
- var completer = new Completer<File>(); |
- var completed = false; |
var file = new File(path); |
- var outputStream = file.openOutputStream(); |
- stream.pipe(outputStream); |
- |
- outputStream.onClosed = () { |
+ return stream.pipe(wrapOutputStream(file.openOutputStream())).then((_) { |
log.fine("Created $path from stream."); |
- completed = true; |
- completer.complete(file); |
- }; |
- |
- // TODO(nweiz): remove this when issue 4061 is fixed. |
- var stackTrace; |
- try { |
- throw ""; |
- } catch (_, localStackTrace) { |
- stackTrace = localStackTrace; |
- } |
- |
- completeError(error) { |
- if (!completed) { |
- completed = true; |
- completer.completeError(error, stackTrace); |
- } else { |
- log.fine("Got error after stream was closed: $error"); |
- } |
- } |
- |
- stream.onError = completeError; |
- outputStream.onError = completeError; |
- |
- return completer.future; |
+ }); |
} |
/// Creates a directory [dir]. Returns a [Future] that completes when the |
@@ -438,8 +414,33 @@ String relativeToPub(String target) { |
return path.normalize(join(utilDir, 'pub', target)); |
} |
-/// A StringInputStream reading from stdin. |
-final _stringStdin = new StringInputStream(stdin); |
+// TODO(nweiz): add a ByteSink wrapper to make writing strings to stdout/stderr |
+// nicer. |
+ |
+/// A sink that writes to standard output. Errors piped to this stream will be |
+/// surfaced to the top-level error handler. |
+final StreamSink<List<int>> stdoutSink = _wrapStdio(stdout, "stdout"); |
+ |
+/// A sink that writes to standard error. Errors piped to this stream will be |
+/// surfaced to the top-level error handler. |
+final StreamSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr"); |
+ |
+/// Wrap the standard output or error [stream] in a [StreamSink]. Any errors are |
+/// logged, and then the program is terminated. [name] is used for debugging. |
+StreamSink<List<int>> _wrapStdio(OutputStream stream, String name) { |
+ var pair = consumerToSink(wrapOutputStream(stream)); |
+ pair.last.catchError((e) { |
+ // This log may or may not work, depending on how the stream failed. Not |
+ // much we can do about that. |
+ log.error("Error writing to $name: $e"); |
+ exit(exit_codes.IO); |
+ }); |
+ return pair.first; |
+} |
+ |
+/// A line-by-line stream of standard input. |
+final Stream<String> stdinLines = |
+ streamToLines(wrapInputStream(stdin).toStringStream()); |
/// Displays a message and reads a yes/no confirmation from the user. Returns |
/// a [Future] that completes to `true` if the user confirms or `false` if they |
@@ -449,143 +450,28 @@ final _stringStdin = new StringInputStream(stdin); |
/// should just be a fragment like, "Are you sure you want to proceed". |
Future<bool> confirm(String message) { |
log.fine('Showing confirm message: $message'); |
- stdout.writeString("$message (y/n)? "); |
- return readLine().then((line) => new RegExp(r"^[yY]").hasMatch(line)); |
-} |
- |
-/// Returns a single line read from a [StringInputStream]. By default, reads |
-/// from stdin. |
-/// |
-/// A [StringInputStream] passed to this should have no callbacks registered. |
-Future<String> readLine([StringInputStream stream]) { |
- if (stream == null) stream = _stringStdin; |
- if (stream.closed) return new Future.immediate(''); |
- void removeCallbacks() { |
- stream.onClosed = null; |
- stream.onLine = null; |
- stream.onError = null; |
- } |
- |
- // TODO(nweiz): remove this when issue 4061 is fixed. |
- var stackTrace; |
- try { |
- throw ""; |
- } catch (_, localStackTrace) { |
- stackTrace = localStackTrace; |
- } |
- |
- var completer = new Completer(); |
- stream.onClosed = () { |
- removeCallbacks(); |
- completer.complete(''); |
- }; |
- |
- stream.onLine = () { |
- removeCallbacks(); |
- var line = stream.readLine(); |
- log.io('Read line: $line'); |
- completer.complete(line); |
- }; |
- |
- stream.onError = (e) { |
- removeCallbacks(); |
- completer.completeError(e, stackTrace); |
- }; |
- |
- return completer.future; |
-} |
- |
-/// Takes all input from [source] and writes it to [sink]. |
-/// |
-/// Returns a future that completes when [source] is closed. |
-Future pipeInputToInput(InputStream source, ListInputStream sink) { |
- var completer = new Completer(); |
- source.onClosed = () { |
- sink.markEndOfStream(); |
- completer.complete(null); |
- }; |
- source.onData = () { |
- // Even if the sink is closed and we aren't going to do anything with more |
- // data, we still need to drain it from source to work around issue 7218. |
- var data = source.read(); |
- try { |
- if (!sink.closed) sink.write(data); |
- } on StreamException catch (e, stackTrace) { |
- // Ignore an exception to work around issue 4222. |
- log.io("Writing to an unclosed ListInputStream caused exception $e\n" |
- "$stackTrace"); |
- } |
- }; |
- // TODO(nweiz): propagate this error to the sink. See issue 3657. |
- source.onError = (e) { throw e; }; |
- return completer.future; |
-} |
- |
-/// Buffers all input from an InputStream and returns it as a future. |
-Future<List<int>> consumeInputStream(InputStream stream) { |
- if (stream.closed) return new Future.immediate(<int>[]); |
- |
- // TODO(nweiz): remove this when issue 4061 is fixed. |
- var stackTrace; |
- try { |
- throw ""; |
- } catch (_, localStackTrace) { |
- stackTrace = localStackTrace; |
- } |
- |
- var completer = new Completer<List<int>>(); |
- var buffer = <int>[]; |
- stream.onClosed = () => completer.complete(buffer); |
- stream.onData = () => buffer.addAll(stream.read()); |
- stream.onError = (e) => completer.completeError(e, stackTrace); |
- return completer.future; |
-} |
- |
-/// Buffers all input from a StringInputStream and returns it as a future. |
-Future<String> consumeStringInputStream(StringInputStream stream) { |
- if (stream.closed) return new Future.immediate(''); |
- |
- // TODO(nweiz): remove this when issue 4061 is fixed. |
- var stackTrace; |
- try { |
- throw ""; |
- } catch (_, localStackTrace) { |
- stackTrace = localStackTrace; |
- } |
- |
- var completer = new Completer<String>(); |
- var buffer = new StringBuffer(); |
- stream.onClosed = () => completer.complete(buffer.toString()); |
- stream.onData = () => buffer.add(stream.read()); |
- stream.onError = (e) => completer.completeError(e, stackTrace); |
- return completer.future; |
+ stdoutSink.add("$message (y/n)? ".charCodes); |
+ return streamFirst(stdinLines) |
+ .then((line) => new RegExp(r"^[yY]").hasMatch(line)); |
} |
/// Wraps [stream] in a single-subscription [Stream] that emits the same data. |
-Stream<List<int>> wrapInputStream(InputStream stream) { |
+ByteStream wrapInputStream(InputStream stream) { |
var controller = new StreamController(); |
if (stream.closed) { |
controller.close(); |
- return controller.stream; |
+ return new ByteStream(controller.stream); |
} |
stream.onClosed = controller.close; |
stream.onData = () => controller.add(stream.read()); |
stream.onError = (e) => controller.signalError(new AsyncError(e)); |
- return controller.stream; |
-} |
- |
-// TODO(nweiz): remove this ASAP (issue 7807). |
-/// Wraps [stream] in an [InputStream]. |
-InputStream streamToInputStream(Stream<List<int>> stream) { |
- var inputStream = new ListInputStream(); |
- stream.listen((chunk) => inputStream.write(chunk), |
- onDone: inputStream.markEndOfStream); |
- return inputStream; |
+ return new ByteStream(controller.stream); |
} |
/// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it |
-/// using [Stream.pipe]. |
+/// using [Stream.pipe]. Errors piped to the returned [StreamConsumer] will be |
+/// forwarded to the [Future] returned by [Stream.pipe]. |
StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => |
new _OutputStreamConsumer(stream); |
@@ -613,6 +499,9 @@ class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { |
if (!completed) completer.completeError(e, stack); |
completed = true; |
} |
+ }, onError: (e) { |
+ if (!completed) completer.completeError(e.error, e.stackTrace); |
+ completed = true; |
}, onDone: () => _outputStream.close()); |
_outputStream.onError = (e) { |
@@ -629,6 +518,43 @@ class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { |
} |
} |
+/// Returns a [StreamSink] that pipes all data to [consumer] and a [Future] that |
+/// will succeed when [StreamSink] is closed or fail with any errors that occur |
+/// while writing. |
+Pair<StreamSink, Future> consumerToSink(StreamConsumer consumer) { |
+ var controller = new StreamController(); |
+ var done = controller.stream.pipe(consumer); |
+ return new Pair<StreamSink, Future>(controller.sink, done); |
+} |
+ |
+// TODO(nweiz): remove this when issue 7786 is fixed. |
+/// Pipes all data and errors from [stream] into [sink]. When [stream] is done, |
+/// the returned [Future] is completed and [sink] is closed if [closeSink] is |
+/// true. |
+/// |
+/// When an error occurs on [stream], that error is passed to [sink]. If |
+/// [unsubscribeOnError] is true, [Future] will be completed successfully and no |
+/// more data or errors will be piped from [stream] to [sink]. If |
+/// [unsubscribeOnError] and [closeSink] are both true, [sink] will then be |
+/// closed. |
+Future store(Stream stream, StreamSink sink, |
+ {bool unsubscribeOnError: true, closeSink: true}) { |
+ var completer = new Completer(); |
+ stream.listen(sink.add, |
+ onError: (e) { |
+ sink.signalError(e); |
+ if (unsubscribeOnError) { |
+ completer.complete(); |
+ if (closeSink) sink.close(); |
+ } |
+ }, |
+ onDone: () { |
+ if (closeSink) sink.close(); |
+ completer.complete(); |
+ }, unsubscribeOnError: unsubscribeOnError); |
+ return completer.future; |
+} |
+ |
/// Spawns and runs the process located at [executable], passing in [args]. |
/// Returns a [Future] that will complete with the results of the process after |
/// it has ended. |
@@ -662,41 +588,92 @@ Future<PubProcessResult> runProcess(String executable, List<String> args, |
/// The spawned process will inherit its parent's environment variables. If |
/// [environment] is provided, that will be used to augment (not replace) the |
/// the inherited variables. |
-Future<Process> startProcess(String executable, List<String> args, |
+Future<PubProcess> startProcess(String executable, List<String> args, |
{workingDir, Map<String, String> environment}) => |
_doProcess(Process.start, executable, args, workingDir, environment) |
- .then((process) => new _WrappedProcess(process)); |
+ .then((process) => new PubProcess(process)); |
-/// A wrapper around [Process] that buffers the stdout and stderr to avoid |
-/// running into issue 7218. |
-class _WrappedProcess implements Process { |
+/// A wrapper around [Process] that exposes `dart:async`-style APIs. |
+class PubProcess { |
+ /// The underlying `dart:io` [Process]. |
final Process _process; |
- final InputStream stderr; |
- final InputStream stdout; |
- OutputStream get stdin => _process.stdin; |
- |
- void set onExit(void callback(int exitCode)) { |
- _process.onExit = callback; |
+ /// The mutable field for [stdin]. |
+ StreamSink<List<int>> _stdin; |
+ |
+ /// The mutable field for [stdinClosed]. |
+ Future _stdinClosed; |
+ |
+ /// The mutable field for [stdout]. |
+ ByteStream _stdout; |
+ |
+ /// The mutable field for [stderr]. |
+ ByteStream _stderr; |
+ |
+ /// The mutable field for [exitCode]. |
+ Future<int> _exitCode; |
+ |
+ /// The sink used for passing data to the process's standard input stream. |
+ /// Errors on this stream are surfaced through [stdinClosed], [stdout], |
+ /// [stderr], and [exitCode], which are all members of an [ErrorGroup]. |
+ StreamSink<List<int>> get stdin => _stdin; |
+ |
+ // TODO(nweiz): write some more sophisticated Future machinery so that this |
+ // doesn't surface errors from the other streams/futures, but still passes its |
+ // unhandled errors to them. Right now it's impossible to recover from a stdin |
+ // error and continue interacting with the process. |
+ /// A [Future] that completes when [stdin] is closed, either by the user or by |
+ /// the process itself. |
+ /// |
+ /// This is in an [ErrorGroup] with [stdout], [stderr], and [exitCode], so any |
+ /// error in process will be passed to it, but won't reach the top-level error |
+ /// handler unless nothing has handled it. |
+ Future get stdinClosed => _stdinClosed; |
+ |
+ /// The process's standard output stream. |
+ /// |
+ /// This is in an [ErrorGroup] with [stdinClosed], [stderr], and [exitCode], |
+ /// so any error in process will be passed to it, but won't reach the |
+ /// top-level error handler unless nothing has handled it. |
+ ByteStream get stdout => _stdout; |
+ |
+ /// The process's standard error stream. |
+ /// |
+ /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [exitCode], |
+ /// so any error in process will be passed to it, but won't reach the |
+ /// top-level error handler unless nothing has handled it. |
+ ByteStream get stderr => _stderr; |
+ |
+ /// A [Future] that will complete to the process's exit code once the process |
+ /// has finished running. |
+ /// |
+ /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [stderr], so |
+ /// any error in process will be passed to it, but won't reach the top-level |
+ /// error handler unless nothing has handled it. |
+ Future<int> get exitCode => _exitCode; |
+ |
+ /// Creates a new [PubProcess] wrapping [process]. |
+ PubProcess(Process process) |
+ : _process = process { |
+ var errorGroup = new ErrorGroup(); |
+ |
+ var pair = consumerToSink(wrapOutputStream(process.stdin)); |
+ _stdin = pair.first; |
+ _stdinClosed = errorGroup.registerFuture(pair.last); |
+ |
+ _stdout = new ByteStream( |
+ errorGroup.registerStream(wrapInputStream(process.stdout))); |
+ _stderr = new ByteStream( |
+ errorGroup.registerStream(wrapInputStream(process.stderr))); |
+ |
+ var exitCodeCompleter = new Completer(); |
+ _exitCode = errorGroup.registerFuture(exitCodeCompleter.future); |
+ _process.onExit = (code) => exitCodeCompleter.complete(code); |
} |
- _WrappedProcess(Process process) |
- : _process = process, |
- stderr = _wrapInputStream(process.stderr), |
- stdout = _wrapInputStream(process.stdout); |
- |
+ /// Sends [signal] to the underlying process. |
bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) => |
_process.kill(signal); |
- |
- /// Wrap an InputStream in a ListInputStream. This eagerly drains the [source] |
- /// input stream. This is useful for spawned processes which will not exit |
- /// until their output streams have been drained. TODO(rnystrom): We should |
- /// use this logic anywhere we spawn a process. |
- static InputStream _wrapInputStream(InputStream source) { |
- var sink = new ListInputStream(); |
- pipeInputToInput(source, sink); |
- return sink; |
- } |
} |
/// Calls [fn] with appropriately modified arguments. [fn] should have the same |
@@ -832,7 +809,7 @@ Future<bool> _tryGitCommand(String command) { |
/// Extracts a `.tar.gz` file from [stream] to [destination], which can be a |
/// directory or a path. Returns whether or not the extraction was successful. |
-Future<bool> extractTarGz(InputStream stream, destination) { |
+Future<bool> extractTarGz(Stream<List<int>> stream, destination) { |
destination = _getPath(destination); |
log.fine("Extracting .tar.gz stream to $destination."); |
@@ -841,27 +818,29 @@ Future<bool> extractTarGz(InputStream stream, destination) { |
return _extractTarGzWindows(stream, destination); |
} |
- var completer = new Completer<int>(); |
- var processFuture = startProcess("tar", |
- ["--extract", "--gunzip", "--directory", destination]); |
- processFuture.then((process) { |
- process.onExit = (exitCode) => completer.complete(exitCode); |
- stream.pipe(process.stdin); |
- process.stdout.pipe(stdout, close: false); |
- process.stderr.pipe(stderr, close: false); |
- }).catchError((e) { |
- completer.completeError(e.error, e.stackTrace); |
- }); |
- |
- return completer.future.then((exitCode) { |
+ return startProcess("tar", |
+ ["--extract", "--gunzip", "--directory", destination]).then((process) { |
+ // Ignore errors on process.std{out,err}. They'll be passed to |
+ // process.exitCode, and we don't want them being top-levelled by |
+ // std{out,err}Sink. |
+ store(process.stdout.handleError((_) {}), stdoutSink, closeSink: false); |
+ store(process.stderr.handleError((_) {}), stderrSink, closeSink: false); |
+ return Future.wait([ |
+ store(stream, process.stdin), |
+ process.exitCode |
+ ]); |
+ }).then((results) { |
+ var exitCode = results[1]; |
+ if (exitCode != 0) { |
+ throw "Failed to extract .tar.gz stream to $destination (exit code " |
+ "$exitCode)."; |
+ } |
log.fine("Extracted .tar.gz stream to $destination. Exit code $exitCode."); |
- // TODO(rnystrom): Does anything check this result value? If not, it should |
- // throw on a bad exit code. |
- return exitCode == 0; |
}); |
} |
-Future<bool> _extractTarGzWindows(InputStream stream, String destination) { |
+Future<bool> _extractTarGzWindows(Stream<List<int>> stream, |
+ String destination) { |
// TODO(rnystrom): In the repo's history, there is an older implementation of |
// this that does everything in memory by piping streams directly together |
// instead of writing out temp files. The code is simpler, but unfortunately, |
@@ -924,8 +903,8 @@ Future<bool> _extractTarGzWindows(InputStream stream, String destination) { |
/// Create a .tar.gz archive from a list of entries. Each entry can be a |
/// [String], [Directory], or [File] object. The root of the archive is |
/// considered to be [baseDir], which defaults to the current working directory. |
-/// Returns an [InputStream] that will emit the contents of the archive. |
-InputStream createTarGz(List contents, {baseDir}) { |
+/// Returns a [ByteStream] that will emit the contents of the archive. |
+ByteStream createTarGz(List contents, {baseDir}) { |
var buffer = new StringBuffer(); |
buffer.add('Creating .tag.gz stream containing:\n'); |
contents.forEach((file) => buffer.add('$file\n')); |
@@ -933,7 +912,7 @@ InputStream createTarGz(List contents, {baseDir}) { |
// TODO(nweiz): Propagate errors to the returned stream (including non-zero |
// exit codes). See issue 3657. |
- var stream = new ListInputStream(); |
+ var controller = new StreamController<List<int>>(); |
if (baseDir == null) baseDir = path.current; |
baseDir = getFullPath(baseDir); |
@@ -952,15 +931,14 @@ InputStream createTarGz(List contents, {baseDir}) { |
// the process choke, so at some point we should save the arguments to a |
// file and pass them in via --files-from for tar and -i@filename for 7zip. |
startProcess("tar", args).then((process) { |
- pipeInputToInput(process.stdout, stream); |
- |
- // Drain and discard 7zip's stderr. 7zip writes its normal output to |
- // stderr. We don't want to show that since it's meaningless. |
- // TODO(rnystrom): Should log this and display it if an actual error |
- // occurs. |
- consumeInputStream(process.stderr); |
+ store(process.stdout, controller); |
+ }).catchError((e) { |
+ // We don't have to worry about double-signaling here, since the store() |
+ // above will only be reached if startProcess succeeds. |
+ controller.signalError(e.error, e.stackTrace); |
+ controller.close(); |
}); |
- return stream; |
+ return new ByteStream(controller.stream); |
} |
withTempDir((tempDir) { |
@@ -984,15 +962,20 @@ InputStream createTarGz(List contents, {baseDir}) { |
args = ["a", "unused", "-tgzip", "-so", tarFile]; |
return startProcess(command, args); |
}).then((process) { |
- // Drain and discard 7zip's stderr. 7zip writes its normal output to |
- // stderr. We don't want to show that since it's meaningless. |
- // TODO(rnystrom): Should log this and display it if an actual error |
+ // Ignore 7zip's stderr. 7zip writes its normal output to stderr. We don't |
+ // want to show that since it's meaningless. |
+ // |
+ // TODO(rnystrom): Should log the stderr and display it if an actual error |
// occurs. |
- consumeInputStream(process.stderr); |
- return pipeInputToInput(process.stdout, stream); |
+ store(process.stdout, controller); |
}); |
+ }).catchError((e) { |
+ // We don't have to worry about double-signaling here, since the store() |
+ // above will only be reached if everything succeeds. |
+ controller.signalError(e.error, e.stackTrace); |
+ controller.close(); |
}); |
- return stream; |
+ return new ByteStream(controller); |
} |
/// Exception thrown when an operation times out. |