| OLD | NEW |
| 1 """ | 1 """ |
| 2 The main job wrapper for the server side. | 2 The main job wrapper for the server side. |
| 3 | 3 |
| 4 This is the core infrastructure. Derived from the client side job.py | 4 This is the core infrastructure. Derived from the client side job.py |
| 5 | 5 |
| 6 Copyright Martin J. Bligh, Andy Whitcroft 2007 | 6 Copyright Martin J. Bligh, Andy Whitcroft 2007 |
| 7 """ | 7 """ |
| 8 | 8 |
| 9 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform | 9 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform |
| 10 import Queue, threading | 10 import Queue, threading |
| 11 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno | 11 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno |
| 12 from autotest_lib.client.bin import sysinfo | 12 from autotest_lib.client.bin import sysinfo |
| 13 from autotest_lib.client.common_lib import base_job | 13 from autotest_lib.client.common_lib import base_job |
| 14 from autotest_lib.client.common_lib import error, log, utils, packages | 14 from autotest_lib.client.common_lib import error, log, utils, packages |
| 15 from autotest_lib.client.common_lib import logging_manager | 15 from autotest_lib.client.common_lib import logging_manager |
| 16 from autotest_lib.server import autotest, hosts, site_host_attributes | 16 from autotest_lib.server import test, subcommand, profilers, server_job_utils |
| 17 from autotest_lib.server import test, subcommand, profilers | |
| 18 from autotest_lib.server.hosts import abstract_ssh | 17 from autotest_lib.server.hosts import abstract_ssh |
| 19 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils | 18 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils |
| 20 | 19 |
| 21 | 20 |
| 22 def _control_segment_path(name): | 21 def _control_segment_path(name): |
| 23 """Get the pathname of the named control segment file.""" | 22 """Get the pathname of the named control segment file.""" |
| 24 server_dir = os.path.dirname(os.path.abspath(__file__)) | 23 server_dir = os.path.dirname(os.path.abspath(__file__)) |
| 25 return os.path.join(server_dir, "control_segments", name) | 24 return os.path.join(server_dir, "control_segments", name) |
| 26 | 25 |
| 27 | 26 |
| (...skipping 447 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 475 Args: | 474 Args: |
| 476 tests: List of tests to run. | 475 tests: List of tests to run. |
| 477 machines: list of machines to use. | 476 machines: list of machines to use. |
| 478 """ | 477 """ |
| 479 # The Queue is thread safe, but since a machine may have to search | 478 # The Queue is thread safe, but since a machine may have to search |
| 480 # through the queue to find a valid test the lock provides exclusive | 479 # through the queue to find a valid test the lock provides exclusive |
| 481 # queue access for more than just the get call. | 480 # queue access for more than just the get call. |
| 482 test_queue = Queue.Queue() | 481 test_queue = Queue.Queue() |
| 483 test_queue_lock = threading.Lock() | 482 test_queue_lock = threading.Lock() |
| 484 | 483 |
| 485 machine_workers = [machine_worker(machine, self.resultdir, test_queue, | 484 machine_workers = [server_job_utils.machine_worker(machine, |
| 486 test_queue_lock) | 485 self.resultdir, |
| 486 test_queue, |
| 487 test_queue_lock) |
| 487 for machine in machines] | 488 for machine in machines] |
| 488 | 489 |
| 489 # To (potentially) speed up searching for valid tests create a list of | 490 # To (potentially) speed up searching for valid tests create a list of |
| 490 # unique attribute sets present in the machines for this job. If sets | 491 # unique attribute sets present in the machines for this job. If sets |
| 491 # were hashable we could just use a dictionary for fast verification. | 492 # were hashable we could just use a dictionary for fast verification. |
| 492 # This at least reduces the search space from the number of machines to | 493 # This at least reduces the search space from the number of machines to |
| 493 # the number of unique machines. | 494 # the number of unique machines. |
| 494 unique_machine_attributes = [] | 495 unique_machine_attributes = [] |
| 495 for mw in machine_workers: | 496 for mw in machine_workers: |
| 496 if not mw.attribute_set in unique_machine_attributes: | 497 if not mw.attribute_set in unique_machine_attributes: |
| 497 unique_machine_attributes.append(mw.attribute_set) | 498 unique_machine_attributes.append(mw.attribute_set) |
| 498 | 499 |
| 499 # Only queue tests which are valid on at least one machine. Record | 500 # Only queue tests which are valid on at least one machine. Record |
| 500 # skipped tests in the status.log file using record_skipped_test(). | 501 # skipped tests in the status.log file using record_skipped_test(). |
| 501 for test_entry in tests: | 502 for test_entry in tests: |
| 502 ti = test_item(*test_entry) | 503 ti = server_job_utils.test_item(*test_entry) |
| 503 machine_found = False | 504 machine_found = False |
| 504 for ma in unique_machine_attributes: | 505 for ma in unique_machine_attributes: |
| 505 if ti.validate(ma): | 506 if ti.validate(ma): |
| 506 test_queue.put(ti) | 507 test_queue.put(ti) |
| 507 machine_found = True | 508 machine_found = True |
| 508 break | 509 break |
| 509 if not machine_found: | 510 if not machine_found: |
| 510 self.record_skipped_test(ti) | 511 self.record_skipped_test(ti) |
| 511 | 512 |
| 512 # Run valid tests and wait for completion. | 513 # Run valid tests and wait for completion. |
| (...skipping 692 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1205 intervals = self.disabled_warnings.setdefault(warning_type, []) | 1206 intervals = self.disabled_warnings.setdefault(warning_type, []) |
| 1206 if not intervals or intervals[-1][1] is not None: | 1207 if not intervals or intervals[-1][1] is not None: |
| 1207 intervals.append((int(current_time_func()), None)) | 1208 intervals.append((int(current_time_func()), None)) |
| 1208 | 1209 |
| 1209 | 1210 |
| 1210 def enable_warnings(self, warning_type, current_time_func=time.time): | 1211 def enable_warnings(self, warning_type, current_time_func=time.time): |
| 1211 """As of now, enables all further warnings of this type.""" | 1212 """As of now, enables all further warnings of this type.""" |
| 1212 intervals = self.disabled_warnings.get(warning_type, []) | 1213 intervals = self.disabled_warnings.get(warning_type, []) |
| 1213 if intervals and intervals[-1][1] is None: | 1214 if intervals and intervals[-1][1] is None: |
| 1214 intervals[-1] = (intervals[-1][0], int(current_time_func())) | 1215 intervals[-1] = (intervals[-1][0], int(current_time_func())) |
| 1215 | |
| 1216 | |
| 1217 class test_item(object): | |
| 1218 """Adds machine verification logic to the basic test tuple. | |
| 1219 | |
| 1220 Tests can either be tuples of the existing form ('testName', {args}) or the | |
| 1221 extended for of ('testname', {args}, ['include'], ['exclude']) where include | |
| 1222 and exclude are lists of attribues. A machine must have all the attributes | |
| 1223 in include and must not have any of the attributes in exclude to be valid | |
| 1224 for the test. | |
| 1225 """ | |
| 1226 | |
| 1227 def __init__(self, test_name, test_args, include_attribs=None, | |
| 1228 exclude_attribs=None): | |
| 1229 """Creates an instance of test_item. | |
| 1230 | |
| 1231 Args: | |
| 1232 test_name: string, name of test to execute. | |
| 1233 test_args: dictionary, arguments to pass into test. | |
| 1234 include_attribs: attributes a machine must have to run test. | |
| 1235 exclude_attribs: attributes preventing a machine from running test. | |
| 1236 """ | |
| 1237 self.test_name = test_name | |
| 1238 self.test_args = test_args | |
| 1239 self.inc_set = None | |
| 1240 if include_attribs is not None: | |
| 1241 self.inc_set = set(include_attribs) | |
| 1242 self.exc_set = None | |
| 1243 if exclude_attribs is not None: | |
| 1244 self.exc_set = set(exclude_attribs) | |
| 1245 | |
| 1246 def __str__(self): | |
| 1247 """Return an info string of this test.""" | |
| 1248 params = ['%s=%s' % (k, v) for k, v in self.test_args.items()] | |
| 1249 msg = '%s(%s)' % (self.test_name, params) | |
| 1250 if self.inc_set: msg += ' include=%s' % [s for s in self.inc_set] | |
| 1251 if self.exc_set: msg += ' exclude=%s' % [s for s in self.exc_set] | |
| 1252 return msg | |
| 1253 | |
| 1254 def validate(self, machine_attributes): | |
| 1255 """Check if this test can run on machine with machine_attributes. | |
| 1256 | |
| 1257 If the test has include attributes, a candidate machine must have all | |
| 1258 the attributes to be valid. | |
| 1259 | |
| 1260 If the test has exclude attributes, a candidate machine cannot have any | |
| 1261 of the attributes to be valid. | |
| 1262 | |
| 1263 Args: | |
| 1264 machine_attributes: set, True attributes of candidate machine. | |
| 1265 | |
| 1266 Returns: | |
| 1267 True/False if the machine is valid for this test. | |
| 1268 """ | |
| 1269 if self.inc_set is not None: | |
| 1270 if not self.inc_set <= machine_attributes: return False | |
| 1271 if self.exc_set is not None: | |
| 1272 if self.exc_set & machine_attributes: return False | |
| 1273 return True | |
| 1274 | |
| 1275 | |
| 1276 class machine_worker(threading.Thread): | |
| 1277 """Thread that runs tests on a remote host machine.""" | |
| 1278 | |
| 1279 def __init__(self, machine, work_dir, test_queue, queue_lock): | |
| 1280 """Creates an instance of machine_worker to run tests on a remote host. | |
| 1281 | |
| 1282 Retrieves that host attributes for this machine and creates the set of | |
| 1283 True attributes to validate against test include/exclude attributes. | |
| 1284 | |
| 1285 Creates a directory to hold the log files for tests run and writes the | |
| 1286 hostname and tko parser version into keyvals file. | |
| 1287 | |
| 1288 Args: | |
| 1289 machine: name of remote host. | |
| 1290 work_dir: directory server job is using. | |
| 1291 test_queue: queue of tests. | |
| 1292 queue_lock: lock protecting test_queue. | |
| 1293 """ | |
| 1294 threading.Thread.__init__(self) | |
| 1295 self._test_queue = test_queue | |
| 1296 self._test_queue_lock = queue_lock | |
| 1297 self._tests_run = 0 | |
| 1298 self._machine = machine | |
| 1299 self._host = hosts.create_host(self._machine) | |
| 1300 self._client_at = autotest.Autotest(self._host) | |
| 1301 client_attributes = site_host_attributes.HostAttributes(machine) | |
| 1302 self.attribute_set = set([key for key, value in | |
| 1303 client_attributes.__dict__.items() if value]) | |
| 1304 self._results_dir = os.path.join(work_dir, self._machine) | |
| 1305 if not os.path.exists(self._results_dir): | |
| 1306 os.makedirs(self._results_dir) | |
| 1307 machine_data = {'hostname': self._machine, | |
| 1308 'status_version': str(1)} | |
| 1309 utils.write_keyval(self._results_dir, machine_data) | |
| 1310 | |
| 1311 def __str__(self): | |
| 1312 attributes = [a for a in self.attribute_set] | |
| 1313 return '%s attributes=%s' % (self._machine, attributes) | |
| 1314 | |
| 1315 def get_test(self): | |
| 1316 """Return a test from the queue to run on this host. | |
| 1317 | |
| 1318 The test queue can be non-empty, but still not contain a test that is | |
| 1319 valid for this machine. This function will take exclusive access to | |
| 1320 the queue via _test_queue_lock and repeatedly pop tests off the queue | |
| 1321 until finding a valid test or depleting the queue. In either case if | |
| 1322 invalid tests have been popped from the queue, they are pushed back | |
| 1323 onto the queue before returning. | |
| 1324 | |
| 1325 Returns: | |
| 1326 test_item, or None if no more tests exist for this machine. | |
| 1327 """ | |
| 1328 good_test = None | |
| 1329 skipped_tests = [] | |
| 1330 | |
| 1331 with self._test_queue_lock: | |
| 1332 while True: | |
| 1333 try: | |
| 1334 canidate_test = self._test_queue.get_nowait() | |
| 1335 # Check if test is valid for this machine. | |
| 1336 if canidate_test.validate(self.attribute_set): | |
| 1337 good_test = canidate_test | |
| 1338 break | |
| 1339 skipped_tests.append(canidate_test) | |
| 1340 | |
| 1341 except Queue.Empty: | |
| 1342 break | |
| 1343 | |
| 1344 # Return any skipped tests to the queue. | |
| 1345 for st in skipped_tests: | |
| 1346 self._test_queue.put(st) | |
| 1347 | |
| 1348 return good_test | |
| 1349 | |
| 1350 def run_subcommand(self, active_test): | |
| 1351 """Use subcommand to fork process and execute test.""" | |
| 1352 sub_cmd = subcommand.subcommand(self.subcommand_wrapper, [active_test]) | |
| 1353 sub_cmd.fork_start() | |
| 1354 sub_cmd.fork_waitfor() | |
| 1355 | |
| 1356 def subcommand_wrapper(self, active_test): | |
| 1357 """Callback for subcommand to call into with the test parameter.""" | |
| 1358 self._client_at.run_test(active_test.test_name, | |
| 1359 results_dir=self._results_dir, | |
| 1360 **active_test.test_args) | |
| 1361 | |
| 1362 def run(self): | |
| 1363 """Executes tests on host machine. | |
| 1364 | |
| 1365 Uses subprocess to fork the process when running tests so unique client | |
| 1366 jobs talk to unique server jobs which prevents log files from | |
| 1367 simlutaneous tests interweaving with each other. | |
| 1368 """ | |
| 1369 while True: | |
| 1370 active_test = self.get_test() | |
| 1371 if active_test is None: | |
| 1372 break | |
| 1373 | |
| 1374 logging.info('%s running %s', self._machine, active_test) | |
| 1375 try: | |
| 1376 self.run_subcommand(active_test) | |
| 1377 except (error.AutoservError, error.AutotestError): | |
| 1378 logging.exception('Error running test "%s".', active_test) | |
| 1379 except Exception: | |
| 1380 logging.exception('Exception running test "%s".', active_test) | |
| 1381 raise | |
| 1382 finally: | |
| 1383 self._test_queue.task_done() | |
| 1384 self._tests_run += 1 | |
| 1385 | |
| 1386 logging.info('%s completed %d tests.', self._machine, self._tests_run) | |
| OLD | NEW |