Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1014)

Unified Diff: pkg/scheduled_test/lib/scheduled_process.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Made tests run (mostly) Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: pkg/scheduled_test/lib/scheduled_process.dart
diff --git a/pkg/scheduled_test/lib/scheduled_process.dart b/pkg/scheduled_test/lib/scheduled_process.dart
index 23370f10942602b5ccc0e42e56dbce3c3d87c70e..049d8b8739140c37f1a6347dffa145760f8f6295 100644
--- a/pkg/scheduled_test/lib/scheduled_process.dart
+++ b/pkg/scheduled_test/lib/scheduled_process.dart
@@ -39,7 +39,7 @@ class ScheduledProcess {
Stream<String> _stdoutLog;
/// A line-by-line view of the standard output stream of the process.
- Stream<String> _stdout;
+ StreamIterator<String> _stdout;
nweiz 2013/05/22 18:42:41 I'm not sure I understand why using a StreamIterat
floitsch 2013/05/22 20:23:35 The moment you cancel a stream it is dead. We have
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Exactly. The current code subscribed to the same
nweiz 2013/05/24 20:12:31 Okay, I suppose that makes sense. It seems a littl
Lasse Reichstein Nielsen 2013/05/27 08:04:53 Done.
/// A canceller that controls both [_stdout] and [_stdoutLog].
StreamCanceller _stdoutCanceller;
@@ -49,7 +49,7 @@ class ScheduledProcess {
Stream<String> _stderrLog;
/// A line-by-line view of the standard error stream of the process.
- Stream<String> _stderr;
+ StreamIterator<String> _stderr;
/// A canceller that controls both [_stderr] and [_stderrLog].
StreamCanceller _stderrCanceller;
@@ -102,8 +102,8 @@ class ScheduledProcess {
_stderrCanceller = stderrWithCanceller.last;
_stderrLog = stderrWithCanceller.first;
- _stdout = stdoutStream();
- _stderr = stderrStream();
+ _stdout = new StreamIterator<String>(stdoutStream());
+ _stderr = new StreamIterator<String>(stderrStream());
}
/// Updates [_description] to reflect [executable] and [arguments], which are
@@ -239,14 +239,49 @@ class ScheduledProcess {
}, "cleaning up process '$description'");
}
+ // Similar to [streamFirst], but works on streamIterators.
+ // And it works at all, which streamFirst doesn't.
floitsch 2013/05/22 16:26:29 Reword. This could be read like an insult.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Ack, working comment left in :)
+ Future<String> _streamIteratorFirst(StreamIterator<String> streamIterator) {
nweiz 2013/05/22 18:42:41 This belongs in utils.dart.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Ok, will move it.
+ StackTrace stackTrace;
+ try {
+ throw 0;
+ } catch (_, thrownStackTrace) {
+ stackTrace = thrownStackTrace;
+ }
Bob Nystrom 2013/05/22 16:23:46 Instead of doing this by hand, use new Trace.curre
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Will do.
+ return streamIterator.moveNext().then((hasNext) {
+ if (hasNext) {
+ return streamIterator.current;
+ } else {
+ return new Future.error(new StateError("No elements"), stackTrace);
+ }
+ });
+ }
+
/// Reads the next line of stdout from the process.
- Future<String> nextLine() => schedule(() => streamFirst(_stdout),
+ Future<String> nextLine() => schedule(() => _streamIteratorFirst(_stdout),
"reading the next stdout line from process '$description'");
/// Reads the next line of stderr from the process.
- Future<String> nextErrLine() => schedule(() => streamFirst(_stderr),
+ Future<String> nextErrLine() => schedule(() => _streamIteratorFirst(_stderr),
"reading the next stderr line from process '$description'");
+ /// Collects all remaining lines from a [StreamIterator] of lines.
+ ///
+ /// Returns the concatenation of the collected lines joined by newlines.
+ static Future<String> _concatRest(StreamIterator<String> streamIterator) {
nweiz 2013/05/22 18:42:41 This also belongs in utils.dart.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Done.
+ Future<String> collectAll(List<String> accumulator) {
+ return streamIterator.moveNext().then((bool hasNext) {
Bob Nystrom 2013/05/22 16:23:46 Don't bother type annotating parameters in fn expr
Lasse Reichstein Nielsen 2013/05/24 06:02:49 ok. Grudgingly. How else would I know the type of
Bob Nystrom 2013/05/24 15:54:38 1. You know what you're passing that callback to,
Lasse Reichstein Nielsen 2013/05/27 08:04:53 I don't believe in "knowing" being good enough as
+ if (hasNext) {
+ accumulator.add(streamIterator.current);
+ return collectAll(accumulator);
Bob Nystrom 2013/05/22 16:23:46 If the stream has a lot of output, this can build
Lasse Reichstein Nielsen 2013/05/24 06:02:49 I've rewritten this to not chain the futures, and
nweiz 2013/05/24 20:12:31 Working around this here before we know it's actua
Lasse Reichstein Nielsen 2013/05/27 08:04:53 In this case, I actually think the new version is
+ } else {
+ return accumulator.join("\n");
Bob Nystrom 2013/05/22 16:23:46 A StringBuffer seems like a more natural fit for t
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Good point.
+ }
+ });
+ }
+ return collectAll(<String>[]);
+ }
+
/// Reads the remaining stdout from the process. This should only be called
/// after kill() or shouldExit().
Future<String> remainingStdout() {
@@ -254,8 +289,8 @@ class ScheduledProcess {
throw new StateError("remainingStdout() should only be called after "
"kill() or shouldExit().");
}
-
- return schedule(() => _stdout.toList().then((lines) => lines.join("\n")),
+ List<String> accumulator = <String>[];
Bob Nystrom 2013/05/22 16:23:46 Unused.
floitsch 2013/05/22 16:26:29 Looks unused.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Well spotted.
+ return schedule(() => _concatRest(_stdout),
"reading the remaining stdout from process '$description'");
}
@@ -267,7 +302,7 @@ class ScheduledProcess {
"kill() or shouldExit().");
}
- return schedule(() => _stderr.toList().then((lines) => lines.join("\n")),
+ return schedule(() => _concatRest(_stderr),
"reading the remaining stderr from process '$description'");
}
« no previous file with comments | « no previous file | pkg/scheduled_test/lib/src/mock_clock.dart » ('j') | pkg/scheduled_test/lib/src/mock_clock.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698