OLD | NEW |
1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. | 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """ | 5 """ |
6 Utility classes used by server_job.distribute_across_machines(). | 6 Utility classes used by server_job.distribute_across_machines(). |
7 | 7 |
8 test_item: extends the basic test tuple to add include/exclude attributes. | 8 test_item: extends the basic test tuple to add include/exclude attributes. |
9 | 9 |
10 machine_worker: is a thread that manages running tests on a host. It | 10 machine_worker: is a thread that manages running tests on a host. It |
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
73 if self.inc_set is not None: | 73 if self.inc_set is not None: |
74 if not self.inc_set <= machine_attributes: return False | 74 if not self.inc_set <= machine_attributes: return False |
75 if self.exc_set is not None: | 75 if self.exc_set is not None: |
76 if self.exc_set & machine_attributes: return False | 76 if self.exc_set & machine_attributes: return False |
77 return True | 77 return True |
78 | 78 |
79 | 79 |
80 class machine_worker(threading.Thread): | 80 class machine_worker(threading.Thread): |
81 """Thread that runs tests on a remote host machine.""" | 81 """Thread that runs tests on a remote host machine.""" |
82 | 82 |
83 def __init__(self, machine, work_dir, test_queue, queue_lock): | 83 def __init__(self, server_job, machine, work_dir, test_queue, queue_lock): |
84 """Creates an instance of machine_worker to run tests on a remote host. | 84 """Creates an instance of machine_worker to run tests on a remote host. |
85 | 85 |
86 Retrieves that host attributes for this machine and creates the set of | 86 Retrieves that host attributes for this machine and creates the set of |
87 True attributes to validate against test include/exclude attributes. | 87 True attributes to validate against test include/exclude attributes. |
88 | 88 |
89 Creates a directory to hold the log files for tests run and writes the | 89 Creates a directory to hold the log files for tests run and writes the |
90 hostname and tko parser version into keyvals file. | 90 hostname and tko parser version into keyvals file. |
91 | 91 |
92 Args: | 92 Args: |
| 93 server_job: run tests for this server_job. |
93 machine: name of remote host. | 94 machine: name of remote host. |
94 work_dir: directory server job is using. | 95 work_dir: directory server job is using. |
95 test_queue: queue of tests. | 96 test_queue: queue of tests. |
96 queue_lock: lock protecting test_queue. | 97 queue_lock: lock protecting test_queue. |
97 """ | 98 """ |
98 threading.Thread.__init__(self) | 99 threading.Thread.__init__(self) |
| 100 self._server_job = server_job |
99 self._test_queue = test_queue | 101 self._test_queue = test_queue |
100 self._test_queue_lock = queue_lock | 102 self._test_queue_lock = queue_lock |
101 self._tests_run = 0 | 103 self._tests_run = 0 |
102 self._machine = machine | 104 self._machine = machine |
103 self._host = hosts.create_host(self._machine) | 105 self._host = hosts.create_host(self._machine) |
104 self._client_at = autotest.Autotest(self._host) | 106 self._client_at = autotest.Autotest(self._host) |
105 client_attributes = host_attributes.host_attributes(machine) | 107 client_attributes = host_attributes.host_attributes(machine) |
106 self.attribute_set = set([key for key, value in | 108 self.attribute_set = set([key for key, value in |
107 client_attributes.__dict__.items() if value]) | 109 client_attributes.__dict__.items() if value]) |
108 self._results_dir = os.path.join(work_dir, self._machine) | 110 self._results_dir = os.path.join(work_dir, self._machine) |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
146 break | 148 break |
147 | 149 |
148 # Return any skipped tests to the queue. | 150 # Return any skipped tests to the queue. |
149 for st in skipped_tests: | 151 for st in skipped_tests: |
150 self._test_queue.put(st) | 152 self._test_queue.put(st) |
151 | 153 |
152 return good_test | 154 return good_test |
153 | 155 |
154 def run_subcommand(self, active_test): | 156 def run_subcommand(self, active_test): |
155 """Use subcommand to fork process and execute test.""" | 157 """Use subcommand to fork process and execute test.""" |
156 sub_cmd = subcommand.subcommand(self.subcommand_wrapper, [active_test]) | 158 sub_cmd = subcommand.subcommand(self.subcommand_wrapper, |
| 159 [active_test], |
| 160 self._results_dir) |
157 sub_cmd.fork_start() | 161 sub_cmd.fork_start() |
158 sub_cmd.fork_waitfor() | 162 sub_cmd.fork_waitfor() |
159 | 163 |
160 def subcommand_wrapper(self, active_test): | 164 def subcommand_wrapper(self, active_test): |
161 """Callback for subcommand to call into with the test parameter.""" | 165 """Callback for subcommand to call into with the test parameter. |
| 166 |
| 167 When this function executes it has forked from the main process so it |
| 168 is safe to modify state on the server_job object. These changes enable |
| 169 continuous parsing which communicates results back to the database |
| 170 while the server_job is running instead of only when the server_job is |
| 171 complete. |
| 172 """ |
| 173 self._server_job._parse_job += "/" + self._machine |
| 174 self._server_job._using_parser = True |
| 175 self._server_job.machines = [self._machine] |
| 176 self._server_job.push_execution_context(self._machine) |
| 177 self._server_job.init_parser() |
162 self._client_at.run_test(active_test.test_name, | 178 self._client_at.run_test(active_test.test_name, |
163 results_dir=self._results_dir, | 179 results_dir=self._results_dir, |
164 **active_test.test_args) | 180 **active_test.test_args) |
| 181 self._server_job.cleanup_parser() |
165 | 182 |
166 def run(self): | 183 def run(self): |
167 """Executes tests on host machine. | 184 """Executes tests on host machine. |
168 | 185 |
169 Uses subprocess to fork the process when running tests so unique client | 186 Uses subprocess to fork the process when running tests so unique client |
170 jobs talk to unique server jobs which prevents log files from | 187 jobs talk to unique server jobs which prevents log files from |
171 simultaneous tests interweaving with each other. | 188 simultaneous tests interweaving with each other. |
172 """ | 189 """ |
173 while True: | 190 while True: |
174 active_test = self.get_test() | 191 active_test = self.get_test() |
175 if active_test is None: | 192 if active_test is None: |
176 break | 193 break |
177 | 194 |
178 logging.info('%s running %s', self._machine, active_test) | 195 logging.info('%s running %s', self._machine, active_test) |
179 try: | 196 try: |
180 self.run_subcommand(active_test) | 197 self.run_subcommand(active_test) |
181 except (error.AutoservError, error.AutotestError): | 198 except (error.AutoservError, error.AutotestError): |
182 logging.exception('Error running test "%s".', active_test) | 199 logging.exception('Error running test "%s".', active_test) |
183 except Exception: | 200 except Exception: |
184 logging.exception('Exception running test "%s".', active_test) | 201 logging.exception('Exception running test "%s".', active_test) |
185 raise | 202 raise |
186 finally: | 203 finally: |
187 self._test_queue.task_done() | 204 self._test_queue.task_done() |
188 self._tests_run += 1 | 205 self._tests_run += 1 |
189 | 206 |
190 logging.info('%s completed %d tests.', self._machine, self._tests_run) | 207 logging.info('%s completed %d tests.', self._machine, self._tests_run) |
OLD | NEW |