Index: server/server_job_utils.py |
diff --git a/server/server_job_utils.py b/server/server_job_utils.py |
index 4cca5061080626c1cf67b9452f52912cfa87a34a..1c5c38c4c4f30680d94d0e2804e724da490aaab2 100644 |
--- a/server/server_job_utils.py |
+++ b/server/server_job_utils.py |
@@ -80,7 +80,8 @@ class test_item(object): |
class machine_worker(threading.Thread): |
"""Thread that runs tests on a remote host machine.""" |
- def __init__(self, server_job, machine, work_dir, test_queue, queue_lock): |
+ def __init__(self, server_job, machine, work_dir, test_queue, queue_lock, |
+ continuous_parsing=False): |
"""Creates an instance of machine_worker to run tests on a remote host. |
Retrieves that host attributes for this machine and creates the set of |
@@ -95,11 +96,13 @@ class machine_worker(threading.Thread): |
work_dir: directory server job is using. |
test_queue: queue of tests. |
queue_lock: lock protecting test_queue. |
+ continuous_parsing: bool, enable continuous parsing. |
""" |
threading.Thread.__init__(self) |
self._server_job = server_job |
self._test_queue = test_queue |
self._test_queue_lock = queue_lock |
+ self._continuous_parsing = continuous_parsing |
self._tests_run = 0 |
self._machine = machine |
self._host = hosts.create_host(self._machine) |
@@ -153,54 +156,44 @@ class machine_worker(threading.Thread): |
return good_test |
- def run_subcommand(self, active_test): |
- """Use subcommand to fork process and execute test.""" |
- sub_cmd = subcommand.subcommand(self.subcommand_wrapper, |
- [active_test], |
+ def run(self): |
+ """Use subcommand to fork process and execute tests. |
+ |
+ The forked processes prevents log files from simultaneous tests |
+ interweaving with each other. Logging doesn't communicate host autotest |
+ to client autotest, it communicates host module to client autotest. So |
+ different server side autotest instances share the same module and |
+ require split processes to have clean logging. |
+ """ |
+ sub_cmd = subcommand.subcommand(self._run, |
+ [], |
self._results_dir) |
sub_cmd.fork_start() |
sub_cmd.fork_waitfor() |
- def subcommand_wrapper(self, active_test): |
- """Callback for subcommand to call into with the test parameter. |
+ def _run(self): |
+ """Executes tests on the host machine. |
- When this function executes it has forked from the main process so it |
- is safe to modify state on the server_job object. These changes enable |
- continuous parsing which communicates results back to the database |
- while the server_job is running instead of only when the server_job is |
- complete. |
+ If continuous parsing was requested, start the parser before running |
+ tests. |
""" |
- self._server_job._parse_job += "/" + self._machine |
- self._server_job._using_parser = True |
- self._server_job.machines = [self._machine] |
- self._server_job.push_execution_context(self._machine) |
- self._server_job.init_parser() |
- self._client_at.run_test(active_test.test_name, |
- results_dir=self._results_dir, |
- **active_test.test_args) |
- self._server_job.cleanup_parser() |
+ if self._continuous_parsing: |
+ self._server_job._parse_job += "/" + self._machine |
+ self._server_job._using_parser = True |
+ self._server_job.machines = [self._machine] |
+ self._server_job.push_execution_context(self._machine) |
+ self._server_job.init_parser() |
- def run(self): |
- """Executes tests on host machine. |
- |
- Uses subprocess to fork the process when running tests so unique client |
- jobs talk to unique server jobs which prevents log files from |
- simultaneous tests interweaving with each other. |
- """ |
while True: |
active_test = self.get_test() |
if active_test is None: |
break |
- # Install autoest on host before running tests. Do this before |
- # the subcommand fork so all future forks see that it has been |
- # installed on the host. |
- if not self._client_at.installed: |
- self._client_at.install() |
- |
logging.info('%s running %s', self._machine, active_test) |
try: |
- self.run_subcommand(active_test) |
+ self._client_at.run_test(active_test.test_name, |
+ results_dir=self._results_dir, |
+ **active_test.test_args) |
except (error.AutoservError, error.AutotestError): |
logging.exception('Error running test "%s".', active_test) |
except Exception: |
@@ -210,4 +203,6 @@ class machine_worker(threading.Thread): |
self._test_queue.task_done() |
self._tests_run += 1 |
+ if self._continuous_parsing: |
+ self._server_job.cleanup_parser() |
logging.info('%s completed %d tests.', self._machine, self._tests_run) |