Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(309)

Side by Side Diff: pkg/scheduled_test/lib/scheduled_process.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Addressed comments. Created 7 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library scheduled_test.scheduled_process; 5 library scheduled_test.scheduled_process;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:io'; 8 import 'dart:io';
9 9
10 import 'scheduled_test.dart'; 10 import 'scheduled_test.dart';
(...skipping 21 matching lines...) Expand all
32 final Encoding _encoding; 32 final Encoding _encoding;
33 33
34 /// The process that's scheduled to run. 34 /// The process that's scheduled to run.
35 ValueFuture<Process> _process; 35 ValueFuture<Process> _process;
36 36
37 /// A fork of [_stdout] that records the standard output of the process. Used 37 /// A fork of [_stdout] that records the standard output of the process. Used
38 /// for debugging information. 38 /// for debugging information.
39 Stream<String> _stdoutLog; 39 Stream<String> _stdoutLog;
40 40
41 /// A line-by-line view of the standard output stream of the process. 41 /// A line-by-line view of the standard output stream of the process.
42 Stream<String> _stdout; 42 StreamIterator<String> _stdout;
43 43
44 /// A canceller that controls both [_stdout] and [_stdoutLog]. 44 /// A canceller that controls both [_stdout] and [_stdoutLog].
45 StreamCanceller _stdoutCanceller; 45 StreamCanceller _stdoutCanceller;
46 46
47 /// A fork of [_stderr] that records the standard error of the process. Used 47 /// A fork of [_stderr] that records the standard error of the process. Used
48 /// for debugging information. 48 /// for debugging information.
49 Stream<String> _stderrLog; 49 Stream<String> _stderrLog;
50 50
51 /// A line-by-line view of the standard error stream of the process. 51 /// A line-by-line view of the standard error stream of the process.
52 Stream<String> _stderr; 52 StreamIterator<String> _stderr;
53 53
54 /// A canceller that controls both [_stderr] and [_stderrLog]. 54 /// A canceller that controls both [_stderr] and [_stderrLog].
55 StreamCanceller _stderrCanceller; 55 StreamCanceller _stderrCanceller;
56 56
57 /// The exit code of the process that's scheduled to run. This will naturally 57 /// The exit code of the process that's scheduled to run. This will naturally
58 /// only complete once the process has terminated. 58 /// only complete once the process has terminated.
59 ValueFuture<int> _exitCode; 59 ValueFuture<int> _exitCode;
60 60
61 /// Whether the user has scheduled the end of this process by calling either 61 /// Whether the user has scheduled the end of this process by calling either
62 /// [shouldExit] or [kill]. 62 /// [shouldExit] or [kill].
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
96 var stdoutWithCanceller = _lineStreamWithCanceller( 96 var stdoutWithCanceller = _lineStreamWithCanceller(
97 _process.then((p) => p.stdout)); 97 _process.then((p) => p.stdout));
98 _stdoutCanceller = stdoutWithCanceller.last; 98 _stdoutCanceller = stdoutWithCanceller.last;
99 _stdoutLog = stdoutWithCanceller.first; 99 _stdoutLog = stdoutWithCanceller.first;
100 100
101 var stderrWithCanceller = _lineStreamWithCanceller( 101 var stderrWithCanceller = _lineStreamWithCanceller(
102 _process.then((p) => p.stderr)); 102 _process.then((p) => p.stderr));
103 _stderrCanceller = stderrWithCanceller.last; 103 _stderrCanceller = stderrWithCanceller.last;
104 _stderrLog = stderrWithCanceller.first; 104 _stderrLog = stderrWithCanceller.first;
105 105
106 _stdout = stdoutStream(); 106 _stdout = new StreamIterator<String>(stdoutStream());
107 _stderr = stderrStream(); 107 _stderr = new StreamIterator<String>(stderrStream());
108 } 108 }
109 109
110 /// Updates [_description] to reflect [executable] and [arguments], which are 110 /// Updates [_description] to reflect [executable] and [arguments], which are
111 /// the same values as in [start]. 111 /// the same values as in [start].
112 void _updateDescription(executable, arguments) { 112 void _updateDescription(executable, arguments) {
113 if (_explicitDescription) return; 113 if (_explicitDescription) return;
114 if (executable is Future) { 114 if (executable is Future) {
115 _description = "future process"; 115 _description = "future process";
116 } else if (arguments is Future || arguments.any((e) => e is Future)) { 116 } else if (arguments is Future || arguments.any((e) => e is Future)) {
117 _description = executable; 117 _description = executable;
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
242 "$exitDescription\n" 242 "$exitDescription\n"
243 "Standard output:\n" 243 "Standard output:\n"
244 "${prefixLines(stdout)}\n" 244 "${prefixLines(stdout)}\n"
245 "Standard error:\n" 245 "Standard error:\n"
246 "${prefixLines(stderr)}"); 246 "${prefixLines(stderr)}");
247 }); 247 });
248 }, "cleaning up process '$description'"); 248 }, "cleaning up process '$description'");
249 } 249 }
250 250
251 /// Reads the next line of stdout from the process. 251 /// Reads the next line of stdout from the process.
252 Future<String> nextLine() => schedule(() => streamFirst(_stdout), 252 Future<String> nextLine() => schedule(() => streamIteratorFirst(_stdout),
253 "reading the next stdout line from process '$description'"); 253 "reading the next stdout line from process '$description'");
254 254
255 /// Reads the next line of stderr from the process. 255 /// Reads the next line of stderr from the process.
256 Future<String> nextErrLine() => schedule(() => streamFirst(_stderr), 256 Future<String> nextErrLine() => schedule(() => streamIteratorFirst(_stderr),
257 "reading the next stderr line from process '$description'"); 257 "reading the next stderr line from process '$description'");
258 258
259 /// Reads the remaining stdout from the process. This should only be called 259 /// Reads the remaining stdout from the process. This should only be called
260 /// after kill() or shouldExit(). 260 /// after kill() or shouldExit().
261 Future<String> remainingStdout() { 261 Future<String> remainingStdout() {
262 if (!_endScheduled) { 262 if (!_endScheduled) {
263 throw new StateError("remainingStdout() should only be called after " 263 throw new StateError("remainingStdout() should only be called after "
264 "kill() or shouldExit()."); 264 "kill() or shouldExit().");
265 } 265 }
266 266 return schedule(() => concatRest(_stdout),
267 return schedule(() => _stdout.toList().then((lines) => lines.join("\n")),
268 "reading the remaining stdout from process '$description'"); 267 "reading the remaining stdout from process '$description'");
269 } 268 }
270 269
271 /// Reads the remaining stderr from the process. This should only be called 270 /// Reads the remaining stderr from the process. This should only be called
272 /// after kill() or shouldExit(). 271 /// after kill() or shouldExit().
273 Future<String> remainingStderr() { 272 Future<String> remainingStderr() {
274 if (!_endScheduled) { 273 if (!_endScheduled) {
275 throw new StateError("remainingStderr() should only be called after " 274 throw new StateError("remainingStderr() should only be called after "
276 "kill() or shouldExit()."); 275 "kill() or shouldExit().");
277 } 276 }
278 277
279 return schedule(() => _stderr.toList().then((lines) => lines.join("\n")), 278 return schedule(() => concatRest(_stderr),
280 "reading the remaining stderr from process '$description'"); 279 "reading the remaining stderr from process '$description'");
281 } 280 }
282 281
283 /// Returns a stream that will emit anything the process emits via the 282 /// Returns a stream that will emit anything the process emits via the
284 /// process's standard output from now on. 283 /// process's standard output from now on.
285 /// 284 ///
286 /// This stream will be independent from any other methods that deal with 285 /// This stream will be independent from any other methods that deal with
287 /// standard output, including other calls to [stdoutStream]. 286 /// standard output, including other calls to [stdoutStream].
288 /// 287 ///
289 /// This can be overridden by subclasses to return a derived standard output 288 /// This can be overridden by subclasses to return a derived standard output
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
345 schedule(() { 344 schedule(() {
346 _endExpected = true; 345 _endExpected = true;
347 return _exitCode.then((exitCode) { 346 return _exitCode.then((exitCode) {
348 if (expectedExitCode != null) { 347 if (expectedExitCode != null) {
349 expect(exitCode, equals(expectedExitCode)); 348 expect(exitCode, equals(expectedExitCode));
350 } 349 }
351 }); 350 });
352 }, "waiting for process '$description' to exit"); 351 }, "waiting for process '$description' to exit");
353 } 352 }
354 } 353 }
OLDNEW
« no previous file with comments | « no previous file | pkg/scheduled_test/lib/src/mock_clock.dart » ('j') | sdk/lib/async/stream_impl.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698