Index: server/server_job.py |
diff --git a/server/server_job.py b/server/server_job.py |
index 77e12e1f5d1a706319db9317e27ea167dac25c48..753cfa1522a2ed9de3f987339cebf5f2adee54fa 100644 |
--- a/server/server_job.py |
+++ b/server/server_job.py |
@@ -13,8 +13,7 @@ from autotest_lib.client.bin import sysinfo |
from autotest_lib.client.common_lib import base_job |
from autotest_lib.client.common_lib import error, log, utils, packages |
from autotest_lib.client.common_lib import logging_manager |
-from autotest_lib.server import autotest, hosts, site_host_attributes |
-from autotest_lib.server import test, subcommand, profilers |
+from autotest_lib.server import test, subcommand, profilers, server_job_utils |
from autotest_lib.server.hosts import abstract_ssh |
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils |
@@ -482,8 +481,10 @@ class base_server_job(base_job.base_job): |
test_queue = Queue.Queue() |
test_queue_lock = threading.Lock() |
- machine_workers = [machine_worker(machine, self.resultdir, test_queue, |
- test_queue_lock) |
+ machine_workers = [server_job_utils.machine_worker(machine, |
+ self.resultdir, |
+ test_queue, |
+ test_queue_lock) |
for machine in machines] |
# To (potentially) speed up searching for valid tests create a list of |
@@ -499,7 +500,7 @@ class base_server_job(base_job.base_job): |
# Only queue tests which are valid on at least one machine. Record |
# skipped tests in the status.log file using record_skipped_test(). |
for test_entry in tests: |
- ti = test_item(*test_entry) |
+ ti = server_job_utils.test_item(*test_entry) |
machine_found = False |
for ma in unique_machine_attributes: |
if ti.validate(ma): |
@@ -1212,175 +1213,3 @@ class warning_manager(object): |
intervals = self.disabled_warnings.get(warning_type, []) |
if intervals and intervals[-1][1] is None: |
intervals[-1] = (intervals[-1][0], int(current_time_func())) |
- |
- |
-class test_item(object): |
- """Adds machine verification logic to the basic test tuple. |
- |
- Tests can either be tuples of the existing form ('testName', {args}) or the |
- extended for of ('testname', {args}, ['include'], ['exclude']) where include |
- and exclude are lists of attribues. A machine must have all the attributes |
- in include and must not have any of the attributes in exclude to be valid |
- for the test. |
- """ |
- |
- def __init__(self, test_name, test_args, include_attribs=None, |
- exclude_attribs=None): |
- """Creates an instance of test_item. |
- |
- Args: |
- test_name: string, name of test to execute. |
- test_args: dictionary, arguments to pass into test. |
- include_attribs: attributes a machine must have to run test. |
- exclude_attribs: attributes preventing a machine from running test. |
- """ |
- self.test_name = test_name |
- self.test_args = test_args |
- self.inc_set = None |
- if include_attribs is not None: |
- self.inc_set = set(include_attribs) |
- self.exc_set = None |
- if exclude_attribs is not None: |
- self.exc_set = set(exclude_attribs) |
- |
- def __str__(self): |
- """Return an info string of this test.""" |
- params = ['%s=%s' % (k, v) for k, v in self.test_args.items()] |
- msg = '%s(%s)' % (self.test_name, params) |
- if self.inc_set: msg += ' include=%s' % [s for s in self.inc_set] |
- if self.exc_set: msg += ' exclude=%s' % [s for s in self.exc_set] |
- return msg |
- |
- def validate(self, machine_attributes): |
- """Check if this test can run on machine with machine_attributes. |
- |
- If the test has include attributes, a candidate machine must have all |
- the attributes to be valid. |
- |
- If the test has exclude attributes, a candidate machine cannot have any |
- of the attributes to be valid. |
- |
- Args: |
- machine_attributes: set, True attributes of candidate machine. |
- |
- Returns: |
- True/False if the machine is valid for this test. |
- """ |
- if self.inc_set is not None: |
- if not self.inc_set <= machine_attributes: return False |
- if self.exc_set is not None: |
- if self.exc_set & machine_attributes: return False |
- return True |
- |
- |
-class machine_worker(threading.Thread): |
- """Thread that runs tests on a remote host machine.""" |
- |
- def __init__(self, machine, work_dir, test_queue, queue_lock): |
- """Creates an instance of machine_worker to run tests on a remote host. |
- |
- Retrieves that host attributes for this machine and creates the set of |
- True attributes to validate against test include/exclude attributes. |
- |
- Creates a directory to hold the log files for tests run and writes the |
- hostname and tko parser version into keyvals file. |
- |
- Args: |
- machine: name of remote host. |
- work_dir: directory server job is using. |
- test_queue: queue of tests. |
- queue_lock: lock protecting test_queue. |
- """ |
- threading.Thread.__init__(self) |
- self._test_queue = test_queue |
- self._test_queue_lock = queue_lock |
- self._tests_run = 0 |
- self._machine = machine |
- self._host = hosts.create_host(self._machine) |
- self._client_at = autotest.Autotest(self._host) |
- client_attributes = site_host_attributes.HostAttributes(machine) |
- self.attribute_set = set([key for key, value in |
- client_attributes.__dict__.items() if value]) |
- self._results_dir = os.path.join(work_dir, self._machine) |
- if not os.path.exists(self._results_dir): |
- os.makedirs(self._results_dir) |
- machine_data = {'hostname': self._machine, |
- 'status_version': str(1)} |
- utils.write_keyval(self._results_dir, machine_data) |
- |
- def __str__(self): |
- attributes = [a for a in self.attribute_set] |
- return '%s attributes=%s' % (self._machine, attributes) |
- |
- def get_test(self): |
- """Return a test from the queue to run on this host. |
- |
- The test queue can be non-empty, but still not contain a test that is |
- valid for this machine. This function will take exclusive access to |
- the queue via _test_queue_lock and repeatedly pop tests off the queue |
- until finding a valid test or depleting the queue. In either case if |
- invalid tests have been popped from the queue, they are pushed back |
- onto the queue before returning. |
- |
- Returns: |
- test_item, or None if no more tests exist for this machine. |
- """ |
- good_test = None |
- skipped_tests = [] |
- |
- with self._test_queue_lock: |
- while True: |
- try: |
- canidate_test = self._test_queue.get_nowait() |
- # Check if test is valid for this machine. |
- if canidate_test.validate(self.attribute_set): |
- good_test = canidate_test |
- break |
- skipped_tests.append(canidate_test) |
- |
- except Queue.Empty: |
- break |
- |
- # Return any skipped tests to the queue. |
- for st in skipped_tests: |
- self._test_queue.put(st) |
- |
- return good_test |
- |
- def run_subcommand(self, active_test): |
- """Use subcommand to fork process and execute test.""" |
- sub_cmd = subcommand.subcommand(self.subcommand_wrapper, [active_test]) |
- sub_cmd.fork_start() |
- sub_cmd.fork_waitfor() |
- |
- def subcommand_wrapper(self, active_test): |
- """Callback for subcommand to call into with the test parameter.""" |
- self._client_at.run_test(active_test.test_name, |
- results_dir=self._results_dir, |
- **active_test.test_args) |
- |
- def run(self): |
- """Executes tests on host machine. |
- |
- Uses subprocess to fork the process when running tests so unique client |
- jobs talk to unique server jobs which prevents log files from |
- simlutaneous tests interweaving with each other. |
- """ |
- while True: |
- active_test = self.get_test() |
- if active_test is None: |
- break |
- |
- logging.info('%s running %s', self._machine, active_test) |
- try: |
- self.run_subcommand(active_test) |
- except (error.AutoservError, error.AutotestError): |
- logging.exception('Error running test "%s".', active_test) |
- except Exception: |
- logging.exception('Exception running test "%s".', active_test) |
- raise |
- finally: |
- self._test_queue.task_done() |
- self._tests_run += 1 |
- |
- logging.info('%s completed %d tests.', self._machine, self._tests_run) |