Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(826)

Unified Diff: utils/pub/io.dart

Issue 12316036: Merge IO v2 branch to bleeding edge (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Rebased to r18818 Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « utils/pub/http.dart ('k') | utils/pub/oauth2.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: utils/pub/io.dart
diff --git a/utils/pub/io.dart b/utils/pub/io.dart
index a031da21f13bd6d769c30a9fdbc7338d8721a278..2a4be97eabf96a981a178fd83de82c85055ec45e 100644
--- a/utils/pub/io.dart
+++ b/utils/pub/io.dart
@@ -81,8 +81,7 @@ String writeBinaryFile(String file, List<int> contents) {
Future<String> createFileFromStream(Stream<List<int>> stream, String file) {
log.io("Creating $file from stream.");
- var outputStream = new File(file).openOutputStream();
- return stream.pipe(wrapOutputStream(outputStream)).then((_) {
+ return stream.pipe(new File(file).openWrite()).then((_) {
log.fine("Created $file from stream.");
return file;
});
@@ -158,41 +157,38 @@ Future<List<String>> listDir(String dir,
log.io("Listing directory $dir.");
var lister = new Directory(dir).list();
- lister.onDone = (done) {
- // TODO(rnystrom): May need to sort here if it turns out onDir and onFile
- // aren't guaranteed to be called in a certain order. So far, they seem to.
- if (done) {
- log.fine("Listed directory $dir:\n${contents.join('\n')}");
- completer.complete(contents);
- }
- };
-
- // TODO(nweiz): remove this when issue 4061 is fixed.
- var stackTrace;
- try {
- throw "";
- } catch (_, localStackTrace) {
- stackTrace = localStackTrace;
- }
-
var children = [];
- lister.onError = (error) => completer.completeError(error, stackTrace);
- lister.onDir = (file) {
- if (!includeHiddenFiles && path.basename(file).startsWith('.')) return;
- file = path.join(dir, path.basename(file));
- contents.add(file);
- // TODO(nweiz): don't manually recurse once issue 7358 is fixed. Note that
- // once we remove the manual recursion, we'll need to explicitly filter
- // out files in hidden directories.
- if (recursive) {
- children.add(doList(file, listedDirectories));
- }
- };
-
- lister.onFile = (file) {
- if (!includeHiddenFiles && path.basename(file).startsWith('.')) return;
- contents.add(path.join(dir, path.basename(file)));
- };
+ lister.listen(
+ (entity) {
+ if (entity is File) {
+ var file = entity.name;
+ if (!includeHiddenFiles && path.basename(file).startsWith('.')) {
+ return;
+ }
+ contents.add(path.join(dir, path.basename(file)));
+ } else if (entity is Directory) {
+ var file = entity.path;
+ if (!includeHiddenFiles && path.basename(file).startsWith('.')) {
+ return;
+ }
+ file = path.join(dir, path.basename(file));
+ contents.add(file);
+ // TODO(nweiz): don't manually recurse once issue 7358 is fixed.
+ // Note that once we remove the manual recursion, we'll need to
+ // explicitly filter out files in hidden directories.
+ if (recursive) {
+ children.add(doList(file, listedDirectories));
+ }
+ }
+ },
+ onDone: () {
+ // TODO(rnystrom): May need to sort here if it turns out
+ // onDir and onFile aren't guaranteed to be called in a
+ // certain order. So far, they seem to.
+ log.fine("Listed directory $dir:\n${contents.join('\n')}");
+ completer.complete(contents);
+ },
+ onError: (error) => completer.completeError(error, stackTrace));
return completer.future.then((contents) {
return Future.wait(children).then((childContents) {
@@ -364,8 +360,8 @@ final StreamSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr");
/// Wrap the standard output or error [stream] in a [StreamSink]. Any errors are
/// logged, and then the program is terminated. [name] is used for debugging.
-StreamSink<List<int>> _wrapStdio(OutputStream stream, String name) {
- var pair = consumerToSink(wrapOutputStream(stream));
+StreamSink<List<int>> _wrapStdio(IOSink sink, String name) {
+ var pair = consumerToSink(sink);
pair.last.catchError((e) {
// This log may or may not work, depending on how the stream failed. Not
// much we can do about that.
@@ -376,8 +372,8 @@ StreamSink<List<int>> _wrapStdio(OutputStream stream, String name) {
}
/// A line-by-line stream of standard input.
-final Stream<String> stdinLines =
- streamToLines(wrapInputStream(stdin).toStringStream());
+final Stream<String> stdinLines = streamToLines(
+ new ByteStream(stdin).toStringStream());
/// Displays a message and reads a yes/no confirmation from the user. Returns
/// a [Future] that completes to `true` if the user confirms or `false` if they
@@ -392,82 +388,10 @@ Future<bool> confirm(String message) {
.then((line) => new RegExp(r"^[yY]").hasMatch(line));
}
-/// Reads and discards all output from [inputStream]. Returns a [Future] that
+/// Reads and discards all output from [stream]. Returns a [Future] that
/// completes when the stream is closed.
-Future drainInputStream(InputStream inputStream) {
- var completer = new Completer();
- if (inputStream.closed) {
- completer.complete();
- return completer.future;
- }
-
- inputStream.onClosed = () => completer.complete();
- inputStream.onData = inputStream.read;
- inputStream.onError = (error) => completer.completeError(error);
- return completer.future;
-}
-
-/// Wraps [stream] in a single-subscription [Stream] that emits the same data.
-ByteStream wrapInputStream(InputStream stream) {
- var controller = new StreamController();
- if (stream.closed) {
- controller.close();
- return new ByteStream(controller.stream);
- }
-
- stream.onClosed = controller.close;
- stream.onData = () => controller.add(stream.read());
- stream.onError = (e) => controller.signalError(new AsyncError(e));
- return new ByteStream(controller.stream);
-}
-
-/// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it
-/// using [Stream.pipe]. Errors piped to the returned [StreamConsumer] will be
-/// forwarded to the [Future] returned by [Stream.pipe].
-StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) =>
- new _OutputStreamConsumer(stream);
-
-/// A [StreamConsumer] that pipes data into an [OutputStream].
-class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> {
- final OutputStream _outputStream;
-
- _OutputStreamConsumer(this._outputStream);
-
- Future consume(Stream<List<int>> stream) {
- // TODO(nweiz): we have to manually keep track of whether or not the
- // completer has completed since the output stream could signal an error
- // after close() has been called but before it has shut down internally. See
- // the following TODO.
- var completed = false;
- var completer = new Completer();
- stream.listen((data) {
- // Writing empty data to a closed stream can cause errors.
- if (data.isEmpty) return;
-
- // TODO(nweiz): remove this try/catch when issue 7836 is fixed.
- try {
- _outputStream.write(data);
- } catch (e, stack) {
- if (!completed) completer.completeError(e, stack);
- completed = true;
- }
- }, onError: (e) {
- if (!completed) completer.completeError(e.error, e.stackTrace);
- completed = true;
- }, onDone: () => _outputStream.close());
-
- _outputStream.onError = (e) {
- if (!completed) completer.completeError(e);
- completed = true;
- };
-
- _outputStream.onClosed = () {
- if (!completed) completer.complete();
- completed = true;
- };
-
- return completer.future;
- }
+Future drainStream(Stream stream) {
+ return stream.reduce(null, (x, y) {});
}
/// Returns a [StreamSink] that pipes all data to [consumer] and a [Future] that
@@ -609,18 +533,18 @@ class PubProcess {
: _process = process {
var errorGroup = new ErrorGroup();
- var pair = consumerToSink(wrapOutputStream(process.stdin));
+ var pair = consumerToSink(process.stdin);
_stdin = pair.first;
_stdinClosed = errorGroup.registerFuture(pair.last);
_stdout = new ByteStream(
- errorGroup.registerStream(wrapInputStream(process.stdout)));
+ errorGroup.registerStream(process.stdout));
_stderr = new ByteStream(
- errorGroup.registerStream(wrapInputStream(process.stderr)));
+ errorGroup.registerStream(process.stderr));
var exitCodeCompleter = new Completer();
_exitCode = errorGroup.registerFuture(exitCodeCompleter.future);
- _process.onExit = (code) => exitCodeCompleter.complete(code);
+ _process.exitCode.then((code) => exitCodeCompleter.complete(code));
}
/// Sends [signal] to the underlying process.
@@ -796,8 +720,6 @@ ByteStream createTarGz(List contents, {baseDir}) {
contents.forEach((file) => buffer.add('$file\n'));
log.fine(buffer.toString());
- // TODO(nweiz): Propagate errors to the returned stream (including non-zero
- // exit codes). See issue 3657.
var controller = new StreamController<List<int>>();
if (baseDir == null) baseDir = path.current;
« no previous file with comments | « utils/pub/http.dart ('k') | utils/pub/oauth2.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698