Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Side by Side Diff: server/server_job_utils.py

Issue 6902033: Reduce load on autotest by cutting number of forks and parsers. (Closed) Base URL: ssh://gitrw.chromium.org:9222/autotest.git@master
Patch Set: Created 9 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « server/server_job.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. 1 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """ 5 """
6 Utility classes used by server_job.distribute_across_machines(). 6 Utility classes used by server_job.distribute_across_machines().
7 7
8 test_item: extends the basic test tuple to add include/exclude attributes. 8 test_item: extends the basic test tuple to add include/exclude attributes.
9 9
10 machine_worker: is a thread that manages running tests on a host. It 10 machine_worker: is a thread that manages running tests on a host. It
(...skipping 62 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 if self.inc_set is not None: 73 if self.inc_set is not None:
74 if not self.inc_set <= machine_attributes: return False 74 if not self.inc_set <= machine_attributes: return False
75 if self.exc_set is not None: 75 if self.exc_set is not None:
76 if self.exc_set & machine_attributes: return False 76 if self.exc_set & machine_attributes: return False
77 return True 77 return True
78 78
79 79
80 class machine_worker(threading.Thread): 80 class machine_worker(threading.Thread):
81 """Thread that runs tests on a remote host machine.""" 81 """Thread that runs tests on a remote host machine."""
82 82
83 def __init__(self, server_job, machine, work_dir, test_queue, queue_lock): 83 def __init__(self, server_job, machine, work_dir, test_queue, queue_lock,
84 continuous_parsing=False):
84 """Creates an instance of machine_worker to run tests on a remote host. 85 """Creates an instance of machine_worker to run tests on a remote host.
85 86
86 Retrieves that host attributes for this machine and creates the set of 87 Retrieves that host attributes for this machine and creates the set of
87 True attributes to validate against test include/exclude attributes. 88 True attributes to validate against test include/exclude attributes.
88 89
89 Creates a directory to hold the log files for tests run and writes the 90 Creates a directory to hold the log files for tests run and writes the
90 hostname and tko parser version into keyvals file. 91 hostname and tko parser version into keyvals file.
91 92
92 Args: 93 Args:
93 server_job: run tests for this server_job. 94 server_job: run tests for this server_job.
94 machine: name of remote host. 95 machine: name of remote host.
95 work_dir: directory server job is using. 96 work_dir: directory server job is using.
96 test_queue: queue of tests. 97 test_queue: queue of tests.
97 queue_lock: lock protecting test_queue. 98 queue_lock: lock protecting test_queue.
99 continuous_parsing: bool, enable continuous parsing.
98 """ 100 """
99 threading.Thread.__init__(self) 101 threading.Thread.__init__(self)
100 self._server_job = server_job 102 self._server_job = server_job
101 self._test_queue = test_queue 103 self._test_queue = test_queue
102 self._test_queue_lock = queue_lock 104 self._test_queue_lock = queue_lock
105 self._continuous_parsing = continuous_parsing
103 self._tests_run = 0 106 self._tests_run = 0
104 self._machine = machine 107 self._machine = machine
105 self._host = hosts.create_host(self._machine) 108 self._host = hosts.create_host(self._machine)
106 self._client_at = autotest.Autotest(self._host) 109 self._client_at = autotest.Autotest(self._host)
107 client_attributes = host_attributes.host_attributes(machine) 110 client_attributes = host_attributes.host_attributes(machine)
108 self.attribute_set = set([key for key, value in 111 self.attribute_set = set([key for key, value in
109 client_attributes.__dict__.items() if value]) 112 client_attributes.__dict__.items() if value])
110 self._results_dir = os.path.join(work_dir, self._machine) 113 self._results_dir = os.path.join(work_dir, self._machine)
111 if not os.path.exists(self._results_dir): 114 if not os.path.exists(self._results_dir):
112 os.makedirs(self._results_dir) 115 os.makedirs(self._results_dir)
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
146 149
147 except Queue.Empty: 150 except Queue.Empty:
148 break 151 break
149 152
150 # Return any skipped tests to the queue. 153 # Return any skipped tests to the queue.
151 for st in skipped_tests: 154 for st in skipped_tests:
152 self._test_queue.put(st) 155 self._test_queue.put(st)
153 156
154 return good_test 157 return good_test
155 158
156 def run_subcommand(self, active_test): 159 def run(self):
157 """Use subcommand to fork process and execute test.""" 160 """Use subcommand to fork process and execute tests.
158 sub_cmd = subcommand.subcommand(self.subcommand_wrapper, 161
159 [active_test], 162 The forked processes prevents log files from simultaneous tests
163 interweaving with each other. Logging doesn't communicate host autotest
164 to client autotest, it communicates host module to client autotest. So
165 different server side autotest instances share the same module and
166 require split processes to have clean logging.
167 """
168 sub_cmd = subcommand.subcommand(self._run,
169 [],
160 self._results_dir) 170 self._results_dir)
161 sub_cmd.fork_start() 171 sub_cmd.fork_start()
162 sub_cmd.fork_waitfor() 172 sub_cmd.fork_waitfor()
163 173
164 def subcommand_wrapper(self, active_test): 174 def _run(self):
165 """Callback for subcommand to call into with the test parameter. 175 """Executes tests on the host machine.
166 176
167 When this function executes it has forked from the main process so it 177 If continuous parsing was requested, start the parser before running
168 is safe to modify state on the server_job object. These changes enable 178 tests.
169 continuous parsing which communicates results back to the database
170 while the server_job is running instead of only when the server_job is
171 complete.
172 """ 179 """
173 self._server_job._parse_job += "/" + self._machine 180 if self._continuous_parsing:
174 self._server_job._using_parser = True 181 self._server_job._parse_job += "/" + self._machine
175 self._server_job.machines = [self._machine] 182 self._server_job._using_parser = True
176 self._server_job.push_execution_context(self._machine) 183 self._server_job.machines = [self._machine]
177 self._server_job.init_parser() 184 self._server_job.push_execution_context(self._machine)
178 self._client_at.run_test(active_test.test_name, 185 self._server_job.init_parser()
179 results_dir=self._results_dir,
180 **active_test.test_args)
181 self._server_job.cleanup_parser()
182 186
183 def run(self):
184 """Executes tests on host machine.
185
186 Uses subprocess to fork the process when running tests so unique client
187 jobs talk to unique server jobs which prevents log files from
188 simultaneous tests interweaving with each other.
189 """
190 while True: 187 while True:
191 active_test = self.get_test() 188 active_test = self.get_test()
192 if active_test is None: 189 if active_test is None:
193 break 190 break
194 191
195 # Install autoest on host before running tests. Do this before
196 # the subcommand fork so all future forks see that it has been
197 # installed on the host.
198 if not self._client_at.installed:
199 self._client_at.install()
200
201 logging.info('%s running %s', self._machine, active_test) 192 logging.info('%s running %s', self._machine, active_test)
202 try: 193 try:
203 self.run_subcommand(active_test) 194 self._client_at.run_test(active_test.test_name,
195 results_dir=self._results_dir,
196 **active_test.test_args)
204 except (error.AutoservError, error.AutotestError): 197 except (error.AutoservError, error.AutotestError):
205 logging.exception('Error running test "%s".', active_test) 198 logging.exception('Error running test "%s".', active_test)
206 except Exception: 199 except Exception:
207 logging.exception('Exception running test "%s".', active_test) 200 logging.exception('Exception running test "%s".', active_test)
208 raise 201 raise
209 finally: 202 finally:
210 self._test_queue.task_done() 203 self._test_queue.task_done()
211 self._tests_run += 1 204 self._tests_run += 1
212 205
206 if self._continuous_parsing:
207 self._server_job.cleanup_parser()
213 logging.info('%s completed %d tests.', self._machine, self._tests_run) 208 logging.info('%s completed %d tests.', self._machine, self._tests_run)
OLDNEW
« no previous file with comments | « server/server_job.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698