Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(468)

Side by Side Diff: pkg/scheduled_test/lib/scheduled_process.dart

Issue 14753009: Make StreamSubscription be the active part of a stream. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Address comments. Created 7 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | pkg/scheduled_test/lib/src/mock_clock.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library scheduled_test.scheduled_process; 5 library scheduled_test.scheduled_process;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:io'; 8 import 'dart:io';
9 9
10 import 'scheduled_test.dart'; 10 import 'scheduled_test.dart';
(...skipping 21 matching lines...) Expand all
32 final Encoding _encoding; 32 final Encoding _encoding;
33 33
34 /// The process that's scheduled to run. 34 /// The process that's scheduled to run.
35 ValueFuture<Process> _process; 35 ValueFuture<Process> _process;
36 36
37 /// A fork of [_stdout] that records the standard output of the process. Used 37 /// A fork of [_stdout] that records the standard output of the process. Used
38 /// for debugging information. 38 /// for debugging information.
39 Stream<String> _stdoutLog; 39 Stream<String> _stdoutLog;
40 40
41 /// A line-by-line view of the standard output stream of the process. 41 /// A line-by-line view of the standard output stream of the process.
42 Stream<String> _stdout; 42 StreamIterator<String> _stdout;
43 43
44 /// A canceller that controls both [_stdout] and [_stdoutLog]. 44 /// A canceller that controls both [_stdout] and [_stdoutLog].
45 StreamCanceller _stdoutCanceller; 45 StreamCanceller _stdoutCanceller;
46 46
47 /// A fork of [_stderr] that records the standard error of the process. Used 47 /// A fork of [_stderr] that records the standard error of the process. Used
48 /// for debugging information. 48 /// for debugging information.
49 Stream<String> _stderrLog; 49 Stream<String> _stderrLog;
50 50
51 /// A line-by-line view of the standard error stream of the process. 51 /// A line-by-line view of the standard error stream of the process.
52 Stream<String> _stderr; 52 StreamIterator<String> _stderr;
53 53
54 /// A canceller that controls both [_stderr] and [_stderrLog]. 54 /// A canceller that controls both [_stderr] and [_stderrLog].
55 StreamCanceller _stderrCanceller; 55 StreamCanceller _stderrCanceller;
56 56
57 /// The exit code of the process that's scheduled to run. This will naturally 57 /// The exit code of the process that's scheduled to run. This will naturally
58 /// only complete once the process has terminated. 58 /// only complete once the process has terminated.
59 ValueFuture<int> _exitCode; 59 ValueFuture<int> _exitCode;
60 60
61 /// Whether the user has scheduled the end of this process by calling either 61 /// Whether the user has scheduled the end of this process by calling either
62 /// [shouldExit] or [kill]. 62 /// [shouldExit] or [kill].
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
96 var stdoutWithCanceller = _lineStreamWithCanceller( 96 var stdoutWithCanceller = _lineStreamWithCanceller(
97 _process.then((p) => p.stdout)); 97 _process.then((p) => p.stdout));
98 _stdoutCanceller = stdoutWithCanceller.last; 98 _stdoutCanceller = stdoutWithCanceller.last;
99 _stdoutLog = stdoutWithCanceller.first; 99 _stdoutLog = stdoutWithCanceller.first;
100 100
101 var stderrWithCanceller = _lineStreamWithCanceller( 101 var stderrWithCanceller = _lineStreamWithCanceller(
102 _process.then((p) => p.stderr)); 102 _process.then((p) => p.stderr));
103 _stderrCanceller = stderrWithCanceller.last; 103 _stderrCanceller = stderrWithCanceller.last;
104 _stderrLog = stderrWithCanceller.first; 104 _stderrLog = stderrWithCanceller.first;
105 105
106 _stdout = stdoutStream(); 106 _stdout = new StreamIterator<String>(stdoutStream());
107 _stderr = stderrStream(); 107 _stderr = new StreamIterator<String>(stderrStream());
108 } 108 }
109 109
110 /// Updates [_description] to reflect [executable] and [arguments], which are 110 /// Updates [_description] to reflect [executable] and [arguments], which are
111 /// the same values as in [start]. 111 /// the same values as in [start].
112 void _updateDescription(executable, arguments) { 112 void _updateDescription(executable, arguments) {
113 if (_explicitDescription) return; 113 if (_explicitDescription) return;
114 if (executable is Future) { 114 if (executable is Future) {
115 _description = "future process"; 115 _description = "future process";
116 } else if (arguments is Future || arguments.any((e) => e is Future)) { 116 } else if (arguments is Future || arguments.any((e) => e is Future)) {
117 _description = executable; 117 _description = executable;
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
241 "$exitDescription\n" 241 "$exitDescription\n"
242 "Standard output:\n" 242 "Standard output:\n"
243 "${prefixLines(stdout)}\n" 243 "${prefixLines(stdout)}\n"
244 "Standard error:\n" 244 "Standard error:\n"
245 "${prefixLines(stderr)}"); 245 "${prefixLines(stderr)}");
246 }); 246 });
247 }, "cleaning up process '$description'"); 247 }, "cleaning up process '$description'");
248 } 248 }
249 249
250 /// Reads the next line of stdout from the process. 250 /// Reads the next line of stdout from the process.
251 Future<String> nextLine() => schedule(() => streamFirst(_stdout), 251 Future<String> nextLine() => schedule(() => streamIteratorFirst(_stdout),
252 "reading the next stdout line from process '$description'"); 252 "reading the next stdout line from process '$description'");
253 253
254 /// Reads the next line of stderr from the process. 254 /// Reads the next line of stderr from the process.
255 Future<String> nextErrLine() => schedule(() => streamFirst(_stderr), 255 Future<String> nextErrLine() => schedule(() => streamIteratorFirst(_stderr),
256 "reading the next stderr line from process '$description'"); 256 "reading the next stderr line from process '$description'");
257 257
258 /// Reads the remaining stdout from the process. This should only be called 258 /// Reads the remaining stdout from the process. This should only be called
259 /// after kill() or shouldExit(). 259 /// after kill() or shouldExit().
260 Future<String> remainingStdout() { 260 Future<String> remainingStdout() {
261 if (!_endScheduled) { 261 if (!_endScheduled) {
262 throw new StateError("remainingStdout() should only be called after " 262 throw new StateError("remainingStdout() should only be called after "
263 "kill() or shouldExit()."); 263 "kill() or shouldExit().");
264 } 264 }
265 265 return schedule(() => concatRest(_stdout),
266 return schedule(() => _stdout.toList().then((lines) => lines.join("\n")),
267 "reading the remaining stdout from process '$description'"); 266 "reading the remaining stdout from process '$description'");
268 } 267 }
269 268
270 /// Reads the remaining stderr from the process. This should only be called 269 /// Reads the remaining stderr from the process. This should only be called
271 /// after kill() or shouldExit(). 270 /// after kill() or shouldExit().
272 Future<String> remainingStderr() { 271 Future<String> remainingStderr() {
273 if (!_endScheduled) { 272 if (!_endScheduled) {
274 throw new StateError("remainingStderr() should only be called after " 273 throw new StateError("remainingStderr() should only be called after "
275 "kill() or shouldExit()."); 274 "kill() or shouldExit().");
276 } 275 }
277 276
278 return schedule(() => _stderr.toList().then((lines) => lines.join("\n")), 277 return schedule(() => concatRest(_stderr),
279 "reading the remaining stderr from process '$description'"); 278 "reading the remaining stderr from process '$description'");
280 } 279 }
281 280
282 /// Returns a stream that will emit anything the process emits via the 281 /// Returns a stream that will emit anything the process emits via the
283 /// process's standard output from now on. 282 /// process's standard output from now on.
284 /// 283 ///
285 /// This stream will be independent from any other methods that deal with 284 /// This stream will be independent from any other methods that deal with
286 /// standard output, including other calls to [stdoutStream]. 285 /// standard output, including other calls to [stdoutStream].
287 /// 286 ///
288 /// This can be overridden by subclasses to return a derived standard output 287 /// This can be overridden by subclasses to return a derived standard output
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
344 schedule(() { 343 schedule(() {
345 _endExpected = true; 344 _endExpected = true;
346 return _exitCode.then((exitCode) { 345 return _exitCode.then((exitCode) {
347 if (expectedExitCode != null) { 346 if (expectedExitCode != null) {
348 expect(exitCode, equals(expectedExitCode)); 347 expect(exitCode, equals(expectedExitCode));
349 } 348 }
350 }); 349 });
351 }, "waiting for process '$description' to exit"); 350 }, "waiting for process '$description' to exit");
352 } 351 }
353 } 352 }
OLDNEW
« no previous file with comments | « no previous file | pkg/scheduled_test/lib/src/mock_clock.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698