Index: pkg/scheduled_test/lib/scheduled_process.dart |
diff --git a/pkg/scheduled_test/lib/scheduled_process.dart b/pkg/scheduled_test/lib/scheduled_process.dart |
index 23370f10942602b5ccc0e42e56dbce3c3d87c70e..049d8b8739140c37f1a6347dffa145760f8f6295 100644 |
--- a/pkg/scheduled_test/lib/scheduled_process.dart |
+++ b/pkg/scheduled_test/lib/scheduled_process.dart |
@@ -39,7 +39,7 @@ class ScheduledProcess { |
Stream<String> _stdoutLog; |
/// A line-by-line view of the standard output stream of the process. |
- Stream<String> _stdout; |
+ StreamIterator<String> _stdout; |
nweiz
2013/05/22 18:42:41
I'm not sure I understand why using a StreamIterat
floitsch
2013/05/22 20:23:35
The moment you cancel a stream it is dead. We have
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Exactly.
The current code subscribed to the same
nweiz
2013/05/24 20:12:31
Okay, I suppose that makes sense. It seems a littl
Lasse Reichstein Nielsen
2013/05/27 08:04:53
Done.
|
/// A canceller that controls both [_stdout] and [_stdoutLog]. |
StreamCanceller _stdoutCanceller; |
@@ -49,7 +49,7 @@ class ScheduledProcess { |
Stream<String> _stderrLog; |
/// A line-by-line view of the standard error stream of the process. |
- Stream<String> _stderr; |
+ StreamIterator<String> _stderr; |
/// A canceller that controls both [_stderr] and [_stderrLog]. |
StreamCanceller _stderrCanceller; |
@@ -102,8 +102,8 @@ class ScheduledProcess { |
_stderrCanceller = stderrWithCanceller.last; |
_stderrLog = stderrWithCanceller.first; |
- _stdout = stdoutStream(); |
- _stderr = stderrStream(); |
+ _stdout = new StreamIterator<String>(stdoutStream()); |
+ _stderr = new StreamIterator<String>(stderrStream()); |
} |
/// Updates [_description] to reflect [executable] and [arguments], which are |
@@ -239,14 +239,49 @@ class ScheduledProcess { |
}, "cleaning up process '$description'"); |
} |
+ // Similar to [streamFirst], but works on streamIterators. |
+ // And it works at all, which streamFirst doesn't. |
floitsch
2013/05/22 16:26:29
Reword. This could be read like an insult.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Ack, working comment left in :)
|
+ Future<String> _streamIteratorFirst(StreamIterator<String> streamIterator) { |
nweiz
2013/05/22 18:42:41
This belongs in utils.dart.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Ok, will move it.
|
+ StackTrace stackTrace; |
+ try { |
+ throw 0; |
+ } catch (_, thrownStackTrace) { |
+ stackTrace = thrownStackTrace; |
+ } |
Bob Nystrom
2013/05/22 16:23:46
Instead of doing this by hand, use new Trace.curre
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Will do.
|
+ return streamIterator.moveNext().then((hasNext) { |
+ if (hasNext) { |
+ return streamIterator.current; |
+ } else { |
+ return new Future.error(new StateError("No elements"), stackTrace); |
+ } |
+ }); |
+ } |
+ |
/// Reads the next line of stdout from the process. |
- Future<String> nextLine() => schedule(() => streamFirst(_stdout), |
+ Future<String> nextLine() => schedule(() => _streamIteratorFirst(_stdout), |
"reading the next stdout line from process '$description'"); |
/// Reads the next line of stderr from the process. |
- Future<String> nextErrLine() => schedule(() => streamFirst(_stderr), |
+ Future<String> nextErrLine() => schedule(() => _streamIteratorFirst(_stderr), |
"reading the next stderr line from process '$description'"); |
+ /// Collects all remaining lines from a [StreamIterator] of lines. |
+ /// |
+ /// Returns the concatenation of the collected lines joined by newlines. |
+ static Future<String> _concatRest(StreamIterator<String> streamIterator) { |
nweiz
2013/05/22 18:42:41
This also belongs in utils.dart.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Done.
|
+ Future<String> collectAll(List<String> accumulator) { |
+ return streamIterator.moveNext().then((bool hasNext) { |
Bob Nystrom
2013/05/22 16:23:46
Don't bother type annotating parameters in fn expr
Lasse Reichstein Nielsen
2013/05/24 06:02:49
ok.
Grudgingly.
How else would I know the type of
Bob Nystrom
2013/05/24 15:54:38
1. You know what you're passing that callback to,
Lasse Reichstein Nielsen
2013/05/27 08:04:53
I don't believe in "knowing" being good enough as
|
+ if (hasNext) { |
+ accumulator.add(streamIterator.current); |
+ return collectAll(accumulator); |
Bob Nystrom
2013/05/22 16:23:46
If the stream has a lot of output, this can build
Lasse Reichstein Nielsen
2013/05/24 06:02:49
I've rewritten this to not chain the futures, and
nweiz
2013/05/24 20:12:31
Working around this here before we know it's actua
Lasse Reichstein Nielsen
2013/05/27 08:04:53
In this case, I actually think the new version is
|
+ } else { |
+ return accumulator.join("\n"); |
Bob Nystrom
2013/05/22 16:23:46
A StringBuffer seems like a more natural fit for t
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Good point.
|
+ } |
+ }); |
+ } |
+ return collectAll(<String>[]); |
+ } |
+ |
/// Reads the remaining stdout from the process. This should only be called |
/// after kill() or shouldExit(). |
Future<String> remainingStdout() { |
@@ -254,8 +289,8 @@ class ScheduledProcess { |
throw new StateError("remainingStdout() should only be called after " |
"kill() or shouldExit()."); |
} |
- |
- return schedule(() => _stdout.toList().then((lines) => lines.join("\n")), |
+ List<String> accumulator = <String>[]; |
Bob Nystrom
2013/05/22 16:23:46
Unused.
floitsch
2013/05/22 16:26:29
Looks unused.
Lasse Reichstein Nielsen
2013/05/24 06:02:49
Well spotted.
|
+ return schedule(() => _concatRest(_stdout), |
"reading the remaining stdout from process '$description'"); |
} |
@@ -267,7 +302,7 @@ class ScheduledProcess { |
"kill() or shouldExit()."); |
} |
- return schedule(() => _stderr.toList().then((lines) => lines.join("\n")), |
+ return schedule(() => _concatRest(_stderr), |
"reading the remaining stderr from process '$description'"); |
} |