Chromium Code Reviews| Index: utils/pub/io.dart |
| diff --git a/utils/pub/io.dart b/utils/pub/io.dart |
| index 5bb01cdc7c785b1431aad2198996220735c67c35..b0e43b77900f257c74db86cada092908ad231091 100644 |
| --- a/utils/pub/io.dart |
| +++ b/utils/pub/io.dart |
| @@ -188,18 +188,18 @@ Future<Directory> ensureDir(path) { |
| } |
| return ensureDir(dirname(path)).then((_) { |
| - return createDir(path) |
| - .catchError((error) { |
| - if (error is! DirectoryIOException) return false; |
| - // Error 17 means the directory already exists (or 183 on Windows). |
| - if (error.osError.errorCode != 17 && |
| - error.osError.errorCode != 183) { |
| + return createDir(path).catchError((asyncError) { |
| + var error = getRealError(asyncError); |
| + if (error is! DirectoryIOException) throw asyncError; |
| + // Error 17 means the directory already exists (or 183 on Windows). |
| + if (error.osError.errorCode != 17 && |
| + error.osError.errorCode != 183) { |
| log.fine("Got 'already exists' error when creating directory."); |
|
Bob Nystrom
2013/01/09 16:49:23
This log is wrong. This "if" is for when the error
|
| - return false; |
| + throw asyncError; |
| } |
| - return _getDirectory(path); |
| - }); |
| + return _getDirectory(path); |
| + }); |
| }); |
| }); |
| } |
| @@ -562,14 +562,61 @@ Future<String> consumeStringInputStream(StringInputStream stream) { |
| return completer.future; |
| } |
| -/// Wrap an InputStream in a ListInputStream. This eagerly drains the [source] |
| -/// input stream. This is useful for spawned processes which will not exit until |
| -/// their output streams have been drained. |
| -/// TODO(rnystrom): We should use this logic anywhere we spawn a process. |
| -InputStream wrapInputStream(InputStream source) { |
| - var sink = new ListInputStream(); |
| - pipeInputToInput(source, sink); |
| - return sink; |
| +/// Wraps [stream] in a single-subscription [ByteStream] that emits the same |
| +/// data. |
| +Stream<List<int>> wrapInputStream(InputStream stream) { |
|
Bob Nystrom
2013/01/09 16:49:23
Type the return as a ByteStream?
Also, can we hav
|
| + if (stream.closed) return new StreamController.singleSubscription()..close(); |
|
Bob Nystrom
2013/01/09 16:49:23
The duplicate constructor is a bit gross. How abou
|
| + |
| + var controller = new StreamController.singleSubscription(); |
| + stream.onClosed = controller.close; |
| + stream.onData = () => controller.add(stream.read()); |
| + stream.onError = (e) => controller.signalError(new AsyncError(e)); |
| + return controller.stream; |
| +} |
| + |
| +// TODO(nweiz): remove this ASAP |
|
Bob Nystrom
2013/01/09 16:49:23
Tracking bug?
|
| +/// Wraps [stream] in an [InputStream]. |
| +InputStream wrapByteStream(Stream<List<int>> stream) { |
|
Bob Nystrom
2013/01/09 16:49:23
Maybe these method names should be more explicit.
|
| + var inputStream = new ListInputStream(); |
| + stream.listen((chunk) => inputStream.write(chunk), |
| + onDone: inputStream.markEndOfStream); |
| + return inputStream; |
| +} |
| + |
| +/// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it |
| +/// using [Stream.pipe]. |
| +StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => |
|
Bob Nystrom
2013/01/09 16:49:23
"toStreamConsumer" ?
|
| + new _OutputStreamConsumer(stream); |
| + |
| +/// A [StreamConsumer] that pipes data into an [OutputStream]. |
| +class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { |
| + final OutputStream _outputStream; |
| + |
| + _OutputStreamConsumer(this._outputStream) |
| + : super(); |
|
Bob Nystrom
2013/01/09 16:49:23
Unnecessary, especially since you aren't extending
|
| + |
| + Future consume(Stream<List<int>> stream) { |
| + // TODO(nweiz): we have to manually keep track of whether or not the |
| + // completer has completed since the output stream could signal an error |
| + // after close() has been called but before it has shut down internally. See |
| + // the following TODO. |
| + var completed = false; |
| + var completer = new Completer(); |
| + stream.listen((data) => _outputStream.write(data), onDone: () { |
| + _outputStream.close(); |
| + // TODO(nweiz): wait until _outputStream.onClosed is called once issue |
| + // 7761 is fixed. |
| + if (!completed) completer.complete(null); |
| + completed = true; |
| + }); |
| + |
| + _outputStream.onError = (e) { |
| + if (!completed) completer.completeError(e); |
|
Bob Nystrom
2013/01/09 16:49:23
Want to do that little hack to grab a stack trace
|
| + completed = true; |
| + }; |
| + |
| + return completer.future; |
| + } |
| } |
| /// Spawns and runs the process located at [executable], passing in [args]. |
| @@ -625,11 +672,21 @@ class _WrappedProcess implements Process { |
| _WrappedProcess(Process process) |
| : _process = process, |
| - stderr = wrapInputStream(process.stderr), |
| - stdout = wrapInputStream(process.stdout); |
| + stderr = _wrapInputStream(process.stderr), |
| + stdout = _wrapInputStream(process.stdout); |
| bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) => |
| _process.kill(signal); |
| + |
| + /// Wrap an InputStream in a ListInputStream. This eagerly drains the [source] |
| + /// input stream. This is useful for spawned processes which will not exit until |
|
Bob Nystrom
2013/01/09 16:49:23
Long line.
|
| + /// their output streams have been drained. |
| + /// TODO(rnystrom): We should use this logic anywhere we spawn a process. |
| + static InputStream _wrapInputStream(InputStream source) { |
| + var sink = new ListInputStream(); |
| + pipeInputToInput(source, sink); |
| + return sink; |
| + } |
| } |
| /// Calls [fn] with appropriately modified arguments. [fn] should have the same |
| @@ -678,17 +735,15 @@ Future timeout(Future input, int milliseconds, String description) { |
| completer.completeError(new TimeoutException( |
| 'Timed out while $description.')); |
| }); |
| - input |
| - .then((value) { |
| - if (completed) return; |
| - timer.cancel(); |
| - completer.complete(value); |
| - }) |
| - .catchError((e) { |
| - if (completed) return; |
| - timer.cancel(); |
| - completer.completeError(e.error, e.stackTrace); |
| - }); |
| + input.then((value) { |
| + if (completed) return; |
| + timer.cancel(); |
| + completer.complete(value); |
| + }).catchError((e) { |
| + if (completed) return; |
| + timer.cancel(); |
| + completer.completeError(e.error, e.stackTrace); |
| + }); |
| return completer.future; |
| } |
| @@ -697,15 +752,13 @@ Future timeout(Future input, int milliseconds, String description) { |
| /// will be deleted. |
| Future withTempDir(Future fn(String path)) { |
| var tempDir; |
| - var future = createTempDir().then((dir) { |
| + return asyncWhenComplete(createTempDir().then((dir) { |
| tempDir = dir; |
| return fn(tempDir.path); |
| - }); |
| - future.catchError((_) {}).then((_) { |
| + }), () { |
| log.fine('Cleaning up temp directory ${tempDir.path}.'); |
| - deleteDir(tempDir); |
| + return deleteDir(tempDir); |
| }); |
| - return future; |
| } |
| /// Tests whether or not the git command-line app is available for use. |