Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(298)

Unified Diff: utils/pub/io.dart

Issue 12095050: Roll back Pub stream changes. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Actually roll back changes Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « utils/pub/hosted_source.dart ('k') | utils/pub/log.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: utils/pub/io.dart
diff --git a/utils/pub/io.dart b/utils/pub/io.dart
index 60876c8f83f06db30076c59847963d18271c2ec6..79f0a06524cc5b91b5e953fdf03d42ccb5b6ca82 100644
--- a/utils/pub/io.dart
+++ b/utils/pub/io.dart
@@ -12,14 +12,9 @@ import 'dart:json';
import 'dart:uri';
import '../../pkg/path/lib/path.dart' as path;
-import '../../pkg/http/lib/http.dart' show ByteStream;
-import 'error_group.dart';
-import 'exit_codes.dart' as exit_codes;
import 'log.dart' as log;
import 'utils.dart';
-export '../../pkg/http/lib/http.dart' show ByteStream;
-
final NEWLINE_PATTERN = new RegExp("\r\n?|\n\r?");
/// Joins a number of path string parts into a single path. Handles
@@ -125,15 +120,44 @@ Future<File> deleteFile(file) {
/// Writes [stream] to a new file at [path], which may be a [String] or a
/// [File]. Will replace any file already at that path. Completes when the file
/// is done being written.
-Future<File> createFileFromStream(Stream<List<int>> stream, path) {
+Future<File> createFileFromStream(InputStream stream, path) {
path = _getPath(path);
log.io("Creating $path from stream.");
+ var completer = new Completer<File>();
+ var completed = false;
var file = new File(path);
- return stream.pipe(wrapOutputStream(file.openOutputStream())).then((_) {
+ var outputStream = file.openOutputStream();
+ stream.pipe(outputStream);
+
+ outputStream.onClosed = () {
log.fine("Created $path from stream.");
- });
+ completed = true;
+ completer.complete(file);
+ };
+
+ // TODO(nweiz): remove this when issue 4061 is fixed.
+ var stackTrace;
+ try {
+ throw "";
+ } catch (_, localStackTrace) {
+ stackTrace = localStackTrace;
+ }
+
+ completeError(error) {
+ if (!completed) {
+ completed = true;
+ completer.completeError(error, stackTrace);
+ } else {
+ log.fine("Got error after stream was closed: $error");
+ }
+ }
+
+ stream.onError = completeError;
+ outputStream.onError = completeError;
+
+ return completer.future;
}
/// Creates a directory [dir]. Returns a [Future] that completes when the
@@ -409,33 +433,8 @@ String relativeToPub(String target) {
return path.normalize(join(utilDir, 'pub', target));
}
-// TODO(nweiz): add a ByteSink wrapper to make writing strings to stdout/stderr
-// nicer.
-
-/// A sink that writes to standard output. Errors piped to this stream will be
-/// surfaced to the top-level error handler.
-final StreamSink<List<int>> stdoutSink = _wrapStdio(stdout, "stdout");
-
-/// A sink that writes to standard error. Errors piped to this stream will be
-/// surfaced to the top-level error handler.
-final StreamSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr");
-
-/// Wrap the standard output or error [stream] in a [StreamSink]. Any errors are
-/// logged, and then the program is terminated. [name] is used for debugging.
-StreamSink<List<int>> _wrapStdio(OutputStream stream, String name) {
- var pair = consumerToSink(wrapOutputStream(stream));
- pair.last.catchError((e) {
- // This log may or may not work, depending on how the stream failed. Not
- // much we can do about that.
- log.error("Error writing to $name: $e");
- exit(exit_codes.IO);
- });
- return pair.first;
-}
-
-/// A line-by-line stream of standard input.
-final Stream<String> stdinLines =
- streamToLines(wrapInputStream(stdin).toStringStream());
+/// A StringInputStream reading from stdin.
+final _stringStdin = new StringInputStream(stdin);
/// Displays a message and reads a yes/no confirmation from the user. Returns
/// a [Future] that completes to `true` if the user confirms or `false` if they
@@ -445,28 +444,143 @@ final Stream<String> stdinLines =
/// should just be a fragment like, "Are you sure you want to proceed".
Future<bool> confirm(String message) {
log.fine('Showing confirm message: $message');
- stdoutSink.add("$message (y/n)? ".charCodes);
- return streamFirst(stdinLines)
- .then((line) => new RegExp(r"^[yY]").hasMatch(line));
+ stdout.writeString("$message (y/n)? ");
+ return readLine().then((line) => new RegExp(r"^[yY]").hasMatch(line));
+}
+
+/// Returns a single line read from a [StringInputStream]. By default, reads
+/// from stdin.
+///
+/// A [StringInputStream] passed to this should have no callbacks registered.
+Future<String> readLine([StringInputStream stream]) {
+ if (stream == null) stream = _stringStdin;
+ if (stream.closed) return new Future.immediate('');
+ void removeCallbacks() {
+ stream.onClosed = null;
+ stream.onLine = null;
+ stream.onError = null;
+ }
+
+ // TODO(nweiz): remove this when issue 4061 is fixed.
+ var stackTrace;
+ try {
+ throw "";
+ } catch (_, localStackTrace) {
+ stackTrace = localStackTrace;
+ }
+
+ var completer = new Completer();
+ stream.onClosed = () {
+ removeCallbacks();
+ completer.complete('');
+ };
+
+ stream.onLine = () {
+ removeCallbacks();
+ var line = stream.readLine();
+ log.io('Read line: $line');
+ completer.complete(line);
+ };
+
+ stream.onError = (e) {
+ removeCallbacks();
+ completer.completeError(e, stackTrace);
+ };
+
+ return completer.future;
+}
+
+/// Takes all input from [source] and writes it to [sink].
+///
+/// Returns a future that completes when [source] is closed.
+Future pipeInputToInput(InputStream source, ListInputStream sink) {
+ var completer = new Completer();
+ source.onClosed = () {
+ sink.markEndOfStream();
+ completer.complete(null);
+ };
+ source.onData = () {
+ // Even if the sink is closed and we aren't going to do anything with more
+ // data, we still need to drain it from source to work around issue 7218.
+ var data = source.read();
+ try {
+ if (!sink.closed) sink.write(data);
+ } on StreamException catch (e, stackTrace) {
+ // Ignore an exception to work around issue 4222.
+ log.io("Writing to an unclosed ListInputStream caused exception $e\n"
+ "$stackTrace");
+ }
+ };
+ // TODO(nweiz): propagate this error to the sink. See issue 3657.
+ source.onError = (e) { throw e; };
+ return completer.future;
+}
+
+/// Buffers all input from an InputStream and returns it as a future.
+Future<List<int>> consumeInputStream(InputStream stream) {
+ if (stream.closed) return new Future.immediate(<int>[]);
+
+ // TODO(nweiz): remove this when issue 4061 is fixed.
+ var stackTrace;
+ try {
+ throw "";
+ } catch (_, localStackTrace) {
+ stackTrace = localStackTrace;
+ }
+
+ var completer = new Completer<List<int>>();
+ var buffer = <int>[];
+ stream.onClosed = () => completer.complete(buffer);
+ stream.onData = () => buffer.addAll(stream.read());
+ stream.onError = (e) => completer.completeError(e, stackTrace);
+ return completer.future;
+}
+
+/// Buffers all input from a StringInputStream and returns it as a future.
+Future<String> consumeStringInputStream(StringInputStream stream) {
+ if (stream.closed) return new Future.immediate('');
+
+ // TODO(nweiz): remove this when issue 4061 is fixed.
+ var stackTrace;
+ try {
+ throw "";
+ } catch (_, localStackTrace) {
+ stackTrace = localStackTrace;
+ }
+
+ var completer = new Completer<String>();
+ var buffer = new StringBuffer();
+ stream.onClosed = () => completer.complete(buffer.toString());
+ stream.onData = () => buffer.add(stream.read());
+ stream.onError = (e) => completer.completeError(e, stackTrace);
+ return completer.future;
}
/// Wraps [stream] in a single-subscription [Stream] that emits the same data.
-ByteStream wrapInputStream(InputStream stream) {
+Stream<List<int>> wrapInputStream(InputStream stream) {
var controller = new StreamController();
if (stream.closed) {
controller.close();
- return new ByteStream(controller.stream);
+ return controller.stream;
}
stream.onClosed = controller.close;
stream.onData = () => controller.add(stream.read());
stream.onError = (e) => controller.signalError(new AsyncError(e));
- return new ByteStream(controller.stream);
+ return controller.stream;
+}
+
+// TODO(nweiz): remove this ASAP (issue 7807).
+/// Wraps [stream] in an [InputStream].
+InputStream streamToInputStream(Stream<List<int>> stream) {
+ var inputStream = new ListInputStream();
+ stream.listen((chunk) => inputStream.write(chunk),
+ onDone: inputStream.markEndOfStream);
+ return inputStream;
}
/// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it
-/// using [Stream.pipe]. Errors piped to the returned [StreamConsumer] will be
-/// forwarded to the [Future] returned by [Stream.pipe].
+/// using [Stream.pipe].
StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) =>
new _OutputStreamConsumer(stream);
@@ -494,9 +608,6 @@ class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> {
if (!completed) completer.completeError(e, stack);
completed = true;
}
- }, onError: (e) {
- if (!completed) completer.completeError(e.error, e.stackTrace);
- completed = true;
}, onDone: () => _outputStream.close());
_outputStream.onError = (e) {
@@ -513,43 +624,6 @@ class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> {
}
}
-/// Returns a [StreamSink] that pipes all data to [consumer] and a [Future] that
-/// will succeed when [StreamSink] is closed or fail with any errors that occur
-/// while writing.
-Pair<StreamSink, Future> consumerToSink(StreamConsumer consumer) {
- var controller = new StreamController();
- var done = controller.stream.pipe(consumer);
- return new Pair<StreamSink, Future>(controller.sink, done);
-}
-
-// TODO(nweiz): remove this when issue 7786 is fixed.
-/// Pipes all data and errors from [stream] into [sink]. When [stream] is done,
-/// the returned [Future] is completed and [sink] is closed if [closeSink] is
-/// true.
-///
-/// When an error occurs on [stream], that error is passed to [sink]. If
-/// [unsubscribeOnError] is true, [Future] will be completed successfully and no
-/// more data or errors will be piped from [stream] to [sink]. If
-/// [unsubscribeOnError] and [closeSink] are both true, [sink] will then be
-/// closed.
-Future store(Stream stream, StreamSink sink,
- {bool unsubscribeOnError: true, closeSink: true}) {
- var completer = new Completer();
- stream.listen(sink.add,
- onError: (e) {
- sink.signalError(e);
- if (unsubscribeOnError) {
- completer.complete();
- if (closeSink) sink.close();
- }
- },
- onDone: () {
- if (closeSink) sink.close();
- completer.complete();
- }, unsubscribeOnError: unsubscribeOnError);
- return completer.future;
-}
-
/// Spawns and runs the process located at [executable], passing in [args].
/// Returns a [Future] that will complete with the results of the process after
/// it has ended.
@@ -583,92 +657,41 @@ Future<PubProcessResult> runProcess(String executable, List<String> args,
/// The spawned process will inherit its parent's environment variables. If
/// [environment] is provided, that will be used to augment (not replace) the
/// the inherited variables.
-Future<PubProcess> startProcess(String executable, List<String> args,
+Future<Process> startProcess(String executable, List<String> args,
{workingDir, Map<String, String> environment}) =>
_doProcess(Process.start, executable, args, workingDir, environment)
- .then((process) => new PubProcess(process));
+ .then((process) => new _WrappedProcess(process));
-/// A wrapper around [Process] that exposes `dart:async`-style APIs.
-class PubProcess {
- /// The underlying `dart:io` [Process].
+/// A wrapper around [Process] that buffers the stdout and stderr to avoid
+/// running into issue 7218.
+class _WrappedProcess implements Process {
final Process _process;
+ final InputStream stderr;
+ final InputStream stdout;
- /// The mutable field for [stdin].
- StreamSink<List<int>> _stdin;
-
- /// The mutable field for [stdinClosed].
- Future _stdinClosed;
-
- /// The mutable field for [stdout].
- ByteStream _stdout;
-
- /// The mutable field for [stderr].
- ByteStream _stderr;
-
- /// The mutable field for [exitCode].
- Future<int> _exitCode;
-
- /// The sink used for passing data to the process's standard input stream.
- /// Errors on this stream are surfaced through [stdinClosed], [stdout],
- /// [stderr], and [exitCode], which are all members of an [ErrorGroup].
- StreamSink<List<int>> get stdin => _stdin;
-
- // TODO(nweiz): write some more sophisticated Future machinery so that this
- // doesn't surface errors from the other streams/futures, but still passes its
- // unhandled errors to them. Right now it's impossible to recover from a stdin
- // error and continue interacting with the process.
- /// A [Future] that completes when [stdin] is closed, either by the user or by
- /// the process itself.
- ///
- /// This is in an [ErrorGroup] with [stdout], [stderr], and [exitCode], so any
- /// error in process will be passed to it, but won't reach the top-level error
- /// handler unless nothing has handled it.
- Future get stdinClosed => _stdinClosed;
-
- /// The process's standard output stream.
- ///
- /// This is in an [ErrorGroup] with [stdinClosed], [stderr], and [exitCode],
- /// so any error in process will be passed to it, but won't reach the
- /// top-level error handler unless nothing has handled it.
- ByteStream get stdout => _stdout;
-
- /// The process's standard error stream.
- ///
- /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [exitCode],
- /// so any error in process will be passed to it, but won't reach the
- /// top-level error handler unless nothing has handled it.
- ByteStream get stderr => _stderr;
-
- /// A [Future] that will complete to the process's exit code once the process
- /// has finished running.
- ///
- /// This is in an [ErrorGroup] with [stdinClosed], [stdout], and [stderr], so
- /// any error in process will be passed to it, but won't reach the top-level
- /// error handler unless nothing has handled it.
- Future<int> get exitCode => _exitCode;
-
- /// Creates a new [PubProcess] wrapping [process].
- PubProcess(Process process)
- : _process = process {
- var errorGroup = new ErrorGroup();
-
- var pair = consumerToSink(wrapOutputStream(process.stdin));
- _stdin = pair.first;
- _stdinClosed = errorGroup.registerFuture(pair.last);
-
- _stdout = new ByteStream(
- errorGroup.registerStream(wrapInputStream(process.stdout)));
- _stderr = new ByteStream(
- errorGroup.registerStream(wrapInputStream(process.stderr)));
-
- var exitCodeCompleter = new Completer();
- _exitCode = errorGroup.registerFuture(exitCodeCompleter.future);
- _process.onExit = (code) => exitCodeCompleter.complete(code);
+ OutputStream get stdin => _process.stdin;
+
+ void set onExit(void callback(int exitCode)) {
+ _process.onExit = callback;
}
- /// Sends [signal] to the underlying process.
+ _WrappedProcess(Process process)
+ : _process = process,
+ stderr = _wrapInputStream(process.stderr),
+ stdout = _wrapInputStream(process.stdout);
+
bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) =>
_process.kill(signal);
+
+ /// Wrap an InputStream in a ListInputStream. This eagerly drains the [source]
+ /// input stream. This is useful for spawned processes which will not exit
+ /// until their output streams have been drained. TODO(rnystrom): We should
+ /// use this logic anywhere we spawn a process.
+ static InputStream _wrapInputStream(InputStream source) {
+ var sink = new ListInputStream();
+ pipeInputToInput(source, sink);
+ return sink;
+ }
}
/// Calls [fn] with appropriately modified arguments. [fn] should have the same
@@ -745,7 +768,7 @@ Future withTempDir(Future fn(String path)) {
/// Extracts a `.tar.gz` file from [stream] to [destination], which can be a
/// directory or a path. Returns whether or not the extraction was successful.
-Future<bool> extractTarGz(Stream<List<int>> stream, destination) {
+Future<bool> extractTarGz(InputStream stream, destination) {
destination = _getPath(destination);
log.fine("Extracting .tar.gz stream to $destination.");
@@ -754,29 +777,27 @@ Future<bool> extractTarGz(Stream<List<int>> stream, destination) {
return _extractTarGzWindows(stream, destination);
}
- return startProcess("tar",
- ["--extract", "--gunzip", "--directory", destination]).then((process) {
- // Ignore errors on process.std{out,err}. They'll be passed to
- // process.exitCode, and we don't want them being top-levelled by
- // std{out,err}Sink.
- store(process.stdout.handleError((_) {}), stdoutSink, closeSink: false);
- store(process.stderr.handleError((_) {}), stderrSink, closeSink: false);
- return Future.wait([
- store(stream, process.stdin),
- process.exitCode
- ]);
- }).then((results) {
- var exitCode = results[1];
- if (exitCode != 0) {
- throw "Failed to extract .tar.gz stream to $destination (exit code "
- "$exitCode).";
- }
+ var completer = new Completer<int>();
+ var processFuture = startProcess("tar",
+ ["--extract", "--gunzip", "--directory", destination]);
+ processFuture.then((process) {
+ process.onExit = (exitCode) => completer.complete(exitCode);
+ stream.pipe(process.stdin);
+ process.stdout.pipe(stdout, close: false);
+ process.stderr.pipe(stderr, close: false);
+ }).catchError((e) {
+ completer.completeError(e.error, e.stackTrace);
+ });
+
+ return completer.future.then((exitCode) {
log.fine("Extracted .tar.gz stream to $destination. Exit code $exitCode.");
+ // TODO(rnystrom): Does anything check this result value? If not, it should
+ // throw on a bad exit code.
+ return exitCode == 0;
});
}
-Future<bool> _extractTarGzWindows(Stream<List<int>> stream,
- String destination) {
+Future<bool> _extractTarGzWindows(InputStream stream, String destination) {
// TODO(rnystrom): In the repo's history, there is an older implementation of
// this that does everything in memory by piping streams directly together
// instead of writing out temp files. The code is simpler, but unfortunately,
@@ -839,8 +860,8 @@ Future<bool> _extractTarGzWindows(Stream<List<int>> stream,
/// Create a .tar.gz archive from a list of entries. Each entry can be a
/// [String], [Directory], or [File] object. The root of the archive is
/// considered to be [baseDir], which defaults to the current working directory.
-/// Returns a [ByteStream] that will emit the contents of the archive.
-ByteStream createTarGz(List contents, {baseDir}) {
+/// Returns an [InputStream] that will emit the contents of the archive.
+InputStream createTarGz(List contents, {baseDir}) {
var buffer = new StringBuffer();
buffer.add('Creating .tag.gz stream containing:\n');
contents.forEach((file) => buffer.add('$file\n'));
@@ -848,7 +869,7 @@ ByteStream createTarGz(List contents, {baseDir}) {
// TODO(nweiz): Propagate errors to the returned stream (including non-zero
// exit codes). See issue 3657.
- var controller = new StreamController<List<int>>();
+ var stream = new ListInputStream();
if (baseDir == null) baseDir = path.current;
baseDir = getFullPath(baseDir);
@@ -867,14 +888,15 @@ ByteStream createTarGz(List contents, {baseDir}) {
// the process choke, so at some point we should save the arguments to a
// file and pass them in via --files-from for tar and -i@filename for 7zip.
startProcess("tar", args).then((process) {
- store(process.stdout, controller);
- }).catchError((e) {
- // We don't have to worry about double-signaling here, since the store()
- // above will only be reached if startProcess succeeds.
- controller.signalError(e.error, e.stackTrace);
- controller.close();
+ pipeInputToInput(process.stdout, stream);
+
+ // Drain and discard 7zip's stderr. 7zip writes its normal output to
+ // stderr. We don't want to show that since it's meaningless.
+ // TODO(rnystrom): Should log this and display it if an actual error
+ // occurs.
+ consumeInputStream(process.stderr);
});
- return new ByteStream(controller.stream);
+ return stream;
}
withTempDir((tempDir) {
@@ -898,20 +920,15 @@ ByteStream createTarGz(List contents, {baseDir}) {
args = ["a", "unused", "-tgzip", "-so", tarFile];
return startProcess(command, args);
}).then((process) {
- // Ignore 7zip's stderr. 7zip writes its normal output to stderr. We don't
- // want to show that since it's meaningless.
- //
- // TODO(rnystrom): Should log the stderr and display it if an actual error
+ // Drain and discard 7zip's stderr. 7zip writes its normal output to
+ // stderr. We don't want to show that since it's meaningless.
+ // TODO(rnystrom): Should log this and display it if an actual error
// occurs.
- store(process.stdout, controller);
+ consumeInputStream(process.stderr);
+ return pipeInputToInput(process.stdout, stream);
});
- }).catchError((e) {
- // We don't have to worry about double-signaling here, since the store()
- // above will only be reached if everything succeeds.
- controller.signalError(e.error, e.stackTrace);
- controller.close();
});
- return new ByteStream(controller.stream);
+ return stream;
}
/// Exception thrown when an operation times out.
« no previous file with comments | « utils/pub/hosted_source.dart ('k') | utils/pub/log.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698