| OLD | NEW |
| 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 """ | 5 """ |
| 6 Utility classes used by server_job.distribute_across_machines(). | 6 Utility classes used by server_job.distribute_across_machines(). |
| 7 | 7 |
| 8 test_item: extends the basic test tuple to add include/exclude attributes. | 8 test_item: extends the basic test tuple to add include/exclude attributes. |
| 9 | 9 |
| 10 machine_worker: is a thread that manages running tests on a host. It | 10 machine_worker: is a thread that manages running tests on a host. It |
| (...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 73 if self.inc_set is not None: | 73 if self.inc_set is not None: |
| 74 if not self.inc_set <= machine_attributes: return False | 74 if not self.inc_set <= machine_attributes: return False |
| 75 if self.exc_set is not None: | 75 if self.exc_set is not None: |
| 76 if self.exc_set & machine_attributes: return False | 76 if self.exc_set & machine_attributes: return False |
| 77 return True | 77 return True |
| 78 | 78 |
| 79 | 79 |
| 80 class machine_worker(threading.Thread): | 80 class machine_worker(threading.Thread): |
| 81 """Thread that runs tests on a remote host machine.""" | 81 """Thread that runs tests on a remote host machine.""" |
| 82 | 82 |
| 83 def __init__(self, server_job, machine, work_dir, test_queue, queue_lock): | 83 def __init__(self, server_job, machine, work_dir, test_queue, queue_lock, |
| 84 continuous_parsing=False): |
| 84 """Creates an instance of machine_worker to run tests on a remote host. | 85 """Creates an instance of machine_worker to run tests on a remote host. |
| 85 | 86 |
| 86 Retrieves that host attributes for this machine and creates the set of | 87 Retrieves that host attributes for this machine and creates the set of |
| 87 True attributes to validate against test include/exclude attributes. | 88 True attributes to validate against test include/exclude attributes. |
| 88 | 89 |
| 89 Creates a directory to hold the log files for tests run and writes the | 90 Creates a directory to hold the log files for tests run and writes the |
| 90 hostname and tko parser version into keyvals file. | 91 hostname and tko parser version into keyvals file. |
| 91 | 92 |
| 92 Args: | 93 Args: |
| 93 server_job: run tests for this server_job. | 94 server_job: run tests for this server_job. |
| 94 machine: name of remote host. | 95 machine: name of remote host. |
| 95 work_dir: directory server job is using. | 96 work_dir: directory server job is using. |
| 96 test_queue: queue of tests. | 97 test_queue: queue of tests. |
| 97 queue_lock: lock protecting test_queue. | 98 queue_lock: lock protecting test_queue. |
| 99 continuous_parsing: bool, enable continuous parsing. |
| 98 """ | 100 """ |
| 99 threading.Thread.__init__(self) | 101 threading.Thread.__init__(self) |
| 100 self._server_job = server_job | 102 self._server_job = server_job |
| 101 self._test_queue = test_queue | 103 self._test_queue = test_queue |
| 102 self._test_queue_lock = queue_lock | 104 self._test_queue_lock = queue_lock |
| 105 self._continuous_parsing = continuous_parsing |
| 103 self._tests_run = 0 | 106 self._tests_run = 0 |
| 104 self._machine = machine | 107 self._machine = machine |
| 105 self._host = hosts.create_host(self._machine) | 108 self._host = hosts.create_host(self._machine) |
| 106 self._client_at = autotest.Autotest(self._host) | 109 self._client_at = autotest.Autotest(self._host) |
| 107 client_attributes = host_attributes.host_attributes(machine) | 110 client_attributes = host_attributes.host_attributes(machine) |
| 108 self.attribute_set = set([key for key, value in | 111 self.attribute_set = set([key for key, value in |
| 109 client_attributes.__dict__.items() if value]) | 112 client_attributes.__dict__.items() if value]) |
| 110 self._results_dir = os.path.join(work_dir, self._machine) | 113 self._results_dir = os.path.join(work_dir, self._machine) |
| 111 if not os.path.exists(self._results_dir): | 114 if not os.path.exists(self._results_dir): |
| 112 os.makedirs(self._results_dir) | 115 os.makedirs(self._results_dir) |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 146 | 149 |
| 147 except Queue.Empty: | 150 except Queue.Empty: |
| 148 break | 151 break |
| 149 | 152 |
| 150 # Return any skipped tests to the queue. | 153 # Return any skipped tests to the queue. |
| 151 for st in skipped_tests: | 154 for st in skipped_tests: |
| 152 self._test_queue.put(st) | 155 self._test_queue.put(st) |
| 153 | 156 |
| 154 return good_test | 157 return good_test |
| 155 | 158 |
| 156 def run_subcommand(self, active_test): | 159 def run(self): |
| 157 """Use subcommand to fork process and execute test.""" | 160 """Use subcommand to fork process and execute tests. |
| 158 sub_cmd = subcommand.subcommand(self.subcommand_wrapper, | 161 |
| 159 [active_test], | 162 The forked processes prevents log files from simultaneous tests |
| 163 interweaving with each other. Logging doesn't communicate host autotest |
| 164 to client autotest, it communicates host module to client autotest. So |
| 165 different server side autotest instances share the same module and |
| 166 require split processes to have clean logging. |
| 167 """ |
| 168 sub_cmd = subcommand.subcommand(self._run, |
| 169 [], |
| 160 self._results_dir) | 170 self._results_dir) |
| 161 sub_cmd.fork_start() | 171 sub_cmd.fork_start() |
| 162 sub_cmd.fork_waitfor() | 172 sub_cmd.fork_waitfor() |
| 163 | 173 |
| 164 def subcommand_wrapper(self, active_test): | 174 def _run(self): |
| 165 """Callback for subcommand to call into with the test parameter. | 175 """Executes tests on the host machine. |
| 166 | 176 |
| 167 When this function executes it has forked from the main process so it | 177 If continuous parsing was requested, start the parser before running |
| 168 is safe to modify state on the server_job object. These changes enable | 178 tests. |
| 169 continuous parsing which communicates results back to the database | |
| 170 while the server_job is running instead of only when the server_job is | |
| 171 complete. | |
| 172 """ | 179 """ |
| 173 self._server_job._parse_job += "/" + self._machine | 180 if self._continuous_parsing: |
| 174 self._server_job._using_parser = True | 181 self._server_job._parse_job += "/" + self._machine |
| 175 self._server_job.machines = [self._machine] | 182 self._server_job._using_parser = True |
| 176 self._server_job.push_execution_context(self._machine) | 183 self._server_job.machines = [self._machine] |
| 177 self._server_job.init_parser() | 184 self._server_job.push_execution_context(self._machine) |
| 178 self._client_at.run_test(active_test.test_name, | 185 self._server_job.init_parser() |
| 179 results_dir=self._results_dir, | |
| 180 **active_test.test_args) | |
| 181 self._server_job.cleanup_parser() | |
| 182 | 186 |
| 183 def run(self): | |
| 184 """Executes tests on host machine. | |
| 185 | |
| 186 Uses subprocess to fork the process when running tests so unique client | |
| 187 jobs talk to unique server jobs which prevents log files from | |
| 188 simultaneous tests interweaving with each other. | |
| 189 """ | |
| 190 while True: | 187 while True: |
| 191 active_test = self.get_test() | 188 active_test = self.get_test() |
| 192 if active_test is None: | 189 if active_test is None: |
| 193 break | 190 break |
| 194 | 191 |
| 195 # Install autoest on host before running tests. Do this before | |
| 196 # the subcommand fork so all future forks see that it has been | |
| 197 # installed on the host. | |
| 198 if not self._client_at.installed: | |
| 199 self._client_at.install() | |
| 200 | |
| 201 logging.info('%s running %s', self._machine, active_test) | 192 logging.info('%s running %s', self._machine, active_test) |
| 202 try: | 193 try: |
| 203 self.run_subcommand(active_test) | 194 self._client_at.run_test(active_test.test_name, |
| 195 results_dir=self._results_dir, |
| 196 **active_test.test_args) |
| 204 except (error.AutoservError, error.AutotestError): | 197 except (error.AutoservError, error.AutotestError): |
| 205 logging.exception('Error running test "%s".', active_test) | 198 logging.exception('Error running test "%s".', active_test) |
| 206 except Exception: | 199 except Exception: |
| 207 logging.exception('Exception running test "%s".', active_test) | 200 logging.exception('Exception running test "%s".', active_test) |
| 208 raise | 201 raise |
| 209 finally: | 202 finally: |
| 210 self._test_queue.task_done() | 203 self._test_queue.task_done() |
| 211 self._tests_run += 1 | 204 self._tests_run += 1 |
| 212 | 205 |
| 206 if self._continuous_parsing: |
| 207 self._server_job.cleanup_parser() |
| 213 logging.info('%s completed %d tests.', self._machine, self._tests_run) | 208 logging.info('%s completed %d tests.', self._machine, self._tests_run) |
| OLD | NEW |