Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(346)

Side by Side Diff: pkg/scheduled_test/lib/scheduled_process.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Made tests run (mostly) Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library scheduled_test.scheduled_process; 5 library scheduled_test.scheduled_process;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:io'; 8 import 'dart:io';
9 9
10 import 'scheduled_test.dart'; 10 import 'scheduled_test.dart';
(...skipping 21 matching lines...) Expand all
32 final Encoding _encoding; 32 final Encoding _encoding;
33 33
34 /// The process that's scheduled to run. 34 /// The process that's scheduled to run.
35 ValueFuture<Process> _process; 35 ValueFuture<Process> _process;
36 36
37 /// A fork of [_stdout] that records the standard output of the process. Used 37 /// A fork of [_stdout] that records the standard output of the process. Used
38 /// for debugging information. 38 /// for debugging information.
39 Stream<String> _stdoutLog; 39 Stream<String> _stdoutLog;
40 40
41 /// A line-by-line view of the standard output stream of the process. 41 /// A line-by-line view of the standard output stream of the process.
42 Stream<String> _stdout; 42 StreamIterator<String> _stdout;
nweiz 2013/05/22 18:42:41 I'm not sure I understand why using a StreamIterat
floitsch 2013/05/22 20:23:35 The moment you cancel a stream it is dead. We have
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Exactly. The current code subscribed to the same
nweiz 2013/05/24 20:12:31 Okay, I suppose that makes sense. It seems a littl
Lasse Reichstein Nielsen 2013/05/27 08:04:53 Done.
43 43
44 /// A canceller that controls both [_stdout] and [_stdoutLog]. 44 /// A canceller that controls both [_stdout] and [_stdoutLog].
45 StreamCanceller _stdoutCanceller; 45 StreamCanceller _stdoutCanceller;
46 46
47 /// A fork of [_stderr] that records the standard error of the process. Used 47 /// A fork of [_stderr] that records the standard error of the process. Used
48 /// for debugging information. 48 /// for debugging information.
49 Stream<String> _stderrLog; 49 Stream<String> _stderrLog;
50 50
51 /// A line-by-line view of the standard error stream of the process. 51 /// A line-by-line view of the standard error stream of the process.
52 Stream<String> _stderr; 52 StreamIterator<String> _stderr;
53 53
54 /// A canceller that controls both [_stderr] and [_stderrLog]. 54 /// A canceller that controls both [_stderr] and [_stderrLog].
55 StreamCanceller _stderrCanceller; 55 StreamCanceller _stderrCanceller;
56 56
57 /// The exit code of the process that's scheduled to run. This will naturally 57 /// The exit code of the process that's scheduled to run. This will naturally
58 /// only complete once the process has terminated. 58 /// only complete once the process has terminated.
59 ValueFuture<int> _exitCode; 59 ValueFuture<int> _exitCode;
60 60
61 /// Whether the user has scheduled the end of this process by calling either 61 /// Whether the user has scheduled the end of this process by calling either
62 /// [shouldExit] or [kill]. 62 /// [shouldExit] or [kill].
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
95 var stdoutWithCanceller = _lineStreamWithCanceller( 95 var stdoutWithCanceller = _lineStreamWithCanceller(
96 _process.then((p) => p.stdout)); 96 _process.then((p) => p.stdout));
97 _stdoutCanceller = stdoutWithCanceller.last; 97 _stdoutCanceller = stdoutWithCanceller.last;
98 _stdoutLog = stdoutWithCanceller.first; 98 _stdoutLog = stdoutWithCanceller.first;
99 99
100 var stderrWithCanceller = _lineStreamWithCanceller( 100 var stderrWithCanceller = _lineStreamWithCanceller(
101 _process.then((p) => p.stderr)); 101 _process.then((p) => p.stderr));
102 _stderrCanceller = stderrWithCanceller.last; 102 _stderrCanceller = stderrWithCanceller.last;
103 _stderrLog = stderrWithCanceller.first; 103 _stderrLog = stderrWithCanceller.first;
104 104
105 _stdout = stdoutStream(); 105 _stdout = new StreamIterator<String>(stdoutStream());
106 _stderr = stderrStream(); 106 _stderr = new StreamIterator<String>(stderrStream());
107 } 107 }
108 108
109 /// Updates [_description] to reflect [executable] and [arguments], which are 109 /// Updates [_description] to reflect [executable] and [arguments], which are
110 /// the same values as in [start]. 110 /// the same values as in [start].
111 void _updateDescription(executable, arguments) { 111 void _updateDescription(executable, arguments) {
112 if (_explicitDescription) return; 112 if (_explicitDescription) return;
113 if (executable is Future) { 113 if (executable is Future) {
114 _description = "future process"; 114 _description = "future process";
115 } else if (arguments is Future || arguments.any((e) => e is Future)) { 115 } else if (arguments is Future || arguments.any((e) => e is Future)) {
116 _description = executable; 116 _description = executable;
(...skipping 115 matching lines...) Expand 10 before | Expand all | Expand 10 after
232 "Results of running '$description':\n" 232 "Results of running '$description':\n"
233 "$exitDescription\n" 233 "$exitDescription\n"
234 "Standard output:\n" 234 "Standard output:\n"
235 "${prefixLines(stdout)}\n" 235 "${prefixLines(stdout)}\n"
236 "Standard error:\n" 236 "Standard error:\n"
237 "${prefixLines(stderr)}"); 237 "${prefixLines(stderr)}");
238 }); 238 });
239 }, "cleaning up process '$description'"); 239 }, "cleaning up process '$description'");
240 } 240 }
241 241
242 // Similar to [streamFirst], but works on streamIterators.
243 // And it works at all, which streamFirst doesn't.
floitsch 2013/05/22 16:26:29 Reword. This could be read like an insult.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Ack, working comment left in :)
244 Future<String> _streamIteratorFirst(StreamIterator<String> streamIterator) {
nweiz 2013/05/22 18:42:41 This belongs in utils.dart.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Ok, will move it.
245 StackTrace stackTrace;
246 try {
247 throw 0;
248 } catch (_, thrownStackTrace) {
249 stackTrace = thrownStackTrace;
250 }
Bob Nystrom 2013/05/22 16:23:46 Instead of doing this by hand, use new Trace.curre
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Will do.
251 return streamIterator.moveNext().then((hasNext) {
252 if (hasNext) {
253 return streamIterator.current;
254 } else {
255 return new Future.error(new StateError("No elements"), stackTrace);
256 }
257 });
258 }
259
242 /// Reads the next line of stdout from the process. 260 /// Reads the next line of stdout from the process.
243 Future<String> nextLine() => schedule(() => streamFirst(_stdout), 261 Future<String> nextLine() => schedule(() => _streamIteratorFirst(_stdout),
244 "reading the next stdout line from process '$description'"); 262 "reading the next stdout line from process '$description'");
245 263
246 /// Reads the next line of stderr from the process. 264 /// Reads the next line of stderr from the process.
247 Future<String> nextErrLine() => schedule(() => streamFirst(_stderr), 265 Future<String> nextErrLine() => schedule(() => _streamIteratorFirst(_stderr),
248 "reading the next stderr line from process '$description'"); 266 "reading the next stderr line from process '$description'");
249 267
268 /// Collects all remaining lines from a [StreamIterator] of lines.
269 ///
270 /// Returns the concatenation of the collected lines joined by newlines.
271 static Future<String> _concatRest(StreamIterator<String> streamIterator) {
nweiz 2013/05/22 18:42:41 This also belongs in utils.dart.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Done.
272 Future<String> collectAll(List<String> accumulator) {
273 return streamIterator.moveNext().then((bool hasNext) {
Bob Nystrom 2013/05/22 16:23:46 Don't bother type annotating parameters in fn expr
Lasse Reichstein Nielsen 2013/05/24 06:02:49 ok. Grudgingly. How else would I know the type of
Bob Nystrom 2013/05/24 15:54:38 1. You know what you're passing that callback to,
Lasse Reichstein Nielsen 2013/05/27 08:04:53 I don't believe in "knowing" being good enough as
274 if (hasNext) {
275 accumulator.add(streamIterator.current);
276 return collectAll(accumulator);
Bob Nystrom 2013/05/22 16:23:46 If the stream has a lot of output, this can build
Lasse Reichstein Nielsen 2013/05/24 06:02:49 I've rewritten this to not chain the futures, and
nweiz 2013/05/24 20:12:31 Working around this here before we know it's actua
Lasse Reichstein Nielsen 2013/05/27 08:04:53 In this case, I actually think the new version is
277 } else {
278 return accumulator.join("\n");
Bob Nystrom 2013/05/22 16:23:46 A StringBuffer seems like a more natural fit for t
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Good point.
279 }
280 });
281 }
282 return collectAll(<String>[]);
283 }
284
250 /// Reads the remaining stdout from the process. This should only be called 285 /// Reads the remaining stdout from the process. This should only be called
251 /// after kill() or shouldExit(). 286 /// after kill() or shouldExit().
252 Future<String> remainingStdout() { 287 Future<String> remainingStdout() {
253 if (!_endScheduled) { 288 if (!_endScheduled) {
254 throw new StateError("remainingStdout() should only be called after " 289 throw new StateError("remainingStdout() should only be called after "
255 "kill() or shouldExit()."); 290 "kill() or shouldExit().");
256 } 291 }
257 292 List<String> accumulator = <String>[];
Bob Nystrom 2013/05/22 16:23:46 Unused.
floitsch 2013/05/22 16:26:29 Looks unused.
Lasse Reichstein Nielsen 2013/05/24 06:02:49 Well spotted.
258 return schedule(() => _stdout.toList().then((lines) => lines.join("\n")), 293 return schedule(() => _concatRest(_stdout),
259 "reading the remaining stdout from process '$description'"); 294 "reading the remaining stdout from process '$description'");
260 } 295 }
261 296
262 /// Reads the remaining stderr from the process. This should only be called 297 /// Reads the remaining stderr from the process. This should only be called
263 /// after kill() or shouldExit(). 298 /// after kill() or shouldExit().
264 Future<String> remainingStderr() { 299 Future<String> remainingStderr() {
265 if (!_endScheduled) { 300 if (!_endScheduled) {
266 throw new StateError("remainingStderr() should only be called after " 301 throw new StateError("remainingStderr() should only be called after "
267 "kill() or shouldExit()."); 302 "kill() or shouldExit().");
268 } 303 }
269 304
270 return schedule(() => _stderr.toList().then((lines) => lines.join("\n")), 305 return schedule(() => _concatRest(_stderr),
271 "reading the remaining stderr from process '$description'"); 306 "reading the remaining stderr from process '$description'");
272 } 307 }
273 308
274 /// Returns a stream that will emit anything the process emits via the 309 /// Returns a stream that will emit anything the process emits via the
275 /// process's standard output from now on. 310 /// process's standard output from now on.
276 /// 311 ///
277 /// This stream will be independent from any other methods that deal with 312 /// This stream will be independent from any other methods that deal with
278 /// standard output, including other calls to [stdoutStream]. 313 /// standard output, including other calls to [stdoutStream].
279 /// 314 ///
280 /// This can be overridden by subclasses to return a derived standard output 315 /// This can be overridden by subclasses to return a derived standard output
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
336 schedule(() { 371 schedule(() {
337 _endExpected = true; 372 _endExpected = true;
338 return _exitCode.then((exitCode) { 373 return _exitCode.then((exitCode) {
339 if (expectedExitCode != null) { 374 if (expectedExitCode != null) {
340 expect(exitCode, equals(expectedExitCode)); 375 expect(exitCode, equals(expectedExitCode));
341 } 376 }
342 }); 377 });
343 }, "waiting for process '$description' to exit"); 378 }, "waiting for process '$description' to exit");
344 } 379 }
345 } 380 }
OLDNEW
« no previous file with comments | « no previous file | pkg/scheduled_test/lib/src/mock_clock.dart » ('j') | pkg/scheduled_test/lib/src/mock_clock.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698