Index: utils/pub/io.dart |
diff --git a/utils/pub/io.dart b/utils/pub/io.dart |
index 5bb01cdc7c785b1431aad2198996220735c67c35..b0e43b77900f257c74db86cada092908ad231091 100644 |
--- a/utils/pub/io.dart |
+++ b/utils/pub/io.dart |
@@ -188,18 +188,18 @@ Future<Directory> ensureDir(path) { |
} |
return ensureDir(dirname(path)).then((_) { |
- return createDir(path) |
- .catchError((error) { |
- if (error is! DirectoryIOException) return false; |
- // Error 17 means the directory already exists (or 183 on Windows). |
- if (error.osError.errorCode != 17 && |
- error.osError.errorCode != 183) { |
+ return createDir(path).catchError((asyncError) { |
+ var error = getRealError(asyncError); |
+ if (error is! DirectoryIOException) throw asyncError; |
+ // Error 17 means the directory already exists (or 183 on Windows). |
+ if (error.osError.errorCode != 17 && |
+ error.osError.errorCode != 183) { |
log.fine("Got 'already exists' error when creating directory."); |
Bob Nystrom
2013/01/09 16:49:23
This log is wrong. This "if" is for when the error
|
- return false; |
+ throw asyncError; |
} |
- return _getDirectory(path); |
- }); |
+ return _getDirectory(path); |
+ }); |
}); |
}); |
} |
@@ -562,14 +562,61 @@ Future<String> consumeStringInputStream(StringInputStream stream) { |
return completer.future; |
} |
-/// Wrap an InputStream in a ListInputStream. This eagerly drains the [source] |
-/// input stream. This is useful for spawned processes which will not exit until |
-/// their output streams have been drained. |
-/// TODO(rnystrom): We should use this logic anywhere we spawn a process. |
-InputStream wrapInputStream(InputStream source) { |
- var sink = new ListInputStream(); |
- pipeInputToInput(source, sink); |
- return sink; |
+/// Wraps [stream] in a single-subscription [ByteStream] that emits the same |
+/// data. |
+Stream<List<int>> wrapInputStream(InputStream stream) { |
Bob Nystrom
2013/01/09 16:49:23
Type the return as a ByteStream?
Also, can we hav
|
+ if (stream.closed) return new StreamController.singleSubscription()..close(); |
Bob Nystrom
2013/01/09 16:49:23
The duplicate constructor is a bit gross. How abou
|
+ |
+ var controller = new StreamController.singleSubscription(); |
+ stream.onClosed = controller.close; |
+ stream.onData = () => controller.add(stream.read()); |
+ stream.onError = (e) => controller.signalError(new AsyncError(e)); |
+ return controller.stream; |
+} |
+ |
+// TODO(nweiz): remove this ASAP |
Bob Nystrom
2013/01/09 16:49:23
Tracking bug?
|
+/// Wraps [stream] in an [InputStream]. |
+InputStream wrapByteStream(Stream<List<int>> stream) { |
Bob Nystrom
2013/01/09 16:49:23
Maybe these method names should be more explicit.
|
+ var inputStream = new ListInputStream(); |
+ stream.listen((chunk) => inputStream.write(chunk), |
+ onDone: inputStream.markEndOfStream); |
+ return inputStream; |
+} |
+ |
+/// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it |
+/// using [Stream.pipe]. |
+StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => |
Bob Nystrom
2013/01/09 16:49:23
"toStreamConsumer" ?
|
+ new _OutputStreamConsumer(stream); |
+ |
+/// A [StreamConsumer] that pipes data into an [OutputStream]. |
+class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { |
+ final OutputStream _outputStream; |
+ |
+ _OutputStreamConsumer(this._outputStream) |
+ : super(); |
Bob Nystrom
2013/01/09 16:49:23
Unnecessary, especially since you aren't extending
|
+ |
+ Future consume(Stream<List<int>> stream) { |
+ // TODO(nweiz): we have to manually keep track of whether or not the |
+ // completer has completed since the output stream could signal an error |
+ // after close() has been called but before it has shut down internally. See |
+ // the following TODO. |
+ var completed = false; |
+ var completer = new Completer(); |
+ stream.listen((data) => _outputStream.write(data), onDone: () { |
+ _outputStream.close(); |
+ // TODO(nweiz): wait until _outputStream.onClosed is called once issue |
+ // 7761 is fixed. |
+ if (!completed) completer.complete(null); |
+ completed = true; |
+ }); |
+ |
+ _outputStream.onError = (e) { |
+ if (!completed) completer.completeError(e); |
Bob Nystrom
2013/01/09 16:49:23
Want to do that little hack to grab a stack trace
|
+ completed = true; |
+ }; |
+ |
+ return completer.future; |
+ } |
} |
/// Spawns and runs the process located at [executable], passing in [args]. |
@@ -625,11 +672,21 @@ class _WrappedProcess implements Process { |
_WrappedProcess(Process process) |
: _process = process, |
- stderr = wrapInputStream(process.stderr), |
- stdout = wrapInputStream(process.stdout); |
+ stderr = _wrapInputStream(process.stderr), |
+ stdout = _wrapInputStream(process.stdout); |
bool kill([ProcessSignal signal = ProcessSignal.SIGTERM]) => |
_process.kill(signal); |
+ |
+ /// Wrap an InputStream in a ListInputStream. This eagerly drains the [source] |
+ /// input stream. This is useful for spawned processes which will not exit until |
Bob Nystrom
2013/01/09 16:49:23
Long line.
|
+ /// their output streams have been drained. |
+ /// TODO(rnystrom): We should use this logic anywhere we spawn a process. |
+ static InputStream _wrapInputStream(InputStream source) { |
+ var sink = new ListInputStream(); |
+ pipeInputToInput(source, sink); |
+ return sink; |
+ } |
} |
/// Calls [fn] with appropriately modified arguments. [fn] should have the same |
@@ -678,17 +735,15 @@ Future timeout(Future input, int milliseconds, String description) { |
completer.completeError(new TimeoutException( |
'Timed out while $description.')); |
}); |
- input |
- .then((value) { |
- if (completed) return; |
- timer.cancel(); |
- completer.complete(value); |
- }) |
- .catchError((e) { |
- if (completed) return; |
- timer.cancel(); |
- completer.completeError(e.error, e.stackTrace); |
- }); |
+ input.then((value) { |
+ if (completed) return; |
+ timer.cancel(); |
+ completer.complete(value); |
+ }).catchError((e) { |
+ if (completed) return; |
+ timer.cancel(); |
+ completer.completeError(e.error, e.stackTrace); |
+ }); |
return completer.future; |
} |
@@ -697,15 +752,13 @@ Future timeout(Future input, int milliseconds, String description) { |
/// will be deleted. |
Future withTempDir(Future fn(String path)) { |
var tempDir; |
- var future = createTempDir().then((dir) { |
+ return asyncWhenComplete(createTempDir().then((dir) { |
tempDir = dir; |
return fn(tempDir.path); |
- }); |
- future.catchError((_) {}).then((_) { |
+ }), () { |
log.fine('Cleaning up temp directory ${tempDir.path}.'); |
- deleteDir(tempDir); |
+ return deleteDir(tempDir); |
}); |
- return future; |
} |
/// Tests whether or not the git command-line app is available for use. |