Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(854)

Unified Diff: utils/pub/io.dart

Issue 12086110: Use the dart:async Stream API thoroughly in Pub. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « utils/pub/hosted_source.dart ('k') | utils/pub/log.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: utils/pub/io.dart
diff --git a/utils/pub/io.dart b/utils/pub/io.dart
index 8243a28190ad8bf2f88177c9f716baf275e05fbf..982c18b3b007d86ca10f9bf53e9cd5781b12e21b 100644
--- a/utils/pub/io.dart
+++ b/utils/pub/io.dart
@@ -12,9 +12,14 @@ import 'dart:json';
import 'dart:uri';
import '../../pkg/path/lib/path.dart' as path;
+import '../../pkg/http/lib/http.dart' show ByteStream;
+import 'error_group.dart';
+import 'exit_codes.dart' as exit_codes;
import 'log.dart' as log;
import 'utils.dart';
+export '../../pkg/http/lib/http.dart' show ByteStream;
+
final NEWLINE_PATTERN = new RegExp("\r\n?|\n\r?");
/// Joins a number of path string parts into a single path. Handles
@@ -144,44 +149,15 @@ Future<File> deleteFile(file) {
/// Writes [stream] to a new file at [path], which may be a [String] or a
/// [File]. Will replace any file already at that path. Completes when the file
/// is done being written.
-Future<File> createFileFromStream(InputStream stream, path) {
+Future<File> createFileFromStream(Stream<List<int>> stream, path) {
path = _getPath(path);
log.io("Creating $path from stream.");
- var completer = new Completer<File>();
- var completed = false;
var file = new File(path);
- var outputStream = file.openOutputStream();
- stream.pipe(outputStream);
-
- outputStream.onClosed = () {
+ return stream.pipe(wrapOutputStream(file.openOutputStream())).then((_) {
log.fine("Created $path from stream.");
- completed = true;
- completer.complete(file);
- };
-
- // TODO(nweiz): remove this when issue 4061 is fixed.
- var stackTrace;
- try {
- throw "";
- } catch (_, localStackTrace) {
- stackTrace = localStackTrace;
- }
-
- completeError(error) {
- if (!completed) {
- completed = true;
- completer.completeError(error, stackTrace);
- } else {
- log.fine("Got error after stream was closed: $error");
- }
- }
-
- stream.onError = completeError;
- outputStream.onError = completeError;
-
- return completer.future;
+ });
}
/// Creates a directory [dir]. Returns a [Future] that completes when the
@@ -457,8 +433,33 @@ String relativeToPub(String target) {
return path.normalize(join(utilDir, 'pub', target));
}
-/// A StringInputStream reading from stdin.
-final _stringStdin = new StringInputStream(stdin);
+// TODO(nweiz): add a ByteSink wrapper to make writing strings to stdout/stderr
+// nicer.
+
+/// A sink that writes to standard output. Errors piped to this stream will be
+/// surfaced to the top-level error handler.
+final StreamSink<List<int>> stdoutSink = _wrapStdio(stdout, "stdout");
+
+/// A sink that writes to standard error. Errors piped to this stream will be
+/// surfaced to the top-level error handler.
+final StreamSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr");
+
+/// Wrap the standard output or error [stream] in a [StreamSink]. Any errors are
+/// logged, and then the program is terminated. [name] is used for debugging.
+StreamSink<List<int>> _wrapStdio(OutputStream stream, String name) {
+ var pair = consumerToSink(wrapOutputStream(stream));
+ pair.last.catchError((e) {
+ // This log may or may not work, depending on how the stream failed. Not
+ // much we can do about that.
+ log.error("Error writing to $name: $e");
+ exit(exit_codes.IO);
+ });
+ return pair.first;
+}
+
+/// A line-by-line stream of standard input.
+final Stream<String> stdinLines =
+ streamToLines(wrapInputStream(stdin).toStringStream());
/// Displays a message and reads a yes/no confirmation from the user. Returns
/// a [Future] that completes to `true` if the user confirms or `false` if they
@@ -468,143 +469,28 @@ final _stringStdin = new StringInputStream(stdin);
/// should just be a fragment like, "Are you sure you want to proceed".
Future<bool> confirm(String message) {
log.fine('Showing confirm message: $message');
- stdout.writeString("$message (y/n)? ");
- return readLine().then((line) => new RegExp(r"^[yY]").hasMatch(line));
-}
-
-/// Returns a single line read from a [StringInputStream]. By default, reads
-/// from stdin.
-///
-/// A [StringInputStream] passed to this should have no callbacks registered.
-Future<String> readLine([StringInputStream stream]) {
- if (stream == null) stream = _stringStdin;
- if (stream.closed) return new Future.immediate('');
- void removeCallbacks() {
- stream.onClosed = null;
- stream.onLine = null;
- stream.onError = null;
- }
-
- // TODO(nweiz): remove this when issue 4061 is fixed.
- var stackTrace;
- try {
- throw "";
- } catch (_, localStackTrace) {
- stackTrace = localStackTrace;
- }
-
- var completer = new Completer();
- stream.onClosed = () {
- removeCallbacks();
- completer.complete('');
- };
-
- stream.onLine = () {
- removeCallbacks();
- var line = stream.readLine();
- log.io('Read line: $line');
- completer.complete(line);
- };
-
- stream.onError = (e) {
- removeCallbacks();
- completer.completeError(e, stackTrace);
- };
-
- return completer.future;
-}
-
-/// Takes all input from [source] and writes it to [sink].
-///
-/// Returns a future that completes when [source] is closed.
-Future pipeInputToInput(InputStream source, ListInputStream sink) {
- var completer = new Completer();
- source.onClosed = () {
- sink.markEndOfStream();
- completer.complete(null);
- };
- source.onData = () {
- // Even if the sink is closed and we aren't going to do anything with more
- // data, we still need to drain it from source to work around issue 7218.
- var data = source.read();
- try {
- if (!sink.closed) sink.write(data);
- } on StreamException catch (e, stackTrace) {
- // Ignore an exception to work around issue 4222.
- log.io("Writing to an unclosed ListInputStream caused exception $e\n"
- "$stackTrace");
- }
- };
- // TODO(nweiz): propagate this error to the sink. See issue 3657.
- source.onError = (e) { throw e; };
- return completer.future;
-}
-
-/// Buffers all input from an InputStream and returns it as a future.
-Future<List<int>> consumeInputStream(InputStream stream) {
- if (stream.closed) return new Future.immediate(<int>[]);
-
- // TODO(nweiz): remove this when issue 4061 is fixed.
- var stackTrace;
- try {
- throw "";
- } catch (_, localStackTrace) {
- stackTrace = localStackTrace;
- }
-
- var completer = new Completer<List<int>>();
- var buffer = <int>[];
- stream.onClosed = () => completer.complete(buffer);
- stream.onData = () => buffer.addAll(stream.read());
- stream.onError = (e) => completer.completeError(e, stackTrace);
- return completer.future;
-}
-
-/// Buffers all input from a StringInputStream and returns it as a future.
-Future<String> consumeStringInputStream(StringInputStream stream) {
- if (stream.closed) return new Future.immediate('');
-
- // TODO(nweiz): remove this when issue 4061 is fixed.
- var stackTrace;
- try {
- throw "";
- } catch (_, localStackTrace) {
- stackTrace = localStackTrace;
- }
-
- var completer = new Completer<String>();
- var buffer = new StringBuffer();
- stream.onClosed = () => completer.complete(buffer.toString());
- stream.onData = () => buffer.add(stream.read());
- stream.onError = (e) => completer.completeError(e, stackTrace);
- return completer.future;
+ stdoutSink.add("$message (y/n)? ".charCodes);
+ return streamFirst(stdinLines)
+ .then((line) => new RegExp(r"^[yY]").hasMatch(line));
}
/// Wraps [stream] in a single-subscription [Stream] that emits the same data.
-Stream<List<int>> wrapInputStream(InputStream stream) {
+ByteStream wrapInputStream(InputStream stream) {
var controller = new StreamController();
if (stream.closed) {
controller.close();
- return controller.stream;
+ return new ByteStream(controller.stream);
}
stream.onClosed = controller.close;
stream.onData = () => controller.add(stream.read());
stream.onError = (e) => controller.signalError(new AsyncError(e));
- return controller.stream;
-}
-
-// TODO(nweiz): remove this ASAP (issue 7807).
-/// Wraps [stream] in an [InputStream].
-InputStream streamToInputStream(Stream<List<int>> stream) {
- var inputStream = new ListInputStream();
- stream.listen((chunk) => inputStream.write(chunk),
- onDone: inputStream.markEndOfStream);
- return inputStream;
+ return new ByteStream(controller.stream);
}
/// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it
-/// using [Stream.pipe].
+/// using [Stream.pipe]. Errors piped to the returned [StreamConsumer] will be
+/// forwarded to the [Future] returned by [Stream.pipe].
StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) =>
new _OutputStreamConsumer(stream);
@@ -632,6 +518,9 @@ class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> {
if (!completed) completer.completeError(e, stack);
completed = true;
}
+ }, onError: (e) {
+ if (!completed) completer.completeError(e.error, e.stackTrace);
+ completed = true;
}, onDone: () => _outputStream.close());
_outputStream.onError = (e) {
@@ -648,6 +537,43 @@ class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> {
}
}
+/// Returns a [StreamSink] that pipes all data to [consumer] and a [Future] that
+/// will succeed when [StreamSink] is closed or fail with any errors that occur
+/// while writing.
+Pair<StreamSink, Future> consumerToSink(StreamConsumer consumer) {
+ var controller = new StreamController();
+ var done = controller.stream.pipe(consumer);
+ return new Pair<StreamSink, Future>(controller.sink, done);
+}
+
+// TODO(nweiz): remove this when issue 7786 is fixed.
+/// Pipes all data and errors from [stream] into [sink]. When [stream] is done,
+/// the returned [Future] is completed and [sink] is closed if [closeSink] is
+/// true.
+///
+/// When an error occurs on [stream], that error is passed to [sink]. If
+/// [unsubscribeOnError] is true, [Future] will be completed successfully and no
+/// more data or errors will be piped from [stream] to [sink]. If
+/// [unsubscribeOnError] and [closeSink] are both true, [sink] will then be
+/// closed.
+Future store(Stream stream, StreamSink sink,
+ {bool unsubscribeOnError: true, closeSink: true}) {
+ var completer = new Completer();
+ stream.listen(sink.add,
+ onError: (e) {
+ sink.signalError(e);
+ if (unsubscribeOnError) {
+ completer.complete();
+ if (closeSink) sink.close();
+ }
+ },
+ onDone: () {
+ if (closeSink) sink.close();
+ completer.complete();
+ }, unsubscribeOnError: unsubscribeOnError);
+ return completer.future;
+}
+
/// Spawns and runs the process located at [executable], passing in [args].
/// Returns a [Future] that will complete with the results of the process after
/// it has ended.
@@ -681,41 +607,92 @@ Future<PubProcessResult> runProcess(String executable, List<String> args,
/// The spawned process will inherit its parent's environment variables. If
/// [environment] is provided, that will be used to augment (not replace) the
/// the inherited variables.
-Future<Process> startProcess(String executable, List<String> args,
+Future<PubProcess> startProcess(String executable, List<String> args,
{workingDir, Map<String, String> environment}) =>
_doProcess(Process.start, executable, args, workingDir, environment)
- .then((process) => new _WrappedProcess(process));
+ .then((process) => new PubProcess(process));
-/// A wrapper around [Process] that buffers the stdout and stderr to avoid
-/// running into issue 7218.
-class _WrappedProcess implements Process {
+/// A wrapper around [Process] that exposes `dart:async`-style APIs.
+class PubProcess {
+ /// The underlying `dart:io` [Process].
final Process _process;
- final InputStream stderr;
- final InputStream stdout;
- OutputStream get stdin => _process.stdin;
-
- void set onExit(void callback(int exitCode)) {
- _process.onExit = callback;
+ /// The mutable field for [stdin].
+ StreamSink<List<int>> _stdin;
+
+ /// The mutable field for [stdinClosed].
+ Future _stdinClosed;
+
+ /// The mutable field for [stdout].
+ ByteStream _stdout;
+
+ /// The mutable field for [stderr].
+ ByteStream _stderr;
+
+ /// The mutable field for [exitCode].
+ Future<int> _exitCode;
+
+ /// The sink used for passing data to the process's standard input stream.
+ /// Errors on this stream are surfaced through [stdinClosed], [stdout],
+ /// [stderr], and [exitCode], which are all members of an [ErrorGroup].
+ StreamSink<List<int>> get stdin => _stdin;
+
+ // TODO(nweiz): write some more sophisticated Future machinery so that this
+ // doesn't surface errors from the other streams/futures, but still passes its
+ // unhandled errors to them. Right now it's impossible to recover from a stdin
+ // error and continue interacting with the process.
+ /// A [Future] that completes when [stdin] is closed, either by the user or by
+ /// the process itself.
+ ///
+ /// This is in an [ErrorGroup] with [stdout], [stderr], and [exitCode], so any
+ /// error in process will be passed to it, but won't reach the top-level error
+ /// handler unless nothing has handled it.
+ Future get stdinClosed => _stdinClosed;
+
+ /// The process's standard output stream.
+ ///
+ /// This is in an [ErrorGroup] with [stdinClosed], [stderr], and [exitCode],
+ /// so any error in process will be passed to it, but won't reach the
+ /// top-level error handler unless nothing has handled it.
+ ByteStream get stdout => _stdout;
+
+ /// The process's standard error stream.
+ ///
+ /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [exitCode],
+ /// so any error in process will be passed to it, but won't reach the
+ /// top-level error handler unless nothing has handled it.
+ ByteStream get stderr => _stderr;
+
+ /// A [Future] that will complete to the process's exit code once the process
+ /// has finished running.
+ ///
+ /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [stderr], so
+ /// any error in process will be passed to it, but won't reach the top-level
+ /// error handler unless nothing has handled it.
+ Future<int> get exitCode => _exitCode;
+
+ /// Creates a new [PubProcess] wrapping [process].
+ PubProcess(Process process)
+ : _process = process {
+ var errorGroup = new ErrorGroup();
+
+ var pair = consumerToSink(wrapOutputStream(process.stdin));
+ _stdin = pair.first;
+ _stdinClosed = errorGroup.registerFuture(pair.last);
+
+ _stdout = new ByteStream(
+ errorGroup.registerStream(wrapInputStream(process.stdout)));
+ _stderr = new ByteStream(
+ errorGroup.registerStream(wrapInputStream(process.stderr)));
+
+ var exitCodeCompleter = new Completer();
+ _exitCode = errorGroup.registerFuture(exitCodeCompleter.future);
+ _process.onExit = (code) => exitCodeCompleter.complete(code);
}
- _WrappedProcess(Process process)
- : _process = process,
- stderr = _wrapInputStream(process.stderr),
- stdout = _wrapInputStream(process.stdout);
-
+ /// Sends [signal] to the underlying process.
bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) =>
_process.kill(signal);
-
- /// Wrap an InputStream in a ListInputStream. This eagerly drains the [source]
- /// input stream. This is useful for spawned processes which will not exit
- /// until their output streams have been drained. TODO(rnystrom): We should
- /// use this logic anywhere we spawn a process.
- static InputStream _wrapInputStream(InputStream source) {
- var sink = new ListInputStream();
- pipeInputToInput(source, sink);
- return sink;
- }
}
/// Calls [fn] with appropriately modified arguments. [fn] should have the same
@@ -792,7 +769,7 @@ Future withTempDir(Future fn(String path)) {
/// Extracts a `.tar.gz` file from [stream] to [destination], which can be a
/// directory or a path. Returns whether or not the extraction was successful.
-Future<bool> extractTarGz(InputStream stream, destination) {
+Future<bool> extractTarGz(Stream<List<int>> stream, destination) {
destination = _getPath(destination);
log.fine("Extracting .tar.gz stream to $destination.");
@@ -801,27 +778,29 @@ Future<bool> extractTarGz(InputStream stream, destination) {
return _extractTarGzWindows(stream, destination);
}
- var completer = new Completer<int>();
- var processFuture = startProcess("tar",
- ["--extract", "--gunzip", "--directory", destination]);
- processFuture.then((process) {
- process.onExit = (exitCode) => completer.complete(exitCode);
- stream.pipe(process.stdin);
- process.stdout.pipe(stdout, close: false);
- process.stderr.pipe(stderr, close: false);
- }).catchError((e) {
- completer.completeError(e.error, e.stackTrace);
- });
-
- return completer.future.then((exitCode) {
+ return startProcess("tar",
+ ["--extract", "--gunzip", "--directory", destination]).then((process) {
+ // Ignore errors on process.std{out,err}. They'll be passed to
+ // process.exitCode, and we don't want them being top-levelled by
+ // std{out,err}Sink.
+ store(process.stdout.handleError((_) {}), stdoutSink, closeSink: false);
+ store(process.stderr.handleError((_) {}), stderrSink, closeSink: false);
+ return Future.wait([
+ store(stream, process.stdin),
+ process.exitCode
+ ]);
+ }).then((results) {
+ var exitCode = results[1];
+ if (exitCode != 0) {
+ throw "Failed to extract .tar.gz stream to $destination (exit code "
+ "$exitCode).";
+ }
log.fine("Extracted .tar.gz stream to $destination. Exit code $exitCode.");
- // TODO(rnystrom): Does anything check this result value? If not, it should
- // throw on a bad exit code.
- return exitCode == 0;
});
}
-Future<bool> _extractTarGzWindows(InputStream stream, String destination) {
+Future<bool> _extractTarGzWindows(Stream<List<int>> stream,
+ String destination) {
// TODO(rnystrom): In the repo's history, there is an older implementation of
// this that does everything in memory by piping streams directly together
// instead of writing out temp files. The code is simpler, but unfortunately,
@@ -884,8 +863,8 @@ Future<bool> _extractTarGzWindows(InputStream stream, String destination) {
/// Create a .tar.gz archive from a list of entries. Each entry can be a
/// [String], [Directory], or [File] object. The root of the archive is
/// considered to be [baseDir], which defaults to the current working directory.
-/// Returns an [InputStream] that will emit the contents of the archive.
-InputStream createTarGz(List contents, {baseDir}) {
+/// Returns a [ByteStream] that will emit the contents of the archive.
+ByteStream createTarGz(List contents, {baseDir}) {
var buffer = new StringBuffer();
buffer.add('Creating .tag.gz stream containing:\n');
contents.forEach((file) => buffer.add('$file\n'));
@@ -893,7 +872,7 @@ InputStream createTarGz(List contents, {baseDir}) {
// TODO(nweiz): Propagate errors to the returned stream (including non-zero
// exit codes). See issue 3657.
- var stream = new ListInputStream();
+ var controller = new StreamController<List<int>>();
if (baseDir == null) baseDir = path.current;
baseDir = getFullPath(baseDir);
@@ -912,15 +891,14 @@ InputStream createTarGz(List contents, {baseDir}) {
// the process choke, so at some point we should save the arguments to a
// file and pass them in via --files-from for tar and -i@filename for 7zip.
startProcess("tar", args).then((process) {
- pipeInputToInput(process.stdout, stream);
-
- // Drain and discard 7zip's stderr. 7zip writes its normal output to
- // stderr. We don't want to show that since it's meaningless.
- // TODO(rnystrom): Should log this and display it if an actual error
- // occurs.
- consumeInputStream(process.stderr);
+ store(process.stdout, controller);
+ }).catchError((e) {
+ // We don't have to worry about double-signaling here, since the store()
+ // above will only be reached if startProcess succeeds.
+ controller.signalError(e.error, e.stackTrace);
+ controller.close();
});
- return stream;
+ return new ByteStream(controller.stream);
}
withTempDir((tempDir) {
@@ -944,15 +922,20 @@ InputStream createTarGz(List contents, {baseDir}) {
args = ["a", "unused", "-tgzip", "-so", tarFile];
return startProcess(command, args);
}).then((process) {
- // Drain and discard 7zip's stderr. 7zip writes its normal output to
- // stderr. We don't want to show that since it's meaningless.
- // TODO(rnystrom): Should log this and display it if an actual error
+ // Ignore 7zip's stderr. 7zip writes its normal output to stderr. We don't
+ // want to show that since it's meaningless.
+ //
+ // TODO(rnystrom): Should log the stderr and display it if an actual error
// occurs.
- consumeInputStream(process.stderr);
- return pipeInputToInput(process.stdout, stream);
+ store(process.stdout, controller);
});
+ }).catchError((e) {
+ // We don't have to worry about double-signaling here, since the store()
+ // above will only be reached if everything succeeds.
+ controller.signalError(e.error, e.stackTrace);
+ controller.close();
});
- return stream;
+ return new ByteStream(controller.stream);
}
/// Exception thrown when an operation times out.
« no previous file with comments | « utils/pub/hosted_source.dart ('k') | utils/pub/log.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698