Index: utils/tests/pub/test_pub.dart |
diff --git a/utils/tests/pub/test_pub.dart b/utils/tests/pub/test_pub.dart |
index 87053b9a3f1b4ae0e6e09a2e6c44b570b8d27f5b..339d600b61807fe42b7ecdc330b77cd7e941c753 100644 |
--- a/utils/tests/pub/test_pub.dart |
+++ b/utils/tests/pub/test_pub.dart |
@@ -95,8 +95,7 @@ void serve([List<Descriptor> contents]) { |
return; |
} |
- var future = consumeInputStream(stream); |
- future.then((data) { |
+ stream.toBytes().then((data) { |
response.statusCode = 200; |
response.contentLength = data.length; |
response.outputStream.write(data); |
@@ -765,7 +764,7 @@ abstract class Descriptor { |
/// Loads the file at [path] from within this descriptor. If [path] is empty, |
/// loads the contents of the descriptor itself. |
- InputStream load(List<String> path); |
+ ByteStream load(List<String> path); |
/// Schedules the directory to be created before Pub is run with [runPub]. |
/// The directory will be created relative to the sandbox directory. |
@@ -880,16 +879,13 @@ class FileDescriptor extends Descriptor { |
} |
/// Loads the contents of the file. |
- InputStream load(List<String> path) { |
+ ByteStream load(List<String> path) { |
if (!path.isEmpty) { |
var joinedPath = Strings.join(path, '/'); |
throw "Can't load $joinedPath from within $name: not a directory."; |
} |
- var stream = new ListInputStream(); |
- stream.write(contents.charCodes); |
- stream.markEndOfStream(); |
- return stream; |
+ return new ByteStream.fromBytes(contents.charCodes); |
} |
} |
@@ -941,7 +937,7 @@ class DirectoryDescriptor extends Descriptor { |
} |
/// Loads [path] from within this directory. |
- InputStream load(List<String> path) { |
+ ByteStream load(List<String> path) { |
if (path.isEmpty) { |
throw "Can't load the contents of $name: is a directory."; |
} |
@@ -971,10 +967,10 @@ class FutureDescriptor extends Descriptor { |
Future delete(dir) => _future.then((desc) => desc.delete(dir)); |
- InputStream load(List<String> path) { |
- var resultStream = new ListInputStream(); |
- _future.then((desc) => pipeInputToInput(desc.load(path), resultStream)); |
- return resultStream; |
+ ByteStream load(List<String> path) { |
+ var controller = new StreamController<List<int>>(); |
+ _future.then((desc) => store(desc.load(path), controller)); |
+ return new ByteStream(controller.stream); |
} |
} |
@@ -1074,7 +1070,7 @@ class TarFileDescriptor extends Descriptor { |
tempDir = _tempDir; |
return Future.wait(contents.mappedBy((child) => child.create(tempDir))); |
}).then((createdContents) { |
- return consumeInputStream(createTarGz(createdContents, baseDir: tempDir)); |
+ return createTarGz(createdContents, baseDir: tempDir).toBytes(); |
}).then((bytes) { |
return new File(join(parentDir, _stringName)).writeAsBytes(bytes); |
}).then((file) { |
@@ -1093,13 +1089,13 @@ class TarFileDescriptor extends Descriptor { |
} |
/// Loads the contents of this tar file. |
- InputStream load(List<String> path) { |
+ ByteStream load(List<String> path) { |
if (!path.isEmpty) { |
var joinedPath = Strings.join(path, '/'); |
throw "Can't load $joinedPath from within $name: not a directory."; |
} |
- var sinkStream = new ListInputStream(); |
+ var controller = new StreamController<List<int>>(); |
var tempDir; |
// TODO(rnystrom): Use withTempDir() here. |
// TODO(nweiz): propagate any errors to the return value. See issue 3657. |
@@ -1108,11 +1104,11 @@ class TarFileDescriptor extends Descriptor { |
return create(tempDir); |
}).then((tar) { |
var sourceStream = tar.openInputStream(); |
- return pipeInputToInput(sourceStream, sinkStream).then((_) { |
+ return store(wrapInputStream(sourceStream), controller).then((_) { |
tempDir.delete(recursive: true); |
}); |
}); |
- return sinkStream; |
+ return new ByteStream(controller.stream); |
} |
} |
@@ -1129,7 +1125,7 @@ class NothingDescriptor extends Descriptor { |
}); |
} |
- InputStream load(List<String> path) { |
+ ByteStream load(List<String> path) { |
if (path.isEmpty) { |
throw "Can't load the contents of $name: it doesn't exist."; |
} else { |
@@ -1200,7 +1196,7 @@ class ScheduledProcess { |
final String name; |
/// The process future that's scheduled to run. |
- Future<Process> _processFuture; |
+ Future<PubProcess> _processFuture; |
/// The process that's scheduled to run. It may be null. |
Process _process; |
@@ -1208,13 +1204,35 @@ class ScheduledProcess { |
/// The exit code of the scheduled program. It may be null. |
int _exitCode; |
- /// A [StringInputStream] wrapping the stdout of the process that's scheduled |
- /// to run. |
- final Future<StringInputStream> _stdoutFuture; |
+ /// A future that will complete to a list of all the lines emitted on the |
+ /// process's standard output stream. This is independent of what data is read |
+ /// from [_stdout]. |
+ Future<List<String>> _stdoutLines; |
- /// A [StringInputStream] wrapping the stderr of the process that's scheduled |
- /// to run. |
- final Future<StringInputStream> _stderrFuture; |
+ /// A [Stream] of stdout lines emitted by the process that's scheduled to run. |
+ /// It may be null. |
+ Stream<String> _stdout; |
+ |
+ /// A [Future] that will resolve to [_stdout] once it's available. |
+ Future get _stdoutFuture => _processFuture.then((_) => _stdout); |
+ |
+ /// A [StreamSubscription] that controls [_stdout]. |
+ StreamSubscription _stdoutSubscription; |
+ |
+ /// A future that will complete to a list of all the lines emitted on the |
+ /// process's standard error stream. This is independent of what data is read |
+ /// from [_stderr]. |
+ Future<List<String>> _stderrLines; |
+ |
+ /// A [Stream] of stderr lines emitted by the process that's scheduled to run. |
+ /// It may be null. |
+ Stream<String> _stderr; |
+ |
+ /// A [Future] that will resolve to [_stderr] once it's available. |
+ Future get _stderrFuture => _processFuture.then((_) => _stderr); |
+ |
+ /// A [StreamSubscription] that controls [_stderr]. |
+ StreamSubscription _stderrSubscription; |
/// The exit code of the process that's scheduled to run. This will naturally |
/// only complete once the process has terminated. |
@@ -1231,42 +1249,57 @@ class ScheduledProcess { |
bool _endExpected = false; |
/// Wraps a [Process] [Future] in a scheduled process. |
- ScheduledProcess(this.name, Future<Process> process) |
- : _processFuture = process, |
- _stdoutFuture = process.then((p) => new StringInputStream(p.stdout)), |
- _stderrFuture = process.then((p) => new StringInputStream(p.stderr)) { |
- process.then((p) { |
+ ScheduledProcess(this.name, Future<PubProcess> process) |
+ : _processFuture = process { |
+ var pairFuture = process.then((p) { |
_process = p; |
+ |
+ var stdoutTee = tee(p.stdout.handleError((e) { |
+ registerException(e.error, e.stackTrace); |
+ })); |
+ var stdoutPair = streamWithSubscription(stdoutTee.last); |
+ _stdout = stdoutPair.first; |
+ _stdoutSubscription = stdoutPair.last; |
+ |
+ var stderrTee = tee(p.stderr.handleError((e) { |
+ registerException(e.error, e.stackTrace); |
+ })); |
+ var stderrPair = streamWithSubscription(stderrTee.last); |
+ _stderr = stderrPair.first; |
+ _stderrSubscription = stderrPair.last; |
+ |
+ return new Pair(stdoutTee.first, stderrTee.first); |
}); |
+ _stdoutLines = pairFuture.then((pair) => pair.first.toList()); |
+ _stderrLines = pairFuture.then((pair) => pair.last.toList()); |
+ |
_schedule((_) { |
if (!_endScheduled) { |
throw new StateError("Scheduled process $name must have shouldExit() " |
"or kill() called before the test is run."); |
} |
- return process.then((p) { |
- p.onExit = (c) { |
+ return process.then((p) => p.exitCode).then((exitCode) { |
+ if (_endExpected) { |
+ _exitCode = exitCode; |
+ _exitCodeCompleter.complete(exitCode); |
+ return; |
+ } |
+ |
+ // Sleep for half a second in case _endExpected is set in the next |
+ // scheduled event. |
+ sleep(500).then((_) { |
if (_endExpected) { |
- _exitCode = c; |
- _exitCodeCompleter.complete(c); |
+ _exitCodeCompleter.complete(exitCode); |
return; |
} |
- // Sleep for half a second in case _endExpected is set in the next |
- // scheduled event. |
- sleep(500).then((_) { |
- if (_endExpected) { |
- _exitCodeCompleter.complete(c); |
- return; |
- } |
- |
- _printStreams().then((_) { |
- registerException(new ExpectException("Process $name ended " |
- "earlier than scheduled with exit code $c")); |
- }); |
+ return _printStreams().then((_) { |
+ registerException(new ExpectException("Process $name ended " |
+ "earlier than scheduled with exit code $exitCode")); |
}); |
- }; |
+ }); |
}); |
}); |
@@ -1286,15 +1319,15 @@ class ScheduledProcess { |
if (_process == null) return; |
// Ensure that the process is dead and we aren't waiting on any IO. |
_process.kill(); |
- _process.stdout.close(); |
- _process.stderr.close(); |
+ _stdoutSubscription.cancel(); |
+ _stderrSubscription.cancel(); |
}); |
} |
/// Reads the next line of stdout from the process. |
Future<String> nextLine() { |
return _scheduleValue((_) { |
- return timeout(_stdoutFuture.then((stream) => readLine(stream)), |
+ return timeout(_stdoutFuture.then((stream) => streamFirst(stream)), |
_SCHEDULE_TIMEOUT, |
"waiting for the next stdout line from process $name"); |
}); |
@@ -1303,7 +1336,7 @@ class ScheduledProcess { |
/// Reads the next line of stderr from the process. |
Future<String> nextErrLine() { |
return _scheduleValue((_) { |
- return timeout(_stderrFuture.then((stream) => readLine(stream)), |
+ return timeout(_stderrFuture.then((stream) => streamFirst(stream)), |
_SCHEDULE_TIMEOUT, |
"waiting for the next stderr line from process $name"); |
}); |
@@ -1318,7 +1351,8 @@ class ScheduledProcess { |
} |
return _scheduleValue((_) { |
- return timeout(_stdoutFuture.then(consumeStringInputStream), |
+ return timeout(_stdoutFuture.then((stream) => stream.toList()) |
+ .then((lines) => lines.join("\n")), |
_SCHEDULE_TIMEOUT, |
"waiting for the last stdout line from process $name"); |
}); |
@@ -1333,7 +1367,8 @@ class ScheduledProcess { |
} |
return _scheduleValue((_) { |
- return timeout(_stderrFuture.then(consumeStringInputStream), |
+ return timeout(_stderrFuture.then((stream) => stream.toList()) |
+ .then((lines) => lines.join("\n")), |
_SCHEDULE_TIMEOUT, |
"waiting for the last stderr line from process $name"); |
}); |
@@ -1342,7 +1377,7 @@ class ScheduledProcess { |
/// Writes [line] to the process as stdin. |
void writeLine(String line) { |
_schedule((_) => _processFuture.then( |
- (p) => p.stdin.writeString('$line\n'))); |
+ (p) => p.stdin.write('$line\n'.charCodes))); |
} |
/// Kills the process, and waits until it's dead. |
@@ -1351,7 +1386,7 @@ class ScheduledProcess { |
_schedule((_) { |
_endExpected = true; |
_process.kill(); |
- timeout(_exitCodeCompleter.future, _SCHEDULE_TIMEOUT, |
+ timeout(_exitCodeFuture, _SCHEDULE_TIMEOUT, |
"waiting for process $name to die"); |
}); |
} |
@@ -1362,7 +1397,7 @@ class ScheduledProcess { |
_endScheduled = true; |
_schedule((_) { |
_endExpected = true; |
- return timeout(_exitCodeCompleter.future, _SCHEDULE_TIMEOUT, |
+ return timeout(_exitCodeFuture, _SCHEDULE_TIMEOUT, |
"waiting for process $name to exit").then((exitCode) { |
if (expectedExitCode != null) { |
expect(exitCode, equals(expectedExitCode)); |
@@ -1372,24 +1407,21 @@ class ScheduledProcess { |
} |
/// Prints the remaining data in the process's stdout and stderr streams. |
- /// Prints nothing if the straems are empty. |
+ /// Prints nothing if the streams are empty. |
Future _printStreams() { |
- Future printStream(String streamName, StringInputStream stream) { |
- return consumeStringInputStream(stream).then((output) { |
- if (output.isEmpty) return; |
+ void printStream(String streamName, List<String> lines) { |
+ if (lines.isEmpty) return; |
- print('\nProcess $name $streamName:'); |
- for (var line in output.trim().split("\n")) { |
- print('| $line'); |
- } |
- return; |
- }); |
+ print('\nProcess $name $streamName:'); |
+ for (var line in lines) { |
+ print('| $line'); |
+ } |
} |
- return _stdoutFuture.then((stdout) { |
- return _stderrFuture.then((stderr) { |
- return printStream('stdout', stdout) |
- .then((_) => printStream('stderr', stderr)); |
+ return _stdoutLines.then((stdoutLines) { |
+ printStream('stdout', stdoutLines); |
+ return _stderrLines.then((stderrLines) { |
+ printStream('stderr', stderrLines); |
}); |
}); |
} |