Index: utils/pub/io.dart |
diff --git a/utils/pub/io.dart b/utils/pub/io.dart |
index 60876c8f83f06db30076c59847963d18271c2ec6..79f0a06524cc5b91b5e953fdf03d42ccb5b6ca82 100644 |
--- a/utils/pub/io.dart |
+++ b/utils/pub/io.dart |
@@ -12,14 +12,9 @@ import 'dart:json'; |
import 'dart:uri'; |
import '../../pkg/path/lib/path.dart' as path; |
-import '../../pkg/http/lib/http.dart' show ByteStream; |
-import 'error_group.dart'; |
-import 'exit_codes.dart' as exit_codes; |
import 'log.dart' as log; |
import 'utils.dart'; |
-export '../../pkg/http/lib/http.dart' show ByteStream; |
- |
final NEWLINE_PATTERN = new RegExp("\r\n?|\n\r?"); |
/// Joins a number of path string parts into a single path. Handles |
@@ -125,15 +120,44 @@ Future<File> deleteFile(file) { |
/// Writes [stream] to a new file at [path], which may be a [String] or a |
/// [File]. Will replace any file already at that path. Completes when the file |
/// is done being written. |
-Future<File> createFileFromStream(Stream<List<int>> stream, path) { |
+Future<File> createFileFromStream(InputStream stream, path) { |
path = _getPath(path); |
log.io("Creating $path from stream."); |
+ var completer = new Completer<File>(); |
+ var completed = false; |
var file = new File(path); |
- return stream.pipe(wrapOutputStream(file.openOutputStream())).then((_) { |
+ var outputStream = file.openOutputStream(); |
+ stream.pipe(outputStream); |
+ |
+ outputStream.onClosed = () { |
log.fine("Created $path from stream."); |
- }); |
+ completed = true; |
+ completer.complete(file); |
+ }; |
+ |
+ // TODO(nweiz): remove this when issue 4061 is fixed. |
+ var stackTrace; |
+ try { |
+ throw ""; |
+ } catch (_, localStackTrace) { |
+ stackTrace = localStackTrace; |
+ } |
+ |
+ completeError(error) { |
+ if (!completed) { |
+ completed = true; |
+ completer.completeError(error, stackTrace); |
+ } else { |
+ log.fine("Got error after stream was closed: $error"); |
+ } |
+ } |
+ |
+ stream.onError = completeError; |
+ outputStream.onError = completeError; |
+ |
+ return completer.future; |
} |
/// Creates a directory [dir]. Returns a [Future] that completes when the |
@@ -409,33 +433,8 @@ String relativeToPub(String target) { |
return path.normalize(join(utilDir, 'pub', target)); |
} |
-// TODO(nweiz): add a ByteSink wrapper to make writing strings to stdout/stderr |
-// nicer. |
- |
-/// A sink that writes to standard output. Errors piped to this stream will be |
-/// surfaced to the top-level error handler. |
-final StreamSink<List<int>> stdoutSink = _wrapStdio(stdout, "stdout"); |
- |
-/// A sink that writes to standard error. Errors piped to this stream will be |
-/// surfaced to the top-level error handler. |
-final StreamSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr"); |
- |
-/// Wrap the standard output or error [stream] in a [StreamSink]. Any errors are |
-/// logged, and then the program is terminated. [name] is used for debugging. |
-StreamSink<List<int>> _wrapStdio(OutputStream stream, String name) { |
- var pair = consumerToSink(wrapOutputStream(stream)); |
- pair.last.catchError((e) { |
- // This log may or may not work, depending on how the stream failed. Not |
- // much we can do about that. |
- log.error("Error writing to $name: $e"); |
- exit(exit_codes.IO); |
- }); |
- return pair.first; |
-} |
- |
-/// A line-by-line stream of standard input. |
-final Stream<String> stdinLines = |
- streamToLines(wrapInputStream(stdin).toStringStream()); |
+/// A StringInputStream reading from stdin. |
+final _stringStdin = new StringInputStream(stdin); |
/// Displays a message and reads a yes/no confirmation from the user. Returns |
/// a [Future] that completes to `true` if the user confirms or `false` if they |
@@ -445,28 +444,143 @@ final Stream<String> stdinLines = |
/// should just be a fragment like, "Are you sure you want to proceed". |
Future<bool> confirm(String message) { |
log.fine('Showing confirm message: $message'); |
- stdoutSink.add("$message (y/n)? ".charCodes); |
- return streamFirst(stdinLines) |
- .then((line) => new RegExp(r"^[yY]").hasMatch(line)); |
+ stdout.writeString("$message (y/n)? "); |
+ return readLine().then((line) => new RegExp(r"^[yY]").hasMatch(line)); |
+} |
+ |
+/// Returns a single line read from a [StringInputStream]. By default, reads |
+/// from stdin. |
+/// |
+/// A [StringInputStream] passed to this should have no callbacks registered. |
+Future<String> readLine([StringInputStream stream]) { |
+ if (stream == null) stream = _stringStdin; |
+ if (stream.closed) return new Future.immediate(''); |
+ void removeCallbacks() { |
+ stream.onClosed = null; |
+ stream.onLine = null; |
+ stream.onError = null; |
+ } |
+ |
+ // TODO(nweiz): remove this when issue 4061 is fixed. |
+ var stackTrace; |
+ try { |
+ throw ""; |
+ } catch (_, localStackTrace) { |
+ stackTrace = localStackTrace; |
+ } |
+ |
+ var completer = new Completer(); |
+ stream.onClosed = () { |
+ removeCallbacks(); |
+ completer.complete(''); |
+ }; |
+ |
+ stream.onLine = () { |
+ removeCallbacks(); |
+ var line = stream.readLine(); |
+ log.io('Read line: $line'); |
+ completer.complete(line); |
+ }; |
+ |
+ stream.onError = (e) { |
+ removeCallbacks(); |
+ completer.completeError(e, stackTrace); |
+ }; |
+ |
+ return completer.future; |
+} |
+ |
+/// Takes all input from [source] and writes it to [sink]. |
+/// |
+/// Returns a future that completes when [source] is closed. |
+Future pipeInputToInput(InputStream source, ListInputStream sink) { |
+ var completer = new Completer(); |
+ source.onClosed = () { |
+ sink.markEndOfStream(); |
+ completer.complete(null); |
+ }; |
+ source.onData = () { |
+ // Even if the sink is closed and we aren't going to do anything with more |
+ // data, we still need to drain it from source to work around issue 7218. |
+ var data = source.read(); |
+ try { |
+ if (!sink.closed) sink.write(data); |
+ } on StreamException catch (e, stackTrace) { |
+ // Ignore an exception to work around issue 4222. |
+ log.io("Writing to an unclosed ListInputStream caused exception $e\n" |
+ "$stackTrace"); |
+ } |
+ }; |
+ // TODO(nweiz): propagate this error to the sink. See issue 3657. |
+ source.onError = (e) { throw e; }; |
+ return completer.future; |
+} |
+ |
+/// Buffers all input from an InputStream and returns it as a future. |
+Future<List<int>> consumeInputStream(InputStream stream) { |
+ if (stream.closed) return new Future.immediate(<int>[]); |
+ |
+ // TODO(nweiz): remove this when issue 4061 is fixed. |
+ var stackTrace; |
+ try { |
+ throw ""; |
+ } catch (_, localStackTrace) { |
+ stackTrace = localStackTrace; |
+ } |
+ |
+ var completer = new Completer<List<int>>(); |
+ var buffer = <int>[]; |
+ stream.onClosed = () => completer.complete(buffer); |
+ stream.onData = () => buffer.addAll(stream.read()); |
+ stream.onError = (e) => completer.completeError(e, stackTrace); |
+ return completer.future; |
+} |
+ |
+/// Buffers all input from a StringInputStream and returns it as a future. |
+Future<String> consumeStringInputStream(StringInputStream stream) { |
+ if (stream.closed) return new Future.immediate(''); |
+ |
+ // TODO(nweiz): remove this when issue 4061 is fixed. |
+ var stackTrace; |
+ try { |
+ throw ""; |
+ } catch (_, localStackTrace) { |
+ stackTrace = localStackTrace; |
+ } |
+ |
+ var completer = new Completer<String>(); |
+ var buffer = new StringBuffer(); |
+ stream.onClosed = () => completer.complete(buffer.toString()); |
+ stream.onData = () => buffer.add(stream.read()); |
+ stream.onError = (e) => completer.completeError(e, stackTrace); |
+ return completer.future; |
} |
/// Wraps [stream] in a single-subscription [Stream] that emits the same data. |
-ByteStream wrapInputStream(InputStream stream) { |
+Stream<List<int>> wrapInputStream(InputStream stream) { |
var controller = new StreamController(); |
if (stream.closed) { |
controller.close(); |
- return new ByteStream(controller.stream); |
+ return controller.stream; |
} |
stream.onClosed = controller.close; |
stream.onData = () => controller.add(stream.read()); |
stream.onError = (e) => controller.signalError(new AsyncError(e)); |
- return new ByteStream(controller.stream); |
+ return controller.stream; |
+} |
+ |
+// TODO(nweiz): remove this ASAP (issue 7807). |
+/// Wraps [stream] in an [InputStream]. |
+InputStream streamToInputStream(Stream<List<int>> stream) { |
+ var inputStream = new ListInputStream(); |
+ stream.listen((chunk) => inputStream.write(chunk), |
+ onDone: inputStream.markEndOfStream); |
+ return inputStream; |
} |
/// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it |
-/// using [Stream.pipe]. Errors piped to the returned [StreamConsumer] will be |
-/// forwarded to the [Future] returned by [Stream.pipe]. |
+/// using [Stream.pipe]. |
StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => |
new _OutputStreamConsumer(stream); |
@@ -494,9 +608,6 @@ class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { |
if (!completed) completer.completeError(e, stack); |
completed = true; |
} |
- }, onError: (e) { |
- if (!completed) completer.completeError(e.error, e.stackTrace); |
- completed = true; |
}, onDone: () => _outputStream.close()); |
_outputStream.onError = (e) { |
@@ -513,43 +624,6 @@ class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { |
} |
} |
-/// Returns a [StreamSink] that pipes all data to [consumer] and a [Future] that |
-/// will succeed when [StreamSink] is closed or fail with any errors that occur |
-/// while writing. |
-Pair<StreamSink, Future> consumerToSink(StreamConsumer consumer) { |
- var controller = new StreamController(); |
- var done = controller.stream.pipe(consumer); |
- return new Pair<StreamSink, Future>(controller.sink, done); |
-} |
- |
-// TODO(nweiz): remove this when issue 7786 is fixed. |
-/// Pipes all data and errors from [stream] into [sink]. When [stream] is done, |
-/// the returned [Future] is completed and [sink] is closed if [closeSink] is |
-/// true. |
-/// |
-/// When an error occurs on [stream], that error is passed to [sink]. If |
-/// [unsubscribeOnError] is true, [Future] will be completed successfully and no |
-/// more data or errors will be piped from [stream] to [sink]. If |
-/// [unsubscribeOnError] and [closeSink] are both true, [sink] will then be |
-/// closed. |
-Future store(Stream stream, StreamSink sink, |
- {bool unsubscribeOnError: true, closeSink: true}) { |
- var completer = new Completer(); |
- stream.listen(sink.add, |
- onError: (e) { |
- sink.signalError(e); |
- if (unsubscribeOnError) { |
- completer.complete(); |
- if (closeSink) sink.close(); |
- } |
- }, |
- onDone: () { |
- if (closeSink) sink.close(); |
- completer.complete(); |
- }, unsubscribeOnError: unsubscribeOnError); |
- return completer.future; |
-} |
- |
/// Spawns and runs the process located at [executable], passing in [args]. |
/// Returns a [Future] that will complete with the results of the process after |
/// it has ended. |
@@ -583,92 +657,41 @@ Future<PubProcessResult> runProcess(String executable, List<String> args, |
/// The spawned process will inherit its parent's environment variables. If |
/// [environment] is provided, that will be used to augment (not replace) the |
/// the inherited variables. |
-Future<PubProcess> startProcess(String executable, List<String> args, |
+Future<Process> startProcess(String executable, List<String> args, |
{workingDir, Map<String, String> environment}) => |
_doProcess(Process.start, executable, args, workingDir, environment) |
- .then((process) => new PubProcess(process)); |
+ .then((process) => new _WrappedProcess(process)); |
-/// A wrapper around [Process] that exposes `dart:async`-style APIs. |
-class PubProcess { |
- /// The underlying `dart:io` [Process]. |
+/// A wrapper around [Process] that buffers the stdout and stderr to avoid |
+/// running into issue 7218. |
+class _WrappedProcess implements Process { |
final Process _process; |
+ final InputStream stderr; |
+ final InputStream stdout; |
- /// The mutable field for [stdin]. |
- StreamSink<List<int>> _stdin; |
- |
- /// The mutable field for [stdinClosed]. |
- Future _stdinClosed; |
- |
- /// The mutable field for [stdout]. |
- ByteStream _stdout; |
- |
- /// The mutable field for [stderr]. |
- ByteStream _stderr; |
- |
- /// The mutable field for [exitCode]. |
- Future<int> _exitCode; |
- |
- /// The sink used for passing data to the process's standard input stream. |
- /// Errors on this stream are surfaced through [stdinClosed], [stdout], |
- /// [stderr], and [exitCode], which are all members of an [ErrorGroup]. |
- StreamSink<List<int>> get stdin => _stdin; |
- |
- // TODO(nweiz): write some more sophisticated Future machinery so that this |
- // doesn't surface errors from the other streams/futures, but still passes its |
- // unhandled errors to them. Right now it's impossible to recover from a stdin |
- // error and continue interacting with the process. |
- /// A [Future] that completes when [stdin] is closed, either by the user or by |
- /// the process itself. |
- /// |
- /// This is in an [ErrorGroup] with [stdout], [stderr], and [exitCode], so any |
- /// error in process will be passed to it, but won't reach the top-level error |
- /// handler unless nothing has handled it. |
- Future get stdinClosed => _stdinClosed; |
- |
- /// The process's standard output stream. |
- /// |
- /// This is in an [ErrorGroup] with [stdinClosed], [stderr], and [exitCode], |
- /// so any error in process will be passed to it, but won't reach the |
- /// top-level error handler unless nothing has handled it. |
- ByteStream get stdout => _stdout; |
- |
- /// The process's standard error stream. |
- /// |
- /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [exitCode], |
- /// so any error in process will be passed to it, but won't reach the |
- /// top-level error handler unless nothing has handled it. |
- ByteStream get stderr => _stderr; |
- |
- /// A [Future] that will complete to the process's exit code once the process |
- /// has finished running. |
- /// |
- /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [stderr], so |
- /// any error in process will be passed to it, but won't reach the top-level |
- /// error handler unless nothing has handled it. |
- Future<int> get exitCode => _exitCode; |
- |
- /// Creates a new [PubProcess] wrapping [process]. |
- PubProcess(Process process) |
- : _process = process { |
- var errorGroup = new ErrorGroup(); |
- |
- var pair = consumerToSink(wrapOutputStream(process.stdin)); |
- _stdin = pair.first; |
- _stdinClosed = errorGroup.registerFuture(pair.last); |
- |
- _stdout = new ByteStream( |
- errorGroup.registerStream(wrapInputStream(process.stdout))); |
- _stderr = new ByteStream( |
- errorGroup.registerStream(wrapInputStream(process.stderr))); |
- |
- var exitCodeCompleter = new Completer(); |
- _exitCode = errorGroup.registerFuture(exitCodeCompleter.future); |
- _process.onExit = (code) => exitCodeCompleter.complete(code); |
+ OutputStream get stdin => _process.stdin; |
+ |
+ void set onExit(void callback(int exitCode)) { |
+ _process.onExit = callback; |
} |
- /// Sends [signal] to the underlying process. |
+ _WrappedProcess(Process process) |
+ : _process = process, |
+ stderr = _wrapInputStream(process.stderr), |
+ stdout = _wrapInputStream(process.stdout); |
+ |
bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) => |
_process.kill(signal); |
+ |
+ /// Wrap an InputStream in a ListInputStream. This eagerly drains the [source] |
+ /// input stream. This is useful for spawned processes which will not exit |
+ /// until their output streams have been drained. TODO(rnystrom): We should |
+ /// use this logic anywhere we spawn a process. |
+ static InputStream _wrapInputStream(InputStream source) { |
+ var sink = new ListInputStream(); |
+ pipeInputToInput(source, sink); |
+ return sink; |
+ } |
} |
/// Calls [fn] with appropriately modified arguments. [fn] should have the same |
@@ -745,7 +768,7 @@ Future withTempDir(Future fn(String path)) { |
/// Extracts a `.tar.gz` file from [stream] to [destination], which can be a |
/// directory or a path. Returns whether or not the extraction was successful. |
-Future<bool> extractTarGz(Stream<List<int>> stream, destination) { |
+Future<bool> extractTarGz(InputStream stream, destination) { |
destination = _getPath(destination); |
log.fine("Extracting .tar.gz stream to $destination."); |
@@ -754,29 +777,27 @@ Future<bool> extractTarGz(Stream<List<int>> stream, destination) { |
return _extractTarGzWindows(stream, destination); |
} |
- return startProcess("tar", |
- ["--extract", "--gunzip", "--directory", destination]).then((process) { |
- // Ignore errors on process.std{out,err}. They'll be passed to |
- // process.exitCode, and we don't want them being top-levelled by |
- // std{out,err}Sink. |
- store(process.stdout.handleError((_) {}), stdoutSink, closeSink: false); |
- store(process.stderr.handleError((_) {}), stderrSink, closeSink: false); |
- return Future.wait([ |
- store(stream, process.stdin), |
- process.exitCode |
- ]); |
- }).then((results) { |
- var exitCode = results[1]; |
- if (exitCode != 0) { |
- throw "Failed to extract .tar.gz stream to $destination (exit code " |
- "$exitCode)."; |
- } |
+ var completer = new Completer<int>(); |
+ var processFuture = startProcess("tar", |
+ ["--extract", "--gunzip", "--directory", destination]); |
+ processFuture.then((process) { |
+ process.onExit = (exitCode) => completer.complete(exitCode); |
+ stream.pipe(process.stdin); |
+ process.stdout.pipe(stdout, close: false); |
+ process.stderr.pipe(stderr, close: false); |
+ }).catchError((e) { |
+ completer.completeError(e.error, e.stackTrace); |
+ }); |
+ |
+ return completer.future.then((exitCode) { |
log.fine("Extracted .tar.gz stream to $destination. Exit code $exitCode."); |
+ // TODO(rnystrom): Does anything check this result value? If not, it should |
+ // throw on a bad exit code. |
+ return exitCode == 0; |
}); |
} |
-Future<bool> _extractTarGzWindows(Stream<List<int>> stream, |
- String destination) { |
+Future<bool> _extractTarGzWindows(InputStream stream, String destination) { |
// TODO(rnystrom): In the repo's history, there is an older implementation of |
// this that does everything in memory by piping streams directly together |
// instead of writing out temp files. The code is simpler, but unfortunately, |
@@ -839,8 +860,8 @@ Future<bool> _extractTarGzWindows(Stream<List<int>> stream, |
/// Create a .tar.gz archive from a list of entries. Each entry can be a |
/// [String], [Directory], or [File] object. The root of the archive is |
/// considered to be [baseDir], which defaults to the current working directory. |
-/// Returns a [ByteStream] that will emit the contents of the archive. |
-ByteStream createTarGz(List contents, {baseDir}) { |
+/// Returns an [InputStream] that will emit the contents of the archive. |
+InputStream createTarGz(List contents, {baseDir}) { |
var buffer = new StringBuffer(); |
buffer.add('Creating .tag.gz stream containing:\n'); |
contents.forEach((file) => buffer.add('$file\n')); |
@@ -848,7 +869,7 @@ ByteStream createTarGz(List contents, {baseDir}) { |
// TODO(nweiz): Propagate errors to the returned stream (including non-zero |
// exit codes). See issue 3657. |
- var controller = new StreamController<List<int>>(); |
+ var stream = new ListInputStream(); |
if (baseDir == null) baseDir = path.current; |
baseDir = getFullPath(baseDir); |
@@ -867,14 +888,15 @@ ByteStream createTarGz(List contents, {baseDir}) { |
// the process choke, so at some point we should save the arguments to a |
// file and pass them in via --files-from for tar and -i@filename for 7zip. |
startProcess("tar", args).then((process) { |
- store(process.stdout, controller); |
- }).catchError((e) { |
- // We don't have to worry about double-signaling here, since the store() |
- // above will only be reached if startProcess succeeds. |
- controller.signalError(e.error, e.stackTrace); |
- controller.close(); |
+ pipeInputToInput(process.stdout, stream); |
+ |
+ // Drain and discard 7zip's stderr. 7zip writes its normal output to |
+ // stderr. We don't want to show that since it's meaningless. |
+ // TODO(rnystrom): Should log this and display it if an actual error |
+ // occurs. |
+ consumeInputStream(process.stderr); |
}); |
- return new ByteStream(controller.stream); |
+ return stream; |
} |
withTempDir((tempDir) { |
@@ -898,20 +920,15 @@ ByteStream createTarGz(List contents, {baseDir}) { |
args = ["a", "unused", "-tgzip", "-so", tarFile]; |
return startProcess(command, args); |
}).then((process) { |
- // Ignore 7zip's stderr. 7zip writes its normal output to stderr. We don't |
- // want to show that since it's meaningless. |
- // |
- // TODO(rnystrom): Should log the stderr and display it if an actual error |
+ // Drain and discard 7zip's stderr. 7zip writes its normal output to |
+ // stderr. We don't want to show that since it's meaningless. |
+ // TODO(rnystrom): Should log this and display it if an actual error |
// occurs. |
- store(process.stdout, controller); |
+ consumeInputStream(process.stderr); |
+ return pipeInputToInput(process.stdout, stream); |
}); |
- }).catchError((e) { |
- // We don't have to worry about double-signaling here, since the store() |
- // above will only be reached if everything succeeds. |
- controller.signalError(e.error, e.stackTrace); |
- controller.close(); |
}); |
- return new ByteStream(controller.stream); |
+ return stream; |
} |
/// Exception thrown when an operation times out. |