Index: utils/pub/io.dart |
diff --git a/utils/pub/io.dart b/utils/pub/io.dart |
index a031da21f13bd6d769c30a9fdbc7338d8721a278..2a4be97eabf96a981a178fd83de82c85055ec45e 100644 |
--- a/utils/pub/io.dart |
+++ b/utils/pub/io.dart |
@@ -81,8 +81,7 @@ String writeBinaryFile(String file, List<int> contents) { |
Future<String> createFileFromStream(Stream<List<int>> stream, String file) { |
log.io("Creating $file from stream."); |
- var outputStream = new File(file).openOutputStream(); |
- return stream.pipe(wrapOutputStream(outputStream)).then((_) { |
+ return stream.pipe(new File(file).openWrite()).then((_) { |
log.fine("Created $file from stream."); |
return file; |
}); |
@@ -158,41 +157,38 @@ Future<List<String>> listDir(String dir, |
log.io("Listing directory $dir."); |
var lister = new Directory(dir).list(); |
- lister.onDone = (done) { |
- // TODO(rnystrom): May need to sort here if it turns out onDir and onFile |
- // aren't guaranteed to be called in a certain order. So far, they seem to. |
- if (done) { |
- log.fine("Listed directory $dir:\n${contents.join('\n')}"); |
- completer.complete(contents); |
- } |
- }; |
- |
- // TODO(nweiz): remove this when issue 4061 is fixed. |
- var stackTrace; |
- try { |
- throw ""; |
- } catch (_, localStackTrace) { |
- stackTrace = localStackTrace; |
- } |
- |
var children = []; |
- lister.onError = (error) => completer.completeError(error, stackTrace); |
- lister.onDir = (file) { |
- if (!includeHiddenFiles && path.basename(file).startsWith('.')) return; |
- file = path.join(dir, path.basename(file)); |
- contents.add(file); |
- // TODO(nweiz): don't manually recurse once issue 7358 is fixed. Note that |
- // once we remove the manual recursion, we'll need to explicitly filter |
- // out files in hidden directories. |
- if (recursive) { |
- children.add(doList(file, listedDirectories)); |
- } |
- }; |
- |
- lister.onFile = (file) { |
- if (!includeHiddenFiles && path.basename(file).startsWith('.')) return; |
- contents.add(path.join(dir, path.basename(file))); |
- }; |
+ lister.listen( |
+ (entity) { |
+ if (entity is File) { |
+ var file = entity.name; |
+ if (!includeHiddenFiles && path.basename(file).startsWith('.')) { |
+ return; |
+ } |
+ contents.add(path.join(dir, path.basename(file))); |
+ } else if (entity is Directory) { |
+ var file = entity.path; |
+ if (!includeHiddenFiles && path.basename(file).startsWith('.')) { |
+ return; |
+ } |
+ file = path.join(dir, path.basename(file)); |
+ contents.add(file); |
+ // TODO(nweiz): don't manually recurse once issue 7358 is fixed. |
+ // Note that once we remove the manual recursion, we'll need to |
+ // explicitly filter out files in hidden directories. |
+ if (recursive) { |
+ children.add(doList(file, listedDirectories)); |
+ } |
+ } |
+ }, |
+ onDone: () { |
+ // TODO(rnystrom): May need to sort here if it turns out |
+ // onDir and onFile aren't guaranteed to be called in a |
+ // certain order. So far, they seem to. |
+ log.fine("Listed directory $dir:\n${contents.join('\n')}"); |
+ completer.complete(contents); |
+ }, |
+ onError: (error) => completer.completeError(error, stackTrace)); |
return completer.future.then((contents) { |
return Future.wait(children).then((childContents) { |
@@ -364,8 +360,8 @@ final StreamSink<List<int>> stderrSink = _wrapStdio(stderr, "stderr"); |
/// Wrap the standard output or error [stream] in a [StreamSink]. Any errors are |
/// logged, and then the program is terminated. [name] is used for debugging. |
-StreamSink<List<int>> _wrapStdio(OutputStream stream, String name) { |
- var pair = consumerToSink(wrapOutputStream(stream)); |
+StreamSink<List<int>> _wrapStdio(IOSink sink, String name) { |
+ var pair = consumerToSink(sink); |
pair.last.catchError((e) { |
// This log may or may not work, depending on how the stream failed. Not |
// much we can do about that. |
@@ -376,8 +372,8 @@ StreamSink<List<int>> _wrapStdio(OutputStream stream, String name) { |
} |
/// A line-by-line stream of standard input. |
-final Stream<String> stdinLines = |
- streamToLines(wrapInputStream(stdin).toStringStream()); |
+final Stream<String> stdinLines = streamToLines( |
+ new ByteStream(stdin).toStringStream()); |
/// Displays a message and reads a yes/no confirmation from the user. Returns |
/// a [Future] that completes to `true` if the user confirms or `false` if they |
@@ -392,82 +388,10 @@ Future<bool> confirm(String message) { |
.then((line) => new RegExp(r"^[yY]").hasMatch(line)); |
} |
-/// Reads and discards all output from [inputStream]. Returns a [Future] that |
+/// Reads and discards all output from [stream]. Returns a [Future] that |
/// completes when the stream is closed. |
-Future drainInputStream(InputStream inputStream) { |
- var completer = new Completer(); |
- if (inputStream.closed) { |
- completer.complete(); |
- return completer.future; |
- } |
- |
- inputStream.onClosed = () => completer.complete(); |
- inputStream.onData = inputStream.read; |
- inputStream.onError = (error) => completer.completeError(error); |
- return completer.future; |
-} |
- |
-/// Wraps [stream] in a single-subscription [Stream] that emits the same data. |
-ByteStream wrapInputStream(InputStream stream) { |
- var controller = new StreamController(); |
- if (stream.closed) { |
- controller.close(); |
- return new ByteStream(controller.stream); |
- } |
- |
- stream.onClosed = controller.close; |
- stream.onData = () => controller.add(stream.read()); |
- stream.onError = (e) => controller.signalError(new AsyncError(e)); |
- return new ByteStream(controller.stream); |
-} |
- |
-/// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it |
-/// using [Stream.pipe]. Errors piped to the returned [StreamConsumer] will be |
-/// forwarded to the [Future] returned by [Stream.pipe]. |
-StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => |
- new _OutputStreamConsumer(stream); |
- |
-/// A [StreamConsumer] that pipes data into an [OutputStream]. |
-class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { |
- final OutputStream _outputStream; |
- |
- _OutputStreamConsumer(this._outputStream); |
- |
- Future consume(Stream<List<int>> stream) { |
- // TODO(nweiz): we have to manually keep track of whether or not the |
- // completer has completed since the output stream could signal an error |
- // after close() has been called but before it has shut down internally. See |
- // the following TODO. |
- var completed = false; |
- var completer = new Completer(); |
- stream.listen((data) { |
- // Writing empty data to a closed stream can cause errors. |
- if (data.isEmpty) return; |
- |
- // TODO(nweiz): remove this try/catch when issue 7836 is fixed. |
- try { |
- _outputStream.write(data); |
- } catch (e, stack) { |
- if (!completed) completer.completeError(e, stack); |
- completed = true; |
- } |
- }, onError: (e) { |
- if (!completed) completer.completeError(e.error, e.stackTrace); |
- completed = true; |
- }, onDone: () => _outputStream.close()); |
- |
- _outputStream.onError = (e) { |
- if (!completed) completer.completeError(e); |
- completed = true; |
- }; |
- |
- _outputStream.onClosed = () { |
- if (!completed) completer.complete(); |
- completed = true; |
- }; |
- |
- return completer.future; |
- } |
+Future drainStream(Stream stream) { |
+ return stream.reduce(null, (x, y) {}); |
} |
/// Returns a [StreamSink] that pipes all data to [consumer] and a [Future] that |
@@ -609,18 +533,18 @@ class PubProcess { |
: _process = process { |
var errorGroup = new ErrorGroup(); |
- var pair = consumerToSink(wrapOutputStream(process.stdin)); |
+ var pair = consumerToSink(process.stdin); |
_stdin = pair.first; |
_stdinClosed = errorGroup.registerFuture(pair.last); |
_stdout = new ByteStream( |
- errorGroup.registerStream(wrapInputStream(process.stdout))); |
+ errorGroup.registerStream(process.stdout)); |
_stderr = new ByteStream( |
- errorGroup.registerStream(wrapInputStream(process.stderr))); |
+ errorGroup.registerStream(process.stderr)); |
var exitCodeCompleter = new Completer(); |
_exitCode = errorGroup.registerFuture(exitCodeCompleter.future); |
- _process.onExit = (code) => exitCodeCompleter.complete(code); |
+ _process.exitCode.then((code) => exitCodeCompleter.complete(code)); |
} |
/// Sends [signal] to the underlying process. |
@@ -796,8 +720,6 @@ ByteStream createTarGz(List contents, {baseDir}) { |
contents.forEach((file) => buffer.add('$file\n')); |
log.fine(buffer.toString()); |
- // TODO(nweiz): Propagate errors to the returned stream (including non-zero |
- // exit codes). See issue 3657. |
var controller = new StreamController<List<int>>(); |
if (baseDir == null) baseDir = path.current; |