Chromium Code Reviews| Index: pkg/scheduled_test/lib/scheduled_process.dart |
| diff --git a/pkg/scheduled_test/lib/scheduled_process.dart b/pkg/scheduled_test/lib/scheduled_process.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..7e6b649256d7bbaa80f3fc8892d17e35c47f1fdc |
| --- /dev/null |
| +++ b/pkg/scheduled_test/lib/scheduled_process.dart |
| @@ -0,0 +1,278 @@ |
| +// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library scheduled_process; |
| + |
| +import 'dart:async'; |
| +import 'dart:io'; |
| + |
| +import 'scheduled_test.dart'; |
| +import 'src/utils.dart'; |
| +import 'src/value_future.dart'; |
| + |
| +/// A class representing a [Process] that is scheduled to run in the course of |
| +/// the test. This class allows actions on the process to be scheduled |
| +/// synchronously. All operations on this class are scheduled. |
| +/// |
| +/// Before running the test, either [shouldExit] or [kill] must be called on |
| +/// this to ensure that the process terminates when expected. |
| +/// |
| +/// If the test fails, this will automatically print out any stdout and stderr |
| +/// from the process to aid debugging. |
| +class ScheduledProcess { |
| + // A description of the process. Used for error reporting. |
| + String get description => _description; |
| + String _description; |
| + |
| + /// The encoding used for the process's input and output streams. |
| + final Encoding _encoding; |
| + |
| + /// The process that's scheduled to run. |
| + ValueFuture<Process> _process; |
| + |
| + /// A fork of [_stdout] that records the standard output of the process. Used |
| + /// for debugging information. |
| + Stream<String> _stdoutLog; |
| + |
| + /// A line-by-line view of the standard output stream of the process. |
| + Stream<String> _stdout; |
| + |
| + /// A subscription that controls both [_stdout] and [_stdoutLog]. |
| + StreamSubscription<String> _stdoutSubscription; |
| + |
| + /// A fork of [_stderr] that records the standard error of the process. Used |
| + /// for debugging information. |
| + Stream<String> _stderrLog; |
| + |
| + /// A line-by-line view of the standard error stream of the process. |
| + Stream<String> _stderr; |
| + |
| + /// A subscription that controls both [_stderr] and [_stderrLog]. |
| + StreamSubscription<String> _stderrSubscription; |
| + |
| + /// The exit code of the process that's scheduled to run. This will naturally |
| + /// only complete once the process has terminated. |
| + ValueFuture<int> _exitCode; |
| + |
| + /// Whether the user has scheduled the end of this process by calling either |
| + /// [shouldExit] or [kill]. |
| + var _endScheduled = false; |
| + |
| + /// The task that runs immediately before this process is scheduled to end. If |
| + /// the process ends during this task, we treat that as expected. |
| + Task _taskBeforeEnd; |
| + |
| + /// Whether the process is expected to terminate at this point. |
| + var _endExpected = false; |
| + |
| + /// Schedules a process to start. [executable], [arguments], and [options] |
| + /// have the same meaning as for [Process.start]. [description] is a string |
| + /// description of this process; it defaults to the command-line invocation. |
| + /// [encoding] is the [Encoding] that will be used for the process's input and |
| + /// output. |
| + /// |
| + /// [executable], [arguments], and [options] may be either a [Future] or a |
| + /// concrete value. If any are [Future]s, the process won't start until the |
| + /// [Future]s have completed. In addition, [arguments] may be a [List] |
| + /// containing a mix of strings and [Future]s. |
| + ScheduledProcess.start(executable, arguments, |
| + {options, String description, Encoding encoding: Encoding.UTF_8}) |
|
Bob Nystrom
2013/03/04 23:52:00
This is quite a large method with a bunch of async
|
| + : _encoding = encoding { |
| + assert(currentSchedule.state == ScheduleState.SET_UP); |
| + |
| + if (executable is Future) { |
| + _description = "future process"; |
| + } else if (arguments is Future || arguments.any((e) => e is Future)) { |
| + _description = executable; |
| + } else { |
| + _description = "$executable ${arguments.map((a) => '"$a"').join(' ')}"; |
| + } |
|
Bob Nystrom
2013/03/04 23:52:00
How about pulling this into a separate _updateDesc
nweiz
2013/03/05 02:16:09
Done.
|
| + |
| + var exitCodeCompleter = new Completer(); |
| + _exitCode = new ValueFuture(exitCodeCompleter.future); |
| + |
| + _process = new ValueFuture(schedule(() { |
| + if (!_endScheduled) { |
| + throw new StateError("Scheduled process '${this.description}' must " |
| + "have shouldExit() or kill() called before the test is run."); |
| + } |
| + |
| + // We purposefully avoid using wrapFuture here. If an error occurs while a |
| + // process is running, we want the schedule to move to the onException |
| + // queue where the process will be killed, rather than blocking the tasks |
| + // queue waiting for the process to exit. |
| + _process.then((p) => p.exitCode).then((exitCode) { |
|
Bob Nystrom
2013/03/04 23:52:00
Asynchronously accessing the result of a future st
nweiz
2013/03/05 02:16:09
Done.
|
| + if (_endExpected) { |
| + exitCodeCompleter.complete(exitCode); |
| + return; |
| + } |
| + |
| + wrapFuture(pumpEventQueue().then((_) { |
| + if (currentSchedule.currentTask != _taskBeforeEnd) return; |
| + // If we're one task before the end was scheduled, wait for that task |
| + // to complete and pump the event queue so that _endExpected will be |
| + // set. |
| + return _taskBeforeEnd.result.then((_) => pumpEventQueue()); |
| + }).then((_) { |
| + exitCodeCompleter.complete(exitCode); |
| + |
| + if (!_endExpected) { |
| + throw "Process '${this.description}' ended earlier than scheduled " |
| + "with exit code $exitCode."; |
| + } |
| + })); |
| + }); |
| + |
| + return Future.wait([ |
| + new Future.of(() => executable), |
| + awaitObject(arguments), |
| + new Future.of(() => options) |
| + ]).then((results) { |
| + executable = results[0]; |
| + arguments = results[1]; |
| + options = results[2]; |
| + _description = "$executable ${arguments.map((a) => '"$a"').join(' ')}"; |
|
Bob Nystrom
2013/03/04 23:52:00
Call _updateDescription() here.
nweiz
2013/03/05 02:16:09
Done.
|
| + return Process.start(executable, arguments, options); |
| + }); |
| + }, "starting process '${this.description}'")); |
| + |
| + var stdoutWithSubscription = _lineStreamWithSubscription( |
| + _process.then((p) => p.stdout)); |
| + _stdoutSubscription = stdoutWithSubscription.last; |
| + var stdoutTee = tee(stdoutWithSubscription.first); |
| + _stdout = stdoutTee.first; |
| + _stdoutLog = stdoutTee.last; |
| + |
| + var stderrWithSubscription = _lineStreamWithSubscription( |
| + _process.then((p) => p.stderr)); |
| + _stderrSubscription = stderrWithSubscription.last; |
| + var stderrTee = tee(stderrWithSubscription.first); |
| + _stderr = stderrTee.first; |
| + _stderrLog = stderrTee.last; |
| + |
| + currentSchedule.onException.schedule(() { |
|
Bob Nystrom
2013/03/04 23:52:00
Move this to a separate fn.
nweiz
2013/03/05 02:16:09
Done.
|
| + _stdoutSubscription.cancel(); |
| + _stderrSubscription.cancel(); |
| + |
| + if (!_process.hasValue) return; |
| + |
| + var killedPrematurely = false; |
| + if (!_exitCode.hasValue) { |
| + var killedPrematurely = true; |
| + _endExpected = true; |
| + _process.value.kill(); |
| + // Ensure that the onException queue waits for the process to actually |
| + // exit after being killed. |
| + wrapFuture(_process.value.exitCode); |
| + } |
| + |
| + return Future.wait([ |
| + _stdoutLog.toList(), |
| + _stderrLog.toList() |
| + ]).then((results) { |
| + var stdout = results[0].join("\n"); |
| + var stderr = results[1].join("\n"); |
| + |
| + var exitDescription = killedPrematurely |
| + ? "Process was killed prematurely." |
| + : "Process exited with exit code ${_exitCode.value}."; |
| + currentSchedule.addDebugInfo( |
| + "Results of running '${this.description}':\n" |
| + "$exitDescription\n" |
| + "Standard output:\n" |
| + "${prefixLines(stdout)}\n" |
| + "Standard error:\n" |
| + "${prefixLines(stderr)}"); |
| + }); |
| + }, "cleaning up process '${this.description}'"); |
| + } |
| + |
| + /// Converts a stream of bytes to a stream of lines and returns that along |
| + /// with a [StreamSubscription] controlling it. |
| + Pair<Stream<String>, StreamSubscription<String>> _lineStreamWithSubscription( |
| + Future<Stream<int>> streamFuture) { |
| + return streamWithSubscription(futureStream(streamFuture) |
| + .handleError((e) => currentSchedule.signalError(e)) |
| + .transform(new StringDecoder(_encoding)) |
| + .transform(new LineTransformer())); |
| + } |
| + |
| + /// Reads the next line of stdout from the process. |
| + Future<String> nextLine() => schedule(() => streamFirst(_stdout), |
| + "reading the next stdout line from process '$description'"); |
| + |
| + /// Reads the next line of stderr from the process. |
| + Future<String> nextErrLine() => schedule(() => streamFirst(_stderr), |
| + "reading the next stderr line from process '$description'"); |
| + |
| + /// Reads the remaining stdout from the process. This should only be called |
| + /// after kill() or shouldExit(). |
| + Future<String> remainingStdout() { |
| + if (!_endScheduled) { |
| + throw new StateError("remainingStdout() should only be called after " |
| + "kill() or shouldExit()."); |
| + } |
| + |
| + return schedule(() => _stdout.toList().then((lines) => lines.join("\n")), |
| + "reading the remaining stdout from process '$description'"); |
| + } |
| + |
| + /// Reads the remaining stderr from the process. This should only be called |
| + /// after kill() or shouldExit(). |
| + Future<String> remainingStderr() { |
| + if (!_endScheduled) { |
| + throw new StateError("remainingStderr() should only be called after " |
| + "kill() or shouldExit()."); |
| + } |
| + |
| + return schedule(() => _stderr.toList().then((lines) => lines.join("\n")), |
| + "reading the remaining stderr from process '$description'"); |
| + } |
| + |
| + /// Writes [line] to the process as stdin. |
| + void writeLine(String line) { |
| + schedule(() { |
| + return _process.then((p) => p.stdin.addString('$line\n', _encoding)); |
| + }, "writing '$line' to stdin for process '$description'"); |
| + } |
| + |
| + /// Closes the process's stdin stream. |
| + void closeStdin() { |
| + schedule(() => _process.then((p) => p.stdin.close()), |
| + "closing stdin for process '$description'"); |
| + } |
| + |
| + /// Kills the process, and waits until it's dead. |
| + void kill() { |
| + if (_endScheduled) { |
| + throw new StateError("shouldExit() or kill() already called."); |
| + } |
| + |
| + _endScheduled = true; |
| + _taskBeforeEnd = currentSchedule.tasks.contents.last; |
| + schedule(() { |
| + _endExpected = true; |
| + return _process.then((p) => p.kill()).then((_) => _exitCode); |
| + }, "waiting for process '$description' to die"); |
| + } |
| + |
| + /// Waits for the process to exit, and verifies that the exit code matches |
| + /// [expectedExitCode] (if given). |
| + void shouldExit([int expectedExitCode]) { |
| + if (_endScheduled) { |
| + throw new StateError("shouldExit() or kill() already called."); |
| + } |
| + |
| + _endScheduled = true; |
| + _taskBeforeEnd = currentSchedule.tasks.contents.last; |
| + schedule(() { |
| + _endExpected = true; |
| + return _exitCode.then((exitCode) { |
| + if (expectedExitCode != null) { |
| + expect(exitCode, equals(expectedExitCode)); |
| + } |
| + }); |
| + }, "waiting for process '$description' to exit"); |
| + } |
| +} |