| Index: utils/pub/io.dart
|
| diff --git a/utils/pub/io.dart b/utils/pub/io.dart
|
| index 60876c8f83f06db30076c59847963d18271c2ec6..79f0a06524cc5b91b5e953fdf03d42ccb5b6ca82 100644
|
| --- a/utils/pub/io.dart
|
| +++ b/utils/pub/io.dart
|
| @@ -12,14 +12,9 @@ import 'dart:json';
|
| import 'dart:uri';
|
|
|
| import '../../pkg/path/lib/path.dart' as path;
|
| -import '../../pkg/http/lib/http.dart' show ByteStream;
|
| -import 'error_group.dart';
|
| -import 'exit_codes.dart' as exit_codes;
|
| import 'log.dart' as log;
|
| import 'utils.dart';
|
|
|
| -export '../../pkg/http/lib/http.dart' show ByteStream;
|
| -
|
| final NEWLINE_PATTERN = new RegExp("\r\n?|\n\r?");
|
|
|
| /// Joins a number of path string parts into a single path. Handles
|
| @@ -125,15 +120,44 @@ Future<File> deleteFile(file) {
|
| /// Writes [stream] to a new file at [path], which may be a [String] or a
|
| /// [File]. Will replace any file already at that path. Completes when the file
|
| /// is done being written.
|
| -Future<File> createFileFromStream(Stream<List<int>> stream, path) {
|
| +Future<File> createFileFromStream(InputStream stream, path) {
|
| path = _getPath(path);
|
|
|
| log.io("Creating $path from stream.");
|
|
|
| + var completer = new Completer<File>();
|
| + var completed = false;
|
| var file = new File(path);
|
| - return stream.pipe(wrapOutputStream(file.openOutputStream())).then((_) {
|
| + var outputStream = file.openOutputStream();
|
| + stream.pipe(outputStream);
|
| +
|
| + outputStream.onClosed = () {
|
| log.fine("Created $path from stream.");
|
| - });
|
| + completed = true;
|
| + completer.complete(file);
|
| + };
|
| +
|
| + // TODO(nweiz): remove this when issue 4061 is fixed.
|
| + var stackTrace;
|
| + try {
|
| + throw "";
|
| + } catch (_, localStackTrace) {
|
| + stackTrace = localStackTrace;
|
| + }
|
| +
|
| + completeError(error) {
|
| + if (!completed) {
|
| + completed = true;
|
| + completer.completeError(error, stackTrace);
|
| + } else {
|
| + log.fine("Got error after stream was closed: $error");
|
| + }
|
| + }
|
| +
|
| + stream.onError = completeError;
|
| + outputStream.onError = completeError;
|
| +
|
| + return completer.future;
|
| }
|
|
|
| /// Creates a directory [dir]. Returns a [Future] that completes when the
|
| @@ -409,33 +433,8 @@ String relativeToPub(String target) {
|
| return path.normalize(join(utilDir, 'pub', target));
|
| }
|
|
|
| -// TODO(nweiz): add a ByteSink wrapper to make writing strings to stdout/stderr
|
| -// nicer.
|
| -
|
| -/// A sink that writes to standard output. Errors piped to this stream will be
|
| -/// surfaced to the top-level error handler.
|
| -final StreamSink<List<int>> stdoutSink = _wrapStdio(stdout, "stdout");
|
| -
|
| -/// A sink that writes to standard error. Errors piped to this stream will be
|
| -/// surfaced to the top-level error handler.
|
| -final StreamSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr");
|
| -
|
| -/// Wrap the standard output or error [stream] in a [StreamSink]. Any errors are
|
| -/// logged, and then the program is terminated. [name] is used for debugging.
|
| -StreamSink<List<int>> _wrapStdio(OutputStream stream, String name) {
|
| - var pair = consumerToSink(wrapOutputStream(stream));
|
| - pair.last.catchError((e) {
|
| - // This log may or may not work, depending on how the stream failed. Not
|
| - // much we can do about that.
|
| - log.error("Error writing to $name: $e");
|
| - exit(exit_codes.IO);
|
| - });
|
| - return pair.first;
|
| -}
|
| -
|
| -/// A line-by-line stream of standard input.
|
| -final Stream<String> stdinLines =
|
| - streamToLines(wrapInputStream(stdin).toStringStream());
|
| +/// A StringInputStream reading from stdin.
|
| +final _stringStdin = new StringInputStream(stdin);
|
|
|
| /// Displays a message and reads a yes/no confirmation from the user. Returns
|
| /// a [Future] that completes to `true` if the user confirms or `false` if they
|
| @@ -445,28 +444,143 @@ final Stream<String> stdinLines =
|
| /// should just be a fragment like, "Are you sure you want to proceed".
|
| Future<bool> confirm(String message) {
|
| log.fine('Showing confirm message: $message');
|
| - stdoutSink.add("$message (y/n)? ".charCodes);
|
| - return streamFirst(stdinLines)
|
| - .then((line) => new RegExp(r"^[yY]").hasMatch(line));
|
| + stdout.writeString("$message (y/n)? ");
|
| + return readLine().then((line) => new RegExp(r"^[yY]").hasMatch(line));
|
| +}
|
| +
|
| +/// Returns a single line read from a [StringInputStream]. By default, reads
|
| +/// from stdin.
|
| +///
|
| +/// A [StringInputStream] passed to this should have no callbacks registered.
|
| +Future<String> readLine([StringInputStream stream]) {
|
| + if (stream == null) stream = _stringStdin;
|
| + if (stream.closed) return new Future.immediate('');
|
| + void removeCallbacks() {
|
| + stream.onClosed = null;
|
| + stream.onLine = null;
|
| + stream.onError = null;
|
| + }
|
| +
|
| + // TODO(nweiz): remove this when issue 4061 is fixed.
|
| + var stackTrace;
|
| + try {
|
| + throw "";
|
| + } catch (_, localStackTrace) {
|
| + stackTrace = localStackTrace;
|
| + }
|
| +
|
| + var completer = new Completer();
|
| + stream.onClosed = () {
|
| + removeCallbacks();
|
| + completer.complete('');
|
| + };
|
| +
|
| + stream.onLine = () {
|
| + removeCallbacks();
|
| + var line = stream.readLine();
|
| + log.io('Read line: $line');
|
| + completer.complete(line);
|
| + };
|
| +
|
| + stream.onError = (e) {
|
| + removeCallbacks();
|
| + completer.completeError(e, stackTrace);
|
| + };
|
| +
|
| + return completer.future;
|
| +}
|
| +
|
| +/// Takes all input from [source] and writes it to [sink].
|
| +///
|
| +/// Returns a future that completes when [source] is closed.
|
| +Future pipeInputToInput(InputStream source, ListInputStream sink) {
|
| + var completer = new Completer();
|
| + source.onClosed = () {
|
| + sink.markEndOfStream();
|
| + completer.complete(null);
|
| + };
|
| + source.onData = () {
|
| + // Even if the sink is closed and we aren't going to do anything with more
|
| + // data, we still need to drain it from source to work around issue 7218.
|
| + var data = source.read();
|
| + try {
|
| + if (!sink.closed) sink.write(data);
|
| + } on StreamException catch (e, stackTrace) {
|
| + // Ignore an exception to work around issue 4222.
|
| + log.io("Writing to an unclosed ListInputStream caused exception $e\n"
|
| + "$stackTrace");
|
| + }
|
| + };
|
| + // TODO(nweiz): propagate this error to the sink. See issue 3657.
|
| + source.onError = (e) { throw e; };
|
| + return completer.future;
|
| +}
|
| +
|
| +/// Buffers all input from an InputStream and returns it as a future.
|
| +Future<List<int>> consumeInputStream(InputStream stream) {
|
| + if (stream.closed) return new Future.immediate(<int>[]);
|
| +
|
| + // TODO(nweiz): remove this when issue 4061 is fixed.
|
| + var stackTrace;
|
| + try {
|
| + throw "";
|
| + } catch (_, localStackTrace) {
|
| + stackTrace = localStackTrace;
|
| + }
|
| +
|
| + var completer = new Completer<List<int>>();
|
| + var buffer = <int>[];
|
| + stream.onClosed = () => completer.complete(buffer);
|
| + stream.onData = () => buffer.addAll(stream.read());
|
| + stream.onError = (e) => completer.completeError(e, stackTrace);
|
| + return completer.future;
|
| +}
|
| +
|
| +/// Buffers all input from a StringInputStream and returns it as a future.
|
| +Future<String> consumeStringInputStream(StringInputStream stream) {
|
| + if (stream.closed) return new Future.immediate('');
|
| +
|
| + // TODO(nweiz): remove this when issue 4061 is fixed.
|
| + var stackTrace;
|
| + try {
|
| + throw "";
|
| + } catch (_, localStackTrace) {
|
| + stackTrace = localStackTrace;
|
| + }
|
| +
|
| + var completer = new Completer<String>();
|
| + var buffer = new StringBuffer();
|
| + stream.onClosed = () => completer.complete(buffer.toString());
|
| + stream.onData = () => buffer.add(stream.read());
|
| + stream.onError = (e) => completer.completeError(e, stackTrace);
|
| + return completer.future;
|
| }
|
|
|
| /// Wraps [stream] in a single-subscription [Stream] that emits the same data.
|
| -ByteStream wrapInputStream(InputStream stream) {
|
| +Stream<List<int>> wrapInputStream(InputStream stream) {
|
| var controller = new StreamController();
|
| if (stream.closed) {
|
| controller.close();
|
| - return new ByteStream(controller.stream);
|
| + return controller.stream;
|
| }
|
|
|
| stream.onClosed = controller.close;
|
| stream.onData = () => controller.add(stream.read());
|
| stream.onError = (e) => controller.signalError(new AsyncError(e));
|
| - return new ByteStream(controller.stream);
|
| + return controller.stream;
|
| +}
|
| +
|
| +// TODO(nweiz): remove this ASAP (issue 7807).
|
| +/// Wraps [stream] in an [InputStream].
|
| +InputStream streamToInputStream(Stream<List<int>> stream) {
|
| + var inputStream = new ListInputStream();
|
| + stream.listen((chunk) => inputStream.write(chunk),
|
| + onDone: inputStream.markEndOfStream);
|
| + return inputStream;
|
| }
|
|
|
| /// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it
|
| -/// using [Stream.pipe]. Errors piped to the returned [StreamConsumer] will be
|
| -/// forwarded to the [Future] returned by [Stream.pipe].
|
| +/// using [Stream.pipe].
|
| StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) =>
|
| new _OutputStreamConsumer(stream);
|
|
|
| @@ -494,9 +608,6 @@ class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> {
|
| if (!completed) completer.completeError(e, stack);
|
| completed = true;
|
| }
|
| - }, onError: (e) {
|
| - if (!completed) completer.completeError(e.error, e.stackTrace);
|
| - completed = true;
|
| }, onDone: () => _outputStream.close());
|
|
|
| _outputStream.onError = (e) {
|
| @@ -513,43 +624,6 @@ class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> {
|
| }
|
| }
|
|
|
| -/// Returns a [StreamSink] that pipes all data to [consumer] and a [Future] that
|
| -/// will succeed when [StreamSink] is closed or fail with any errors that occur
|
| -/// while writing.
|
| -Pair<StreamSink, Future> consumerToSink(StreamConsumer consumer) {
|
| - var controller = new StreamController();
|
| - var done = controller.stream.pipe(consumer);
|
| - return new Pair<StreamSink, Future>(controller.sink, done);
|
| -}
|
| -
|
| -// TODO(nweiz): remove this when issue 7786 is fixed.
|
| -/// Pipes all data and errors from [stream] into [sink]. When [stream] is done,
|
| -/// the returned [Future] is completed and [sink] is closed if [closeSink] is
|
| -/// true.
|
| -///
|
| -/// When an error occurs on [stream], that error is passed to [sink]. If
|
| -/// [unsubscribeOnError] is true, [Future] will be completed successfully and no
|
| -/// more data or errors will be piped from [stream] to [sink]. If
|
| -/// [unsubscribeOnError] and [closeSink] are both true, [sink] will then be
|
| -/// closed.
|
| -Future store(Stream stream, StreamSink sink,
|
| - {bool unsubscribeOnError: true, closeSink: true}) {
|
| - var completer = new Completer();
|
| - stream.listen(sink.add,
|
| - onError: (e) {
|
| - sink.signalError(e);
|
| - if (unsubscribeOnError) {
|
| - completer.complete();
|
| - if (closeSink) sink.close();
|
| - }
|
| - },
|
| - onDone: () {
|
| - if (closeSink) sink.close();
|
| - completer.complete();
|
| - }, unsubscribeOnError: unsubscribeOnError);
|
| - return completer.future;
|
| -}
|
| -
|
| /// Spawns and runs the process located at [executable], passing in [args].
|
| /// Returns a [Future] that will complete with the results of the process after
|
| /// it has ended.
|
| @@ -583,92 +657,41 @@ Future<PubProcessResult> runProcess(String executable, List<String> args,
|
| /// The spawned process will inherit its parent's environment variables. If
|
| /// [environment] is provided, that will be used to augment (not replace) the
|
| /// the inherited variables.
|
| -Future<PubProcess> startProcess(String executable, List<String> args,
|
| +Future<Process> startProcess(String executable, List<String> args,
|
| {workingDir, Map<String, String> environment}) =>
|
| _doProcess(Process.start, executable, args, workingDir, environment)
|
| - .then((process) => new PubProcess(process));
|
| + .then((process) => new _WrappedProcess(process));
|
|
|
| -/// A wrapper around [Process] that exposes `dart:async`-style APIs.
|
| -class PubProcess {
|
| - /// The underlying `dart:io` [Process].
|
| +/// A wrapper around [Process] that buffers the stdout and stderr to avoid
|
| +/// running into issue 7218.
|
| +class _WrappedProcess implements Process {
|
| final Process _process;
|
| + final InputStream stderr;
|
| + final InputStream stdout;
|
|
|
| - /// The mutable field for [stdin].
|
| - StreamSink<List<int>> _stdin;
|
| -
|
| - /// The mutable field for [stdinClosed].
|
| - Future _stdinClosed;
|
| -
|
| - /// The mutable field for [stdout].
|
| - ByteStream _stdout;
|
| -
|
| - /// The mutable field for [stderr].
|
| - ByteStream _stderr;
|
| -
|
| - /// The mutable field for [exitCode].
|
| - Future<int> _exitCode;
|
| -
|
| - /// The sink used for passing data to the process's standard input stream.
|
| - /// Errors on this stream are surfaced through [stdinClosed], [stdout],
|
| - /// [stderr], and [exitCode], which are all members of an [ErrorGroup].
|
| - StreamSink<List<int>> get stdin => _stdin;
|
| -
|
| - // TODO(nweiz): write some more sophisticated Future machinery so that this
|
| - // doesn't surface errors from the other streams/futures, but still passes its
|
| - // unhandled errors to them. Right now it's impossible to recover from a stdin
|
| - // error and continue interacting with the process.
|
| - /// A [Future] that completes when [stdin] is closed, either by the user or by
|
| - /// the process itself.
|
| - ///
|
| - /// This is in an [ErrorGroup] with [stdout], [stderr], and [exitCode], so any
|
| - /// error in process will be passed to it, but won't reach the top-level error
|
| - /// handler unless nothing has handled it.
|
| - Future get stdinClosed => _stdinClosed;
|
| -
|
| - /// The process's standard output stream.
|
| - ///
|
| - /// This is in an [ErrorGroup] with [stdinClosed], [stderr], and [exitCode],
|
| - /// so any error in process will be passed to it, but won't reach the
|
| - /// top-level error handler unless nothing has handled it.
|
| - ByteStream get stdout => _stdout;
|
| -
|
| - /// The process's standard error stream.
|
| - ///
|
| - /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [exitCode],
|
| - /// so any error in process will be passed to it, but won't reach the
|
| - /// top-level error handler unless nothing has handled it.
|
| - ByteStream get stderr => _stderr;
|
| -
|
| - /// A [Future] that will complete to the process's exit code once the process
|
| - /// has finished running.
|
| - ///
|
| - /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [stderr], so
|
| - /// any error in process will be passed to it, but won't reach the top-level
|
| - /// error handler unless nothing has handled it.
|
| - Future<int> get exitCode => _exitCode;
|
| -
|
| - /// Creates a new [PubProcess] wrapping [process].
|
| - PubProcess(Process process)
|
| - : _process = process {
|
| - var errorGroup = new ErrorGroup();
|
| -
|
| - var pair = consumerToSink(wrapOutputStream(process.stdin));
|
| - _stdin = pair.first;
|
| - _stdinClosed = errorGroup.registerFuture(pair.last);
|
| -
|
| - _stdout = new ByteStream(
|
| - errorGroup.registerStream(wrapInputStream(process.stdout)));
|
| - _stderr = new ByteStream(
|
| - errorGroup.registerStream(wrapInputStream(process.stderr)));
|
| -
|
| - var exitCodeCompleter = new Completer();
|
| - _exitCode = errorGroup.registerFuture(exitCodeCompleter.future);
|
| - _process.onExit = (code) => exitCodeCompleter.complete(code);
|
| + OutputStream get stdin => _process.stdin;
|
| +
|
| + void set onExit(void callback(int exitCode)) {
|
| + _process.onExit = callback;
|
| }
|
|
|
| - /// Sends [signal] to the underlying process.
|
| + _WrappedProcess(Process process)
|
| + : _process = process,
|
| + stderr = _wrapInputStream(process.stderr),
|
| + stdout = _wrapInputStream(process.stdout);
|
| +
|
| bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) =>
|
| _process.kill(signal);
|
| +
|
| + /// Wrap an InputStream in a ListInputStream. This eagerly drains the [source]
|
| + /// input stream. This is useful for spawned processes which will not exit
|
| + /// until their output streams have been drained. TODO(rnystrom): We should
|
| + /// use this logic anywhere we spawn a process.
|
| + static InputStream _wrapInputStream(InputStream source) {
|
| + var sink = new ListInputStream();
|
| + pipeInputToInput(source, sink);
|
| + return sink;
|
| + }
|
| }
|
|
|
| /// Calls [fn] with appropriately modified arguments. [fn] should have the same
|
| @@ -745,7 +768,7 @@ Future withTempDir(Future fn(String path)) {
|
|
|
| /// Extracts a `.tar.gz` file from [stream] to [destination], which can be a
|
| /// directory or a path. Returns whether or not the extraction was successful.
|
| -Future<bool> extractTarGz(Stream<List<int>> stream, destination) {
|
| +Future<bool> extractTarGz(InputStream stream, destination) {
|
| destination = _getPath(destination);
|
|
|
| log.fine("Extracting .tar.gz stream to $destination.");
|
| @@ -754,29 +777,27 @@ Future<bool> extractTarGz(Stream<List<int>> stream, destination) {
|
| return _extractTarGzWindows(stream, destination);
|
| }
|
|
|
| - return startProcess("tar",
|
| - ["--extract", "--gunzip", "--directory", destination]).then((process) {
|
| - // Ignore errors on process.std{out,err}. They'll be passed to
|
| - // process.exitCode, and we don't want them being top-levelled by
|
| - // std{out,err}Sink.
|
| - store(process.stdout.handleError((_) {}), stdoutSink, closeSink: false);
|
| - store(process.stderr.handleError((_) {}), stderrSink, closeSink: false);
|
| - return Future.wait([
|
| - store(stream, process.stdin),
|
| - process.exitCode
|
| - ]);
|
| - }).then((results) {
|
| - var exitCode = results[1];
|
| - if (exitCode != 0) {
|
| - throw "Failed to extract .tar.gz stream to $destination (exit code "
|
| - "$exitCode).";
|
| - }
|
| + var completer = new Completer<int>();
|
| + var processFuture = startProcess("tar",
|
| + ["--extract", "--gunzip", "--directory", destination]);
|
| + processFuture.then((process) {
|
| + process.onExit = (exitCode) => completer.complete(exitCode);
|
| + stream.pipe(process.stdin);
|
| + process.stdout.pipe(stdout, close: false);
|
| + process.stderr.pipe(stderr, close: false);
|
| + }).catchError((e) {
|
| + completer.completeError(e.error, e.stackTrace);
|
| + });
|
| +
|
| + return completer.future.then((exitCode) {
|
| log.fine("Extracted .tar.gz stream to $destination. Exit code $exitCode.");
|
| + // TODO(rnystrom): Does anything check this result value? If not, it should
|
| + // throw on a bad exit code.
|
| + return exitCode == 0;
|
| });
|
| }
|
|
|
| -Future<bool> _extractTarGzWindows(Stream<List<int>> stream,
|
| - String destination) {
|
| +Future<bool> _extractTarGzWindows(InputStream stream, String destination) {
|
| // TODO(rnystrom): In the repo's history, there is an older implementation of
|
| // this that does everything in memory by piping streams directly together
|
| // instead of writing out temp files. The code is simpler, but unfortunately,
|
| @@ -839,8 +860,8 @@ Future<bool> _extractTarGzWindows(Stream<List<int>> stream,
|
| /// Create a .tar.gz archive from a list of entries. Each entry can be a
|
| /// [String], [Directory], or [File] object. The root of the archive is
|
| /// considered to be [baseDir], which defaults to the current working directory.
|
| -/// Returns a [ByteStream] that will emit the contents of the archive.
|
| -ByteStream createTarGz(List contents, {baseDir}) {
|
| +/// Returns an [InputStream] that will emit the contents of the archive.
|
| +InputStream createTarGz(List contents, {baseDir}) {
|
| var buffer = new StringBuffer();
|
| buffer.add('Creating .tag.gz stream containing:\n');
|
| contents.forEach((file) => buffer.add('$file\n'));
|
| @@ -848,7 +869,7 @@ ByteStream createTarGz(List contents, {baseDir}) {
|
|
|
| // TODO(nweiz): Propagate errors to the returned stream (including non-zero
|
| // exit codes). See issue 3657.
|
| - var controller = new StreamController<List<int>>();
|
| + var stream = new ListInputStream();
|
|
|
| if (baseDir == null) baseDir = path.current;
|
| baseDir = getFullPath(baseDir);
|
| @@ -867,14 +888,15 @@ ByteStream createTarGz(List contents, {baseDir}) {
|
| // the process choke, so at some point we should save the arguments to a
|
| // file and pass them in via --files-from for tar and -i@filename for 7zip.
|
| startProcess("tar", args).then((process) {
|
| - store(process.stdout, controller);
|
| - }).catchError((e) {
|
| - // We don't have to worry about double-signaling here, since the store()
|
| - // above will only be reached if startProcess succeeds.
|
| - controller.signalError(e.error, e.stackTrace);
|
| - controller.close();
|
| + pipeInputToInput(process.stdout, stream);
|
| +
|
| + // Drain and discard 7zip's stderr. 7zip writes its normal output to
|
| + // stderr. We don't want to show that since it's meaningless.
|
| + // TODO(rnystrom): Should log this and display it if an actual error
|
| + // occurs.
|
| + consumeInputStream(process.stderr);
|
| });
|
| - return new ByteStream(controller.stream);
|
| + return stream;
|
| }
|
|
|
| withTempDir((tempDir) {
|
| @@ -898,20 +920,15 @@ ByteStream createTarGz(List contents, {baseDir}) {
|
| args = ["a", "unused", "-tgzip", "-so", tarFile];
|
| return startProcess(command, args);
|
| }).then((process) {
|
| - // Ignore 7zip's stderr. 7zip writes its normal output to stderr. We don't
|
| - // want to show that since it's meaningless.
|
| - //
|
| - // TODO(rnystrom): Should log the stderr and display it if an actual error
|
| + // Drain and discard 7zip's stderr. 7zip writes its normal output to
|
| + // stderr. We don't want to show that since it's meaningless.
|
| + // TODO(rnystrom): Should log this and display it if an actual error
|
| // occurs.
|
| - store(process.stdout, controller);
|
| + consumeInputStream(process.stderr);
|
| + return pipeInputToInput(process.stdout, stream);
|
| });
|
| - }).catchError((e) {
|
| - // We don't have to worry about double-signaling here, since the store()
|
| - // above will only be reached if everything succeeds.
|
| - controller.signalError(e.error, e.stackTrace);
|
| - controller.close();
|
| });
|
| - return new ByteStream(controller.stream);
|
| + return stream;
|
| }
|
|
|
| /// Exception thrown when an operation times out.
|
|
|