| Index: utils/tests/pub/test_pub.dart
|
| diff --git a/utils/tests/pub/test_pub.dart b/utils/tests/pub/test_pub.dart
|
| index 87053b9a3f1b4ae0e6e09a2e6c44b570b8d27f5b..339d600b61807fe42b7ecdc330b77cd7e941c753 100644
|
| --- a/utils/tests/pub/test_pub.dart
|
| +++ b/utils/tests/pub/test_pub.dart
|
| @@ -95,8 +95,7 @@ void serve([List<Descriptor> contents]) {
|
| return;
|
| }
|
|
|
| - var future = consumeInputStream(stream);
|
| - future.then((data) {
|
| + stream.toBytes().then((data) {
|
| response.statusCode = 200;
|
| response.contentLength = data.length;
|
| response.outputStream.write(data);
|
| @@ -765,7 +764,7 @@ abstract class Descriptor {
|
|
|
| /// Loads the file at [path] from within this descriptor. If [path] is empty,
|
| /// loads the contents of the descriptor itself.
|
| - InputStream load(List<String> path);
|
| + ByteStream load(List<String> path);
|
|
|
| /// Schedules the directory to be created before Pub is run with [runPub].
|
| /// The directory will be created relative to the sandbox directory.
|
| @@ -880,16 +879,13 @@ class FileDescriptor extends Descriptor {
|
| }
|
|
|
| /// Loads the contents of the file.
|
| - InputStream load(List<String> path) {
|
| + ByteStream load(List<String> path) {
|
| if (!path.isEmpty) {
|
| var joinedPath = Strings.join(path, '/');
|
| throw "Can't load $joinedPath from within $name: not a directory.";
|
| }
|
|
|
| - var stream = new ListInputStream();
|
| - stream.write(contents.charCodes);
|
| - stream.markEndOfStream();
|
| - return stream;
|
| + return new ByteStream.fromBytes(contents.charCodes);
|
| }
|
| }
|
|
|
| @@ -941,7 +937,7 @@ class DirectoryDescriptor extends Descriptor {
|
| }
|
|
|
| /// Loads [path] from within this directory.
|
| - InputStream load(List<String> path) {
|
| + ByteStream load(List<String> path) {
|
| if (path.isEmpty) {
|
| throw "Can't load the contents of $name: is a directory.";
|
| }
|
| @@ -971,10 +967,10 @@ class FutureDescriptor extends Descriptor {
|
|
|
| Future delete(dir) => _future.then((desc) => desc.delete(dir));
|
|
|
| - InputStream load(List<String> path) {
|
| - var resultStream = new ListInputStream();
|
| - _future.then((desc) => pipeInputToInput(desc.load(path), resultStream));
|
| - return resultStream;
|
| + ByteStream load(List<String> path) {
|
| + var controller = new StreamController<List<int>>();
|
| + _future.then((desc) => store(desc.load(path), controller));
|
| + return new ByteStream(controller.stream);
|
| }
|
| }
|
|
|
| @@ -1074,7 +1070,7 @@ class TarFileDescriptor extends Descriptor {
|
| tempDir = _tempDir;
|
| return Future.wait(contents.mappedBy((child) => child.create(tempDir)));
|
| }).then((createdContents) {
|
| - return consumeInputStream(createTarGz(createdContents, baseDir: tempDir));
|
| + return createTarGz(createdContents, baseDir: tempDir).toBytes();
|
| }).then((bytes) {
|
| return new File(join(parentDir, _stringName)).writeAsBytes(bytes);
|
| }).then((file) {
|
| @@ -1093,13 +1089,13 @@ class TarFileDescriptor extends Descriptor {
|
| }
|
|
|
| /// Loads the contents of this tar file.
|
| - InputStream load(List<String> path) {
|
| + ByteStream load(List<String> path) {
|
| if (!path.isEmpty) {
|
| var joinedPath = Strings.join(path, '/');
|
| throw "Can't load $joinedPath from within $name: not a directory.";
|
| }
|
|
|
| - var sinkStream = new ListInputStream();
|
| + var controller = new StreamController<List<int>>();
|
| var tempDir;
|
| // TODO(rnystrom): Use withTempDir() here.
|
| // TODO(nweiz): propagate any errors to the return value. See issue 3657.
|
| @@ -1108,11 +1104,11 @@ class TarFileDescriptor extends Descriptor {
|
| return create(tempDir);
|
| }).then((tar) {
|
| var sourceStream = tar.openInputStream();
|
| - return pipeInputToInput(sourceStream, sinkStream).then((_) {
|
| + return store(wrapInputStream(sourceStream), controller).then((_) {
|
| tempDir.delete(recursive: true);
|
| });
|
| });
|
| - return sinkStream;
|
| + return new ByteStream(controller.stream);
|
| }
|
| }
|
|
|
| @@ -1129,7 +1125,7 @@ class NothingDescriptor extends Descriptor {
|
| });
|
| }
|
|
|
| - InputStream load(List<String> path) {
|
| + ByteStream load(List<String> path) {
|
| if (path.isEmpty) {
|
| throw "Can't load the contents of $name: it doesn't exist.";
|
| } else {
|
| @@ -1200,7 +1196,7 @@ class ScheduledProcess {
|
| final String name;
|
|
|
| /// The process future that's scheduled to run.
|
| - Future<Process> _processFuture;
|
| + Future<PubProcess> _processFuture;
|
|
|
| /// The process that's scheduled to run. It may be null.
|
| Process _process;
|
| @@ -1208,13 +1204,35 @@ class ScheduledProcess {
|
| /// The exit code of the scheduled program. It may be null.
|
| int _exitCode;
|
|
|
| - /// A [StringInputStream] wrapping the stdout of the process that's scheduled
|
| - /// to run.
|
| - final Future<StringInputStream> _stdoutFuture;
|
| + /// A future that will complete to a list of all the lines emitted on the
|
| + /// process's standard output stream. This is independent of what data is read
|
| + /// from [_stdout].
|
| + Future<List<String>> _stdoutLines;
|
|
|
| - /// A [StringInputStream] wrapping the stderr of the process that's scheduled
|
| - /// to run.
|
| - final Future<StringInputStream> _stderrFuture;
|
| + /// A [Stream] of stdout lines emitted by the process that's scheduled to run.
|
| + /// It may be null.
|
| + Stream<String> _stdout;
|
| +
|
| + /// A [Future] that will resolve to [_stdout] once it's available.
|
| + Future get _stdoutFuture => _processFuture.then((_) => _stdout);
|
| +
|
| + /// A [StreamSubscription] that controls [_stdout].
|
| + StreamSubscription _stdoutSubscription;
|
| +
|
| + /// A future that will complete to a list of all the lines emitted on the
|
| + /// process's standard error stream. This is independent of what data is read
|
| + /// from [_stderr].
|
| + Future<List<String>> _stderrLines;
|
| +
|
| + /// A [Stream] of stderr lines emitted by the process that's scheduled to run.
|
| + /// It may be null.
|
| + Stream<String> _stderr;
|
| +
|
| + /// A [Future] that will resolve to [_stderr] once it's available.
|
| + Future get _stderrFuture => _processFuture.then((_) => _stderr);
|
| +
|
| + /// A [StreamSubscription] that controls [_stderr].
|
| + StreamSubscription _stderrSubscription;
|
|
|
| /// The exit code of the process that's scheduled to run. This will naturally
|
| /// only complete once the process has terminated.
|
| @@ -1231,42 +1249,57 @@ class ScheduledProcess {
|
| bool _endExpected = false;
|
|
|
| /// Wraps a [Process] [Future] in a scheduled process.
|
| - ScheduledProcess(this.name, Future<Process> process)
|
| - : _processFuture = process,
|
| - _stdoutFuture = process.then((p) => new StringInputStream(p.stdout)),
|
| - _stderrFuture = process.then((p) => new StringInputStream(p.stderr)) {
|
| - process.then((p) {
|
| + ScheduledProcess(this.name, Future<PubProcess> process)
|
| + : _processFuture = process {
|
| + var pairFuture = process.then((p) {
|
| _process = p;
|
| +
|
| + var stdoutTee = tee(p.stdout.handleError((e) {
|
| + registerException(e.error, e.stackTrace);
|
| + }));
|
| + var stdoutPair = streamWithSubscription(stdoutTee.last);
|
| + _stdout = stdoutPair.first;
|
| + _stdoutSubscription = stdoutPair.last;
|
| +
|
| + var stderrTee = tee(p.stderr.handleError((e) {
|
| + registerException(e.error, e.stackTrace);
|
| + }));
|
| + var stderrPair = streamWithSubscription(stderrTee.last);
|
| + _stderr = stderrPair.first;
|
| + _stderrSubscription = stderrPair.last;
|
| +
|
| + return new Pair(stdoutTee.first, stderrTee.first);
|
| });
|
|
|
| + _stdoutLines = pairFuture.then((pair) => pair.first.toList());
|
| + _stderrLines = pairFuture.then((pair) => pair.last.toList());
|
| +
|
| _schedule((_) {
|
| if (!_endScheduled) {
|
| throw new StateError("Scheduled process $name must have shouldExit() "
|
| "or kill() called before the test is run.");
|
| }
|
|
|
| - return process.then((p) {
|
| - p.onExit = (c) {
|
| + return process.then((p) => p.exitCode).then((exitCode) {
|
| + if (_endExpected) {
|
| + _exitCode = exitCode;
|
| + _exitCodeCompleter.complete(exitCode);
|
| + return;
|
| + }
|
| +
|
| + // Sleep for half a second in case _endExpected is set in the next
|
| + // scheduled event.
|
| + sleep(500).then((_) {
|
| if (_endExpected) {
|
| - _exitCode = c;
|
| - _exitCodeCompleter.complete(c);
|
| + _exitCodeCompleter.complete(exitCode);
|
| return;
|
| }
|
|
|
| - // Sleep for half a second in case _endExpected is set in the next
|
| - // scheduled event.
|
| - sleep(500).then((_) {
|
| - if (_endExpected) {
|
| - _exitCodeCompleter.complete(c);
|
| - return;
|
| - }
|
| -
|
| - _printStreams().then((_) {
|
| - registerException(new ExpectException("Process $name ended "
|
| - "earlier than scheduled with exit code $c"));
|
| - });
|
| + return _printStreams().then((_) {
|
| + registerException(new ExpectException("Process $name ended "
|
| + "earlier than scheduled with exit code $exitCode"));
|
| });
|
| - };
|
| + });
|
| });
|
| });
|
|
|
| @@ -1286,15 +1319,15 @@ class ScheduledProcess {
|
| if (_process == null) return;
|
| // Ensure that the process is dead and we aren't waiting on any IO.
|
| _process.kill();
|
| - _process.stdout.close();
|
| - _process.stderr.close();
|
| + _stdoutSubscription.cancel();
|
| + _stderrSubscription.cancel();
|
| });
|
| }
|
|
|
| /// Reads the next line of stdout from the process.
|
| Future<String> nextLine() {
|
| return _scheduleValue((_) {
|
| - return timeout(_stdoutFuture.then((stream) => readLine(stream)),
|
| + return timeout(_stdoutFuture.then((stream) => streamFirst(stream)),
|
| _SCHEDULE_TIMEOUT,
|
| "waiting for the next stdout line from process $name");
|
| });
|
| @@ -1303,7 +1336,7 @@ class ScheduledProcess {
|
| /// Reads the next line of stderr from the process.
|
| Future<String> nextErrLine() {
|
| return _scheduleValue((_) {
|
| - return timeout(_stderrFuture.then((stream) => readLine(stream)),
|
| + return timeout(_stderrFuture.then((stream) => streamFirst(stream)),
|
| _SCHEDULE_TIMEOUT,
|
| "waiting for the next stderr line from process $name");
|
| });
|
| @@ -1318,7 +1351,8 @@ class ScheduledProcess {
|
| }
|
|
|
| return _scheduleValue((_) {
|
| - return timeout(_stdoutFuture.then(consumeStringInputStream),
|
| + return timeout(_stdoutFuture.then((stream) => stream.toList())
|
| + .then((lines) => lines.join("\n")),
|
| _SCHEDULE_TIMEOUT,
|
| "waiting for the last stdout line from process $name");
|
| });
|
| @@ -1333,7 +1367,8 @@ class ScheduledProcess {
|
| }
|
|
|
| return _scheduleValue((_) {
|
| - return timeout(_stderrFuture.then(consumeStringInputStream),
|
| + return timeout(_stderrFuture.then((stream) => stream.toList())
|
| + .then((lines) => lines.join("\n")),
|
| _SCHEDULE_TIMEOUT,
|
| "waiting for the last stderr line from process $name");
|
| });
|
| @@ -1342,7 +1377,7 @@ class ScheduledProcess {
|
| /// Writes [line] to the process as stdin.
|
| void writeLine(String line) {
|
| _schedule((_) => _processFuture.then(
|
| - (p) => p.stdin.writeString('$line\n')));
|
| + (p) => p.stdin.write('$line\n'.charCodes)));
|
| }
|
|
|
| /// Kills the process, and waits until it's dead.
|
| @@ -1351,7 +1386,7 @@ class ScheduledProcess {
|
| _schedule((_) {
|
| _endExpected = true;
|
| _process.kill();
|
| - timeout(_exitCodeCompleter.future, _SCHEDULE_TIMEOUT,
|
| + timeout(_exitCodeFuture, _SCHEDULE_TIMEOUT,
|
| "waiting for process $name to die");
|
| });
|
| }
|
| @@ -1362,7 +1397,7 @@ class ScheduledProcess {
|
| _endScheduled = true;
|
| _schedule((_) {
|
| _endExpected = true;
|
| - return timeout(_exitCodeCompleter.future, _SCHEDULE_TIMEOUT,
|
| + return timeout(_exitCodeFuture, _SCHEDULE_TIMEOUT,
|
| "waiting for process $name to exit").then((exitCode) {
|
| if (expectedExitCode != null) {
|
| expect(exitCode, equals(expectedExitCode));
|
| @@ -1372,24 +1407,21 @@ class ScheduledProcess {
|
| }
|
|
|
| /// Prints the remaining data in the process's stdout and stderr streams.
|
| - /// Prints nothing if the straems are empty.
|
| + /// Prints nothing if the streams are empty.
|
| Future _printStreams() {
|
| - Future printStream(String streamName, StringInputStream stream) {
|
| - return consumeStringInputStream(stream).then((output) {
|
| - if (output.isEmpty) return;
|
| + void printStream(String streamName, List<String> lines) {
|
| + if (lines.isEmpty) return;
|
|
|
| - print('\nProcess $name $streamName:');
|
| - for (var line in output.trim().split("\n")) {
|
| - print('| $line');
|
| - }
|
| - return;
|
| - });
|
| + print('\nProcess $name $streamName:');
|
| + for (var line in lines) {
|
| + print('| $line');
|
| + }
|
| }
|
|
|
| - return _stdoutFuture.then((stdout) {
|
| - return _stderrFuture.then((stderr) {
|
| - return printStream('stdout', stdout)
|
| - .then((_) => printStream('stderr', stderr));
|
| + return _stdoutLines.then((stdoutLines) {
|
| + printStream('stdout', stdoutLines);
|
| + return _stderrLines.then((stderrLines) {
|
| + printStream('stderr', stderrLines);
|
| });
|
| });
|
| }
|
|
|