Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(12)

Side by Side Diff: server/server_job.py

Issue 6677157: Preparing to upstream: Created host_attributes and server_job_utils files. (Closed) Base URL: ssh://gitrw.chromium.org:9222/autotest.git@master
Patch Set: Created 9 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « server/host_attributes.py ('k') | server/server_job_utils.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 """ 1 """
2 The main job wrapper for the server side. 2 The main job wrapper for the server side.
3 3
4 This is the core infrastructure. Derived from the client side job.py 4 This is the core infrastructure. Derived from the client side job.py
5 5
6 Copyright Martin J. Bligh, Andy Whitcroft 2007 6 Copyright Martin J. Bligh, Andy Whitcroft 2007
7 """ 7 """
8 8
9 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform 9 import getpass, os, sys, re, stat, tempfile, time, select, subprocess, platform
10 import Queue, threading 10 import Queue, threading
11 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno 11 import traceback, shutil, warnings, fcntl, pickle, logging, itertools, errno
12 from autotest_lib.client.bin import sysinfo 12 from autotest_lib.client.bin import sysinfo
13 from autotest_lib.client.common_lib import base_job 13 from autotest_lib.client.common_lib import base_job
14 from autotest_lib.client.common_lib import error, log, utils, packages 14 from autotest_lib.client.common_lib import error, log, utils, packages
15 from autotest_lib.client.common_lib import logging_manager 15 from autotest_lib.client.common_lib import logging_manager
16 from autotest_lib.server import autotest, hosts, site_host_attributes 16 from autotest_lib.server import test, subcommand, profilers, server_job_utils
17 from autotest_lib.server import test, subcommand, profilers
18 from autotest_lib.server.hosts import abstract_ssh 17 from autotest_lib.server.hosts import abstract_ssh
19 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils 18 from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
20 19
21 20
22 def _control_segment_path(name): 21 def _control_segment_path(name):
23 """Get the pathname of the named control segment file.""" 22 """Get the pathname of the named control segment file."""
24 server_dir = os.path.dirname(os.path.abspath(__file__)) 23 server_dir = os.path.dirname(os.path.abspath(__file__))
25 return os.path.join(server_dir, "control_segments", name) 24 return os.path.join(server_dir, "control_segments", name)
26 25
27 26
(...skipping 447 matching lines...) Expand 10 before | Expand all | Expand 10 after
475 Args: 474 Args:
476 tests: List of tests to run. 475 tests: List of tests to run.
477 machines: list of machines to use. 476 machines: list of machines to use.
478 """ 477 """
479 # The Queue is thread safe, but since a machine may have to search 478 # The Queue is thread safe, but since a machine may have to search
480 # through the queue to find a valid test the lock provides exclusive 479 # through the queue to find a valid test the lock provides exclusive
481 # queue access for more than just the get call. 480 # queue access for more than just the get call.
482 test_queue = Queue.Queue() 481 test_queue = Queue.Queue()
483 test_queue_lock = threading.Lock() 482 test_queue_lock = threading.Lock()
484 483
485 machine_workers = [machine_worker(machine, self.resultdir, test_queue, 484 machine_workers = [server_job_utils.machine_worker(machine,
486 test_queue_lock) 485 self.resultdir,
486 test_queue,
487 test_queue_lock)
487 for machine in machines] 488 for machine in machines]
488 489
489 # To (potentially) speed up searching for valid tests create a list of 490 # To (potentially) speed up searching for valid tests create a list of
490 # unique attribute sets present in the machines for this job. If sets 491 # unique attribute sets present in the machines for this job. If sets
491 # were hashable we could just use a dictionary for fast verification. 492 # were hashable we could just use a dictionary for fast verification.
492 # This at least reduces the search space from the number of machines to 493 # This at least reduces the search space from the number of machines to
493 # the number of unique machines. 494 # the number of unique machines.
494 unique_machine_attributes = [] 495 unique_machine_attributes = []
495 for mw in machine_workers: 496 for mw in machine_workers:
496 if not mw.attribute_set in unique_machine_attributes: 497 if not mw.attribute_set in unique_machine_attributes:
497 unique_machine_attributes.append(mw.attribute_set) 498 unique_machine_attributes.append(mw.attribute_set)
498 499
499 # Only queue tests which are valid on at least one machine. Record 500 # Only queue tests which are valid on at least one machine. Record
500 # skipped tests in the status.log file using record_skipped_test(). 501 # skipped tests in the status.log file using record_skipped_test().
501 for test_entry in tests: 502 for test_entry in tests:
502 ti = test_item(*test_entry) 503 ti = server_job_utils.test_item(*test_entry)
503 machine_found = False 504 machine_found = False
504 for ma in unique_machine_attributes: 505 for ma in unique_machine_attributes:
505 if ti.validate(ma): 506 if ti.validate(ma):
506 test_queue.put(ti) 507 test_queue.put(ti)
507 machine_found = True 508 machine_found = True
508 break 509 break
509 if not machine_found: 510 if not machine_found:
510 self.record_skipped_test(ti) 511 self.record_skipped_test(ti)
511 512
512 # Run valid tests and wait for completion. 513 # Run valid tests and wait for completion.
(...skipping 692 matching lines...) Expand 10 before | Expand all | Expand 10 after
1205 intervals = self.disabled_warnings.setdefault(warning_type, []) 1206 intervals = self.disabled_warnings.setdefault(warning_type, [])
1206 if not intervals or intervals[-1][1] is not None: 1207 if not intervals or intervals[-1][1] is not None:
1207 intervals.append((int(current_time_func()), None)) 1208 intervals.append((int(current_time_func()), None))
1208 1209
1209 1210
1210 def enable_warnings(self, warning_type, current_time_func=time.time): 1211 def enable_warnings(self, warning_type, current_time_func=time.time):
1211 """As of now, enables all further warnings of this type.""" 1212 """As of now, enables all further warnings of this type."""
1212 intervals = self.disabled_warnings.get(warning_type, []) 1213 intervals = self.disabled_warnings.get(warning_type, [])
1213 if intervals and intervals[-1][1] is None: 1214 if intervals and intervals[-1][1] is None:
1214 intervals[-1] = (intervals[-1][0], int(current_time_func())) 1215 intervals[-1] = (intervals[-1][0], int(current_time_func()))
1215
1216
1217 class test_item(object):
1218 """Adds machine verification logic to the basic test tuple.
1219
1220 Tests can either be tuples of the existing form ('testName', {args}) or the
1221 extended form of ('testname', {args}, ['include'], ['exclude']) where include
1222 and exclude are lists of attributes. A machine must have all the attributes
1223 in include and must not have any of the attributes in exclude to be valid
1224 for the test.
1225 """
1226
1227 def __init__(self, test_name, test_args, include_attribs=None,
1228 exclude_attribs=None):
1229 """Creates an instance of test_item.
1230
1231 Args:
1232 test_name: string, name of test to execute.
1233 test_args: dictionary, arguments to pass into test.
1234 include_attribs: attributes a machine must have to run test.
1235 exclude_attribs: attributes preventing a machine from running test.
1236 """
1237 self.test_name = test_name
1238 self.test_args = test_args
1239 self.inc_set = None
1240 if include_attribs is not None:
1241 self.inc_set = set(include_attribs)
1242 self.exc_set = None
1243 if exclude_attribs is not None:
1244 self.exc_set = set(exclude_attribs)
1245
1246 def __str__(self):
1247 """Return an info string of this test."""
1248 params = ['%s=%s' % (k, v) for k, v in self.test_args.items()]
1249 msg = '%s(%s)' % (self.test_name, params)
1250 if self.inc_set: msg += ' include=%s' % [s for s in self.inc_set]
1251 if self.exc_set: msg += ' exclude=%s' % [s for s in self.exc_set]
1252 return msg
1253
1254 def validate(self, machine_attributes):
1255 """Check if this test can run on machine with machine_attributes.
1256
1257 If the test has include attributes, a candidate machine must have all
1258 the attributes to be valid.
1259
1260 If the test has exclude attributes, a candidate machine cannot have any
1261 of the attributes to be valid.
1262
1263 Args:
1264 machine_attributes: set, True attributes of candidate machine.
1265
1266 Returns:
1267 True/False if the machine is valid for this test.
1268 """
1269 if self.inc_set is not None:
1270 if not self.inc_set <= machine_attributes: return False
1271 if self.exc_set is not None:
1272 if self.exc_set & machine_attributes: return False
1273 return True
1274
1275
1276 class machine_worker(threading.Thread):
1277 """Thread that runs tests on a remote host machine."""
1278
1279 def __init__(self, machine, work_dir, test_queue, queue_lock):
1280 """Creates an instance of machine_worker to run tests on a remote host.
1281
1282 Retrieves the host attributes for this machine and creates the set of
1283 True attributes to validate against test include/exclude attributes.
1284
1285 Creates a directory to hold the log files for tests run and writes the
1286 hostname and tko parser version into keyvals file.
1287
1288 Args:
1289 machine: name of remote host.
1290 work_dir: directory server job is using.
1291 test_queue: queue of tests.
1292 queue_lock: lock protecting test_queue.
1293 """
1294 threading.Thread.__init__(self)
1295 self._test_queue = test_queue
1296 self._test_queue_lock = queue_lock
1297 self._tests_run = 0
1298 self._machine = machine
1299 self._host = hosts.create_host(self._machine)
1300 self._client_at = autotest.Autotest(self._host)
1301 client_attributes = site_host_attributes.HostAttributes(machine)
1302 self.attribute_set = set([key for key, value in
1303 client_attributes.__dict__.items() if value])
1304 self._results_dir = os.path.join(work_dir, self._machine)
1305 if not os.path.exists(self._results_dir):
1306 os.makedirs(self._results_dir)
1307 machine_data = {'hostname': self._machine,
1308 'status_version': str(1)}
1309 utils.write_keyval(self._results_dir, machine_data)
1310
1311 def __str__(self):
1312 attributes = [a for a in self.attribute_set]
1313 return '%s attributes=%s' % (self._machine, attributes)
1314
1315 def get_test(self):
1316 """Return a test from the queue to run on this host.
1317
1318 The test queue can be non-empty, but still not contain a test that is
1319 valid for this machine. This function will take exclusive access to
1320 the queue via _test_queue_lock and repeatedly pop tests off the queue
1321 until finding a valid test or depleting the queue. In either case if
1322 invalid tests have been popped from the queue, they are pushed back
1323 onto the queue before returning.
1324
1325 Returns:
1326 test_item, or None if no more tests exist for this machine.
1327 """
1328 good_test = None
1329 skipped_tests = []
1330
1331 with self._test_queue_lock:
1332 while True:
1333 try:
1334 canidate_test = self._test_queue.get_nowait()
1335 # Check if test is valid for this machine.
1336 if canidate_test.validate(self.attribute_set):
1337 good_test = canidate_test
1338 break
1339 skipped_tests.append(canidate_test)
1340
1341 except Queue.Empty:
1342 break
1343
1344 # Return any skipped tests to the queue.
1345 for st in skipped_tests:
1346 self._test_queue.put(st)
1347
1348 return good_test
1349
1350 def run_subcommand(self, active_test):
1351 """Use subcommand to fork process and execute test."""
1352 sub_cmd = subcommand.subcommand(self.subcommand_wrapper, [active_test])
1353 sub_cmd.fork_start()
1354 sub_cmd.fork_waitfor()
1355
1356 def subcommand_wrapper(self, active_test):
1357 """Callback for subcommand to call into with the test parameter."""
1358 self._client_at.run_test(active_test.test_name,
1359 results_dir=self._results_dir,
1360 **active_test.test_args)
1361
1362 def run(self):
1363 """Executes tests on host machine.
1364
1365 Uses subcommand to fork the process when running tests so unique client
1366 jobs talk to unique server jobs which prevents log files from
1367 simultaneous tests interweaving with each other.
1368 """
1369 while True:
1370 active_test = self.get_test()
1371 if active_test is None:
1372 break
1373
1374 logging.info('%s running %s', self._machine, active_test)
1375 try:
1376 self.run_subcommand(active_test)
1377 except (error.AutoservError, error.AutotestError):
1378 logging.exception('Error running test "%s".', active_test)
1379 except Exception:
1380 logging.exception('Exception running test "%s".', active_test)
1381 raise
1382 finally:
1383 self._test_queue.task_done()
1384 self._tests_run += 1
1385
1386 logging.info('%s completed %d tests.', self._machine, self._tests_run)
OLDNEW
« no previous file with comments | « server/host_attributes.py ('k') | server/server_job_utils.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698