Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(40)

Unified Diff: utils/tests/pub/test_pub.dart

Issue 12021008: Use the dart:async Stream API thoroughly in Pub. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Code review changes Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « utils/tests/pub/pub_uploader_test.dart ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: utils/tests/pub/test_pub.dart
diff --git a/utils/tests/pub/test_pub.dart b/utils/tests/pub/test_pub.dart
index 87053b9a3f1b4ae0e6e09a2e6c44b570b8d27f5b..339d600b61807fe42b7ecdc330b77cd7e941c753 100644
--- a/utils/tests/pub/test_pub.dart
+++ b/utils/tests/pub/test_pub.dart
@@ -95,8 +95,7 @@ void serve([List<Descriptor> contents]) {
return;
}
- var future = consumeInputStream(stream);
- future.then((data) {
+ stream.toBytes().then((data) {
response.statusCode = 200;
response.contentLength = data.length;
response.outputStream.write(data);
@@ -765,7 +764,7 @@ abstract class Descriptor {
/// Loads the file at [path] from within this descriptor. If [path] is empty,
/// loads the contents of the descriptor itself.
- InputStream load(List<String> path);
+ ByteStream load(List<String> path);
/// Schedules the directory to be created before Pub is run with [runPub].
/// The directory will be created relative to the sandbox directory.
@@ -880,16 +879,13 @@ class FileDescriptor extends Descriptor {
}
/// Loads the contents of the file.
- InputStream load(List<String> path) {
+ ByteStream load(List<String> path) {
if (!path.isEmpty) {
var joinedPath = Strings.join(path, '/');
throw "Can't load $joinedPath from within $name: not a directory.";
}
- var stream = new ListInputStream();
- stream.write(contents.charCodes);
- stream.markEndOfStream();
- return stream;
+ return new ByteStream.fromBytes(contents.charCodes);
}
}
@@ -941,7 +937,7 @@ class DirectoryDescriptor extends Descriptor {
}
/// Loads [path] from within this directory.
- InputStream load(List<String> path) {
+ ByteStream load(List<String> path) {
if (path.isEmpty) {
throw "Can't load the contents of $name: is a directory.";
}
@@ -971,10 +967,10 @@ class FutureDescriptor extends Descriptor {
Future delete(dir) => _future.then((desc) => desc.delete(dir));
- InputStream load(List<String> path) {
- var resultStream = new ListInputStream();
- _future.then((desc) => pipeInputToInput(desc.load(path), resultStream));
- return resultStream;
+ ByteStream load(List<String> path) {
+ var controller = new StreamController<List<int>>();
+ _future.then((desc) => store(desc.load(path), controller));
+ return new ByteStream(controller.stream);
}
}
@@ -1074,7 +1070,7 @@ class TarFileDescriptor extends Descriptor {
tempDir = _tempDir;
return Future.wait(contents.mappedBy((child) => child.create(tempDir)));
}).then((createdContents) {
- return consumeInputStream(createTarGz(createdContents, baseDir: tempDir));
+ return createTarGz(createdContents, baseDir: tempDir).toBytes();
}).then((bytes) {
return new File(join(parentDir, _stringName)).writeAsBytes(bytes);
}).then((file) {
@@ -1093,13 +1089,13 @@ class TarFileDescriptor extends Descriptor {
}
/// Loads the contents of this tar file.
- InputStream load(List<String> path) {
+ ByteStream load(List<String> path) {
if (!path.isEmpty) {
var joinedPath = Strings.join(path, '/');
throw "Can't load $joinedPath from within $name: not a directory.";
}
- var sinkStream = new ListInputStream();
+ var controller = new StreamController<List<int>>();
var tempDir;
// TODO(rnystrom): Use withTempDir() here.
// TODO(nweiz): propagate any errors to the return value. See issue 3657.
@@ -1108,11 +1104,11 @@ class TarFileDescriptor extends Descriptor {
return create(tempDir);
}).then((tar) {
var sourceStream = tar.openInputStream();
- return pipeInputToInput(sourceStream, sinkStream).then((_) {
+ return store(wrapInputStream(sourceStream), controller).then((_) {
tempDir.delete(recursive: true);
});
});
- return sinkStream;
+ return new ByteStream(controller.stream);
}
}
@@ -1129,7 +1125,7 @@ class NothingDescriptor extends Descriptor {
});
}
- InputStream load(List<String> path) {
+ ByteStream load(List<String> path) {
if (path.isEmpty) {
throw "Can't load the contents of $name: it doesn't exist.";
} else {
@@ -1200,7 +1196,7 @@ class ScheduledProcess {
final String name;
/// The process future that's scheduled to run.
- Future<Process> _processFuture;
+ Future<PubProcess> _processFuture;
/// The process that's scheduled to run. It may be null.
Process _process;
@@ -1208,13 +1204,35 @@ class ScheduledProcess {
/// The exit code of the scheduled program. It may be null.
int _exitCode;
- /// A [StringInputStream] wrapping the stdout of the process that's scheduled
- /// to run.
- final Future<StringInputStream> _stdoutFuture;
+ /// A future that will complete to a list of all the lines emitted on the
+ /// process's standard output stream. This is independent of what data is read
+ /// from [_stdout].
+ Future<List<String>> _stdoutLines;
- /// A [StringInputStream] wrapping the stderr of the process that's scheduled
- /// to run.
- final Future<StringInputStream> _stderrFuture;
+ /// A [Stream] of stdout lines emitted by the process that's scheduled to run.
+ /// It may be null.
+ Stream<String> _stdout;
+
+ /// A [Future] that will resolve to [_stdout] once it's available.
+ Future get _stdoutFuture => _processFuture.then((_) => _stdout);
+
+ /// A [StreamSubscription] that controls [_stdout].
+ StreamSubscription _stdoutSubscription;
+
+ /// A future that will complete to a list of all the lines emitted on the
+ /// process's standard error stream. This is independent of what data is read
+ /// from [_stderr].
+ Future<List<String>> _stderrLines;
+
+ /// A [Stream] of stderr lines emitted by the process that's scheduled to run.
+ /// It may be null.
+ Stream<String> _stderr;
+
+ /// A [Future] that will resolve to [_stderr] once it's available.
+ Future get _stderrFuture => _processFuture.then((_) => _stderr);
+
+ /// A [StreamSubscription] that controls [_stderr].
+ StreamSubscription _stderrSubscription;
/// The exit code of the process that's scheduled to run. This will naturally
/// only complete once the process has terminated.
@@ -1231,42 +1249,57 @@ class ScheduledProcess {
bool _endExpected = false;
/// Wraps a [Process] [Future] in a scheduled process.
- ScheduledProcess(this.name, Future<Process> process)
- : _processFuture = process,
- _stdoutFuture = process.then((p) => new StringInputStream(p.stdout)),
- _stderrFuture = process.then((p) => new StringInputStream(p.stderr)) {
- process.then((p) {
+ ScheduledProcess(this.name, Future<PubProcess> process)
+ : _processFuture = process {
+ var pairFuture = process.then((p) {
_process = p;
+
+ var stdoutTee = tee(p.stdout.handleError((e) {
+ registerException(e.error, e.stackTrace);
+ }));
+ var stdoutPair = streamWithSubscription(stdoutTee.last);
+ _stdout = stdoutPair.first;
+ _stdoutSubscription = stdoutPair.last;
+
+ var stderrTee = tee(p.stderr.handleError((e) {
+ registerException(e.error, e.stackTrace);
+ }));
+ var stderrPair = streamWithSubscription(stderrTee.last);
+ _stderr = stderrPair.first;
+ _stderrSubscription = stderrPair.last;
+
+ return new Pair(stdoutTee.first, stderrTee.first);
});
+ _stdoutLines = pairFuture.then((pair) => pair.first.toList());
+ _stderrLines = pairFuture.then((pair) => pair.last.toList());
+
_schedule((_) {
if (!_endScheduled) {
throw new StateError("Scheduled process $name must have shouldExit() "
"or kill() called before the test is run.");
}
- return process.then((p) {
- p.onExit = (c) {
+ return process.then((p) => p.exitCode).then((exitCode) {
+ if (_endExpected) {
+ _exitCode = exitCode;
+ _exitCodeCompleter.complete(exitCode);
+ return;
+ }
+
+ // Sleep for half a second in case _endExpected is set in the next
+ // scheduled event.
+ sleep(500).then((_) {
if (_endExpected) {
- _exitCode = c;
- _exitCodeCompleter.complete(c);
+ _exitCodeCompleter.complete(exitCode);
return;
}
- // Sleep for half a second in case _endExpected is set in the next
- // scheduled event.
- sleep(500).then((_) {
- if (_endExpected) {
- _exitCodeCompleter.complete(c);
- return;
- }
-
- _printStreams().then((_) {
- registerException(new ExpectException("Process $name ended "
- "earlier than scheduled with exit code $c"));
- });
+ return _printStreams().then((_) {
+ registerException(new ExpectException("Process $name ended "
+ "earlier than scheduled with exit code $exitCode"));
});
- };
+ });
});
});
@@ -1286,15 +1319,15 @@ class ScheduledProcess {
if (_process == null) return;
// Ensure that the process is dead and we aren't waiting on any IO.
_process.kill();
- _process.stdout.close();
- _process.stderr.close();
+ _stdoutSubscription.cancel();
+ _stderrSubscription.cancel();
});
}
/// Reads the next line of stdout from the process.
Future<String> nextLine() {
return _scheduleValue((_) {
- return timeout(_stdoutFuture.then((stream) => readLine(stream)),
+ return timeout(_stdoutFuture.then((stream) => streamFirst(stream)),
_SCHEDULE_TIMEOUT,
"waiting for the next stdout line from process $name");
});
@@ -1303,7 +1336,7 @@ class ScheduledProcess {
/// Reads the next line of stderr from the process.
Future<String> nextErrLine() {
return _scheduleValue((_) {
- return timeout(_stderrFuture.then((stream) => readLine(stream)),
+ return timeout(_stderrFuture.then((stream) => streamFirst(stream)),
_SCHEDULE_TIMEOUT,
"waiting for the next stderr line from process $name");
});
@@ -1318,7 +1351,8 @@ class ScheduledProcess {
}
return _scheduleValue((_) {
- return timeout(_stdoutFuture.then(consumeStringInputStream),
+ return timeout(_stdoutFuture.then((stream) => stream.toList())
+ .then((lines) => lines.join("\n")),
_SCHEDULE_TIMEOUT,
"waiting for the last stdout line from process $name");
});
@@ -1333,7 +1367,8 @@ class ScheduledProcess {
}
return _scheduleValue((_) {
- return timeout(_stderrFuture.then(consumeStringInputStream),
+ return timeout(_stderrFuture.then((stream) => stream.toList())
+ .then((lines) => lines.join("\n")),
_SCHEDULE_TIMEOUT,
"waiting for the last stderr line from process $name");
});
@@ -1342,7 +1377,7 @@ class ScheduledProcess {
/// Writes [line] to the process as stdin.
void writeLine(String line) {
_schedule((_) => _processFuture.then(
- (p) => p.stdin.writeString('$line\n')));
+ (p) => p.stdin.write('$line\n'.charCodes)));
}
/// Kills the process, and waits until it's dead.
@@ -1351,7 +1386,7 @@ class ScheduledProcess {
_schedule((_) {
_endExpected = true;
_process.kill();
- timeout(_exitCodeCompleter.future, _SCHEDULE_TIMEOUT,
+ timeout(_exitCodeFuture, _SCHEDULE_TIMEOUT,
"waiting for process $name to die");
});
}
@@ -1362,7 +1397,7 @@ class ScheduledProcess {
_endScheduled = true;
_schedule((_) {
_endExpected = true;
- return timeout(_exitCodeCompleter.future, _SCHEDULE_TIMEOUT,
+ return timeout(_exitCodeFuture, _SCHEDULE_TIMEOUT,
"waiting for process $name to exit").then((exitCode) {
if (expectedExitCode != null) {
expect(exitCode, equals(expectedExitCode));
@@ -1372,24 +1407,21 @@ class ScheduledProcess {
}
/// Prints the remaining data in the process's stdout and stderr streams.
- /// Prints nothing if the straems are empty.
+ /// Prints nothing if the streams are empty.
Future _printStreams() {
- Future printStream(String streamName, StringInputStream stream) {
- return consumeStringInputStream(stream).then((output) {
- if (output.isEmpty) return;
+ void printStream(String streamName, List<String> lines) {
+ if (lines.isEmpty) return;
- print('\nProcess $name $streamName:');
- for (var line in output.trim().split("\n")) {
- print('| $line');
- }
- return;
- });
+ print('\nProcess $name $streamName:');
+ for (var line in lines) {
+ print('| $line');
+ }
}
- return _stdoutFuture.then((stdout) {
- return _stderrFuture.then((stderr) {
- return printStream('stdout', stdout)
- .then((_) => printStream('stderr', stderr));
+ return _stdoutLines.then((stdoutLines) {
+ printStream('stdout', stdoutLines);
+ return _stderrLines.then((stderrLines) {
+ printStream('stderr', stderrLines);
});
});
}
« no previous file with comments | « utils/tests/pub/pub_uploader_test.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698