OLD | NEW |
1 """ | 1 """ |
2 The main job wrapper for the server side. | 2 The main job wrapper for the server side. |
3 | 3 |
4 This is the core infrastructure. Derived from the client side job.py | 4 This is the core infrastructure. Derived from the client side job.py |
5 | 5 |
6 Copyright Martin J. Bligh, Andy Whitcroft 2007 | 6 Copyright Martin J. Bligh, Andy Whitcroft 2007 |
7 """ | 7 """ |
8 | 8 |
9 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform | 9 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform |
10 import Queue, threading | 10 import Queue, threading |
11 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno | 11 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno |
12 from autotest_lib.client.bin import sysinfo | 12 from autotest_lib.client.bin import sysinfo |
13 from autotest_lib.client.common_lib import base_job | 13 from autotest_lib.client.common_lib import base_job |
14 from autotest_lib.client.common_lib import error, log, utils, packages | 14 from autotest_lib.client.common_lib import error, log, utils, packages |
15 from autotest_lib.client.common_lib import logging_manager | 15 from autotest_lib.client.common_lib import logging_manager |
16 from autotest_lib.server import autotest, hosts, site_host_attributes | 16 from autotest_lib.server import test, subcommand, profilers, server_job_utils |
17 from autotest_lib.server import test, subcommand, profilers | |
18 from autotest_lib.server.hosts import abstract_ssh | 17 from autotest_lib.server.hosts import abstract_ssh |
19 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils | 18 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils |
20 | 19 |
21 | 20 |
22 def _control_segment_path(name): | 21 def _control_segment_path(name): |
23 """Get the pathname of the named control segment file.""" | 22 """Get the pathname of the named control segment file.""" |
24 server_dir = os.path.dirname(os.path.abspath(__file__)) | 23 server_dir = os.path.dirname(os.path.abspath(__file__)) |
25 return os.path.join(server_dir, "control_segments", name) | 24 return os.path.join(server_dir, "control_segments", name) |
26 | 25 |
27 | 26 |
(...skipping 447 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
475 Args: | 474 Args: |
476 tests: List of tests to run. | 475 tests: List of tests to run. |
477 machines: list of machines to use. | 476 machines: list of machines to use. |
478 """ | 477 """ |
479 # The Queue is thread safe, but since a machine may have to search | 478 # The Queue is thread safe, but since a machine may have to search |
480 # through the queue to find a valid test the lock provides exclusive | 479 # through the queue to find a valid test the lock provides exclusive |
481 # queue access for more than just the get call. | 480 # queue access for more than just the get call. |
482 test_queue = Queue.Queue() | 481 test_queue = Queue.Queue() |
483 test_queue_lock = threading.Lock() | 482 test_queue_lock = threading.Lock() |
484 | 483 |
485 machine_workers = [machine_worker(machine, self.resultdir, test_queue, | 484 machine_workers = [server_job_utils.machine_worker(machine, |
486 test_queue_lock) | 485 self.resultdir, |
| 486 test_queue, |
| 487 test_queue_lock) |
487 for machine in machines] | 488 for machine in machines] |
488 | 489 |
489 # To (potentially) speed up searching for valid tests create a list of | 490 # To (potentially) speed up searching for valid tests create a list of |
490 # unique attribute sets present in the machines for this job. If sets | 491 # unique attribute sets present in the machines for this job. If sets |
491 # were hashable we could just use a dictionary for fast verification. | 492 # were hashable we could just use a dictionary for fast verification. |
492 # This at least reduces the search space from the number of machines to | 493 # This at least reduces the search space from the number of machines to |
493 # the number of unique machines. | 494 # the number of unique machines. |
494 unique_machine_attributes = [] | 495 unique_machine_attributes = [] |
495 for mw in machine_workers: | 496 for mw in machine_workers: |
496 if not mw.attribute_set in unique_machine_attributes: | 497 if not mw.attribute_set in unique_machine_attributes: |
497 unique_machine_attributes.append(mw.attribute_set) | 498 unique_machine_attributes.append(mw.attribute_set) |
498 | 499 |
499 # Only queue tests which are valid on at least one machine. Record | 500 # Only queue tests which are valid on at least one machine. Record |
500 # skipped tests in the status.log file using record_skipped_test(). | 501 # skipped tests in the status.log file using record_skipped_test(). |
501 for test_entry in tests: | 502 for test_entry in tests: |
502 ti = test_item(*test_entry) | 503 ti = server_job_utils.test_item(*test_entry) |
503 machine_found = False | 504 machine_found = False |
504 for ma in unique_machine_attributes: | 505 for ma in unique_machine_attributes: |
505 if ti.validate(ma): | 506 if ti.validate(ma): |
506 test_queue.put(ti) | 507 test_queue.put(ti) |
507 machine_found = True | 508 machine_found = True |
508 break | 509 break |
509 if not machine_found: | 510 if not machine_found: |
510 self.record_skipped_test(ti) | 511 self.record_skipped_test(ti) |
511 | 512 |
512 # Run valid tests and wait for completion. | 513 # Run valid tests and wait for completion. |
(...skipping 692 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1205 intervals = self.disabled_warnings.setdefault(warning_type, []) | 1206 intervals = self.disabled_warnings.setdefault(warning_type, []) |
1206 if not intervals or intervals[-1][1] is not None: | 1207 if not intervals or intervals[-1][1] is not None: |
1207 intervals.append((int(current_time_func()), None)) | 1208 intervals.append((int(current_time_func()), None)) |
1208 | 1209 |
1209 | 1210 |
1210 def enable_warnings(self, warning_type, current_time_func=time.time): | 1211 def enable_warnings(self, warning_type, current_time_func=time.time): |
1211 """As of now, enables all further warnings of this type.""" | 1212 """As of now, enables all further warnings of this type.""" |
1212 intervals = self.disabled_warnings.get(warning_type, []) | 1213 intervals = self.disabled_warnings.get(warning_type, []) |
1213 if intervals and intervals[-1][1] is None: | 1214 if intervals and intervals[-1][1] is None: |
1214 intervals[-1] = (intervals[-1][0], int(current_time_func())) | 1215 intervals[-1] = (intervals[-1][0], int(current_time_func())) |
1215 | |
1216 | |
1217 class test_item(object): | |
1218 """Adds machine verification logic to the basic test tuple. | |
1219 | |
1220 Tests can either be tuples of the existing form ('testName', {args}) or the | |
1221 extended for of ('testname', {args}, ['include'], ['exclude']) where include | |
1222 and exclude are lists of attribues. A machine must have all the attributes | |
1223 in include and must not have any of the attributes in exclude to be valid | |
1224 for the test. | |
1225 """ | |
1226 | |
1227 def __init__(self, test_name, test_args, include_attribs=None, | |
1228 exclude_attribs=None): | |
1229 """Creates an instance of test_item. | |
1230 | |
1231 Args: | |
1232 test_name: string, name of test to execute. | |
1233 test_args: dictionary, arguments to pass into test. | |
1234 include_attribs: attributes a machine must have to run test. | |
1235 exclude_attribs: attributes preventing a machine from running test. | |
1236 """ | |
1237 self.test_name = test_name | |
1238 self.test_args = test_args | |
1239 self.inc_set = None | |
1240 if include_attribs is not None: | |
1241 self.inc_set = set(include_attribs) | |
1242 self.exc_set = None | |
1243 if exclude_attribs is not None: | |
1244 self.exc_set = set(exclude_attribs) | |
1245 | |
1246 def __str__(self): | |
1247 """Return an info string of this test.""" | |
1248 params = ['%s=%s' % (k, v) for k, v in self.test_args.items()] | |
1249 msg = '%s(%s)' % (self.test_name, params) | |
1250 if self.inc_set: msg += ' include=%s' % [s for s in self.inc_set] | |
1251 if self.exc_set: msg += ' exclude=%s' % [s for s in self.exc_set] | |
1252 return msg | |
1253 | |
1254 def validate(self, machine_attributes): | |
1255 """Check if this test can run on machine with machine_attributes. | |
1256 | |
1257 If the test has include attributes, a candidate machine must have all | |
1258 the attributes to be valid. | |
1259 | |
1260 If the test has exclude attributes, a candidate machine cannot have any | |
1261 of the attributes to be valid. | |
1262 | |
1263 Args: | |
1264 machine_attributes: set, True attributes of candidate machine. | |
1265 | |
1266 Returns: | |
1267 True/False if the machine is valid for this test. | |
1268 """ | |
1269 if self.inc_set is not None: | |
1270 if not self.inc_set <= machine_attributes: return False | |
1271 if self.exc_set is not None: | |
1272 if self.exc_set & machine_attributes: return False | |
1273 return True | |
1274 | |
1275 | |
1276 class machine_worker(threading.Thread): | |
1277 """Thread that runs tests on a remote host machine.""" | |
1278 | |
1279 def __init__(self, machine, work_dir, test_queue, queue_lock): | |
1280 """Creates an instance of machine_worker to run tests on a remote host. | |
1281 | |
1282 Retrieves that host attributes for this machine and creates the set of | |
1283 True attributes to validate against test include/exclude attributes. | |
1284 | |
1285 Creates a directory to hold the log files for tests run and writes the | |
1286 hostname and tko parser version into keyvals file. | |
1287 | |
1288 Args: | |
1289 machine: name of remote host. | |
1290 work_dir: directory server job is using. | |
1291 test_queue: queue of tests. | |
1292 queue_lock: lock protecting test_queue. | |
1293 """ | |
1294 threading.Thread.__init__(self) | |
1295 self._test_queue = test_queue | |
1296 self._test_queue_lock = queue_lock | |
1297 self._tests_run = 0 | |
1298 self._machine = machine | |
1299 self._host = hosts.create_host(self._machine) | |
1300 self._client_at = autotest.Autotest(self._host) | |
1301 client_attributes = site_host_attributes.HostAttributes(machine) | |
1302 self.attribute_set = set([key for key, value in | |
1303 client_attributes.__dict__.items() if value]) | |
1304 self._results_dir = os.path.join(work_dir, self._machine) | |
1305 if not os.path.exists(self._results_dir): | |
1306 os.makedirs(self._results_dir) | |
1307 machine_data = {'hostname': self._machine, | |
1308 'status_version': str(1)} | |
1309 utils.write_keyval(self._results_dir, machine_data) | |
1310 | |
1311 def __str__(self): | |
1312 attributes = [a for a in self.attribute_set] | |
1313 return '%s attributes=%s' % (self._machine, attributes) | |
1314 | |
1315 def get_test(self): | |
1316 """Return a test from the queue to run on this host. | |
1317 | |
1318 The test queue can be non-empty, but still not contain a test that is | |
1319 valid for this machine. This function will take exclusive access to | |
1320 the queue via _test_queue_lock and repeatedly pop tests off the queue | |
1321 until finding a valid test or depleting the queue. In either case if | |
1322 invalid tests have been popped from the queue, they are pushed back | |
1323 onto the queue before returning. | |
1324 | |
1325 Returns: | |
1326 test_item, or None if no more tests exist for this machine. | |
1327 """ | |
1328 good_test = None | |
1329 skipped_tests = [] | |
1330 | |
1331 with self._test_queue_lock: | |
1332 while True: | |
1333 try: | |
1334 canidate_test = self._test_queue.get_nowait() | |
1335 # Check if test is valid for this machine. | |
1336 if canidate_test.validate(self.attribute_set): | |
1337 good_test = canidate_test | |
1338 break | |
1339 skipped_tests.append(canidate_test) | |
1340 | |
1341 except Queue.Empty: | |
1342 break | |
1343 | |
1344 # Return any skipped tests to the queue. | |
1345 for st in skipped_tests: | |
1346 self._test_queue.put(st) | |
1347 | |
1348 return good_test | |
1349 | |
1350 def run_subcommand(self, active_test): | |
1351 """Use subcommand to fork process and execute test.""" | |
1352 sub_cmd = subcommand.subcommand(self.subcommand_wrapper, [active_test]) | |
1353 sub_cmd.fork_start() | |
1354 sub_cmd.fork_waitfor() | |
1355 | |
1356 def subcommand_wrapper(self, active_test): | |
1357 """Callback for subcommand to call into with the test parameter.""" | |
1358 self._client_at.run_test(active_test.test_name, | |
1359 results_dir=self._results_dir, | |
1360 **active_test.test_args) | |
1361 | |
1362 def run(self): | |
1363 """Executes tests on host machine. | |
1364 | |
1365 Uses subprocess to fork the process when running tests so unique client | |
1366 jobs talk to unique server jobs which prevents log files from | |
1367 simlutaneous tests interweaving with each other. | |
1368 """ | |
1369 while True: | |
1370 active_test = self.get_test() | |
1371 if active_test is None: | |
1372 break | |
1373 | |
1374 logging.info('%s running %s', self._machine, active_test) | |
1375 try: | |
1376 self.run_subcommand(active_test) | |
1377 except (error.AutoservError, error.AutotestError): | |
1378 logging.exception('Error running test "%s".', active_test) | |
1379 except Exception: | |
1380 logging.exception('Exception running test "%s".', active_test) | |
1381 raise | |
1382 finally: | |
1383 self._test_queue.task_done() | |
1384 self._tests_run += 1 | |
1385 | |
1386 logging.info('%s completed %d tests.', self._machine, self._tests_run) | |
OLD | NEW |