Index: server/server_job_utils.py |
diff --git a/server/server_job_utils.py b/server/server_job_utils.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..fab956e2fc29fae5c64305d5039f54677aa11e9b |
--- /dev/null |
+++ b/server/server_job_utils.py |
@@ -0,0 +1,190 @@ |
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style license that can be |
+# found in the LICENSE file. |
+ |
+""" |
+Utility classes used by server_job.distribute_across_machines(). |
+ |
+test_item: extends the basic test tuple to add include/exclude attributes. |
+ |
+machine_worker: is a thread that manages running tests on a host. It |
+ verifies tests are valid for a host using the test attributes from test_item |
+ and the host attributes from host_attributes. |
+""" |
+ |
+ |
+import logging, os, Queue, threading |
+from autotest_lib.client.common_lib import error, utils |
+from autotest_lib.server import autotest, hosts, host_attributes, subcommand |
+ |
+ |
class test_item(object):
    """A test description plus the host attributes it requires or forbids.

    Tests can either be tuples of the existing form ('testName', {args}) or
    the extended form of ('testname', {args}, ['include'], ['exclude']) where
    include and exclude are lists of attributes. A machine must have all the
    attributes in include and must not have any of the attributes in exclude
    to be valid for the test.
    """

    def __init__(self, test_name, test_args, include_attribs=None,
                 exclude_attribs=None):
        """Creates an instance of test_item.

        Args:
            test_name: string, name of test to execute.
            test_args: dictionary, arguments to pass into test.
            include_attribs: iterable of attributes a machine must have to
                run the test, or None for no requirement.
            exclude_attribs: iterable of attributes that prevent a machine
                from running the test, or None for no restriction.
        """
        self.test_name = test_name
        self.test_args = test_args
        # None ("no constraint given") is kept distinct from an empty set.
        self.inc_set = (None if include_attribs is None
                        else set(include_attribs))
        self.exc_set = (None if exclude_attribs is None
                        else set(exclude_attribs))

    def __str__(self):
        """Return an info string of this test."""
        arg_strings = []
        for arg_name, arg_value in self.test_args.items():
            arg_strings.append('%s=%s' % (arg_name, arg_value))

        # The argument list is formatted with %s, so it appears in its list
        # repr form, e.g. name(['a=1']).
        pieces = ['%s(%s)' % (self.test_name, arg_strings)]
        # Empty or missing attribute sets are left out of the string.
        if self.inc_set:
            pieces.append(' include=%s' % list(self.inc_set))
        if self.exc_set:
            pieces.append(' exclude=%s' % list(self.exc_set))
        return ''.join(pieces)

    def validate(self, machine_attributes):
        """Check if this test can run on machine with machine_attributes.

        A candidate machine is valid when it has every include attribute (if
        any were given) and none of the exclude attributes (if any were
        given).

        Args:
            machine_attributes: set, True attributes of candidate machine.

        Returns:
            True if the machine is valid for this test, False otherwise.
        """
        required = self.inc_set
        if required is not None and not required <= machine_attributes:
            return False

        forbidden = self.exc_set
        if forbidden is not None and forbidden & machine_attributes:
            return False

        return True
+ |
+ |
class machine_worker(threading.Thread):
    """Thread that runs tests on a remote host machine.

    Each worker repeatedly takes the next test that is valid for its host
    from a queue shared with other workers and runs it in a forked
    subcommand, until the queue holds no test that is valid for the host.
    """

    def __init__(self, machine, work_dir, test_queue, queue_lock):
        """Creates an instance of machine_worker to run tests on a remote host.

        Retrieves the host attributes for this machine and creates the set of
        True attributes to validate against test include/exclude attributes.

        Creates a directory (work_dir/<machine>) to hold the log files for
        tests run and writes the hostname and status_version keyvals into it.

        Args:
            machine: name of remote host.
            work_dir: directory server job is using.
            test_queue: queue of tests.
            queue_lock: lock protecting test_queue.
        """
        threading.Thread.__init__(self)
        self._test_queue = test_queue
        self._test_queue_lock = queue_lock
        self._tests_run = 0
        self._machine = machine
        self._host = hosts.create_host(self._machine)
        self._client_at = autotest.Autotest(self._host)
        client_attributes = host_attributes.host_attributes(machine)
        # Only attributes whose value is truthy count as present on the host.
        self.attribute_set = set([key for key, value in
                                  client_attributes.__dict__.items() if value])
        self._results_dir = os.path.join(work_dir, self._machine)
        if not os.path.exists(self._results_dir):
            os.makedirs(self._results_dir)
        # NOTE(review): status_version is presumably the tko parser version
        # mentioned by the original author -- confirm against the parser.
        machine_data = {'hostname': self._machine,
                        'status_version': str(1)}
        utils.write_keyval(self._results_dir, machine_data)

    def __str__(self):
        """Return the machine name followed by its attribute list."""
        attributes = list(self.attribute_set)
        return '%s attributes=%s' % (self._machine, attributes)

    def get_test(self):
        """Return a test from the queue to run on this host.

        The test queue can be non-empty, but still not contain a test that is
        valid for this machine. This function will take exclusive access to
        the queue via _test_queue_lock and repeatedly pop tests off the queue
        until finding a valid test or depleting the queue. In either case if
        invalid tests have been popped from the queue, they are pushed back
        onto the queue before returning.

        The queue's unfinished-task count is left unchanged by the
        pop/push-back of skipped tests; only the returned test remains
        outstanding, and the caller must call task_done() for it.

        Returns:
            test_item, or None if no more tests exist for this machine.
        """
        good_test = None
        skipped_tests = []

        with self._test_queue_lock:
            while True:
                try:
                    candidate_test = self._test_queue.get_nowait()
                except Queue.Empty:
                    break
                # Check if test is valid for this machine.
                if candidate_test.validate(self.attribute_set):
                    good_test = candidate_test
                    break
                skipped_tests.append(candidate_test)

            # Return any skipped tests to the queue.
            for skipped in skipped_tests:
                self._test_queue.put(skipped)
                # put() counts the item as a new unfinished task, so retire
                # the get that removed it. Without this every skip inflates
                # the count for good and Queue.join() can never return.
                # put-before-task_done keeps the count from dipping to zero
                # in between, which would wake a join() waiter early.
                self._test_queue.task_done()

        return good_test

    def run_subcommand(self, active_test):
        """Use subcommand to fork process and execute test.

        Blocks until the forked subcommand has finished.

        Args:
            active_test: test_item to execute.
        """
        sub_cmd = subcommand.subcommand(self.subcommand_wrapper, [active_test])
        sub_cmd.fork_start()
        sub_cmd.fork_waitfor()

    def subcommand_wrapper(self, active_test):
        """Callback for subcommand to call into with the test parameter.

        Args:
            active_test: test_item whose name and arguments are passed to
                the client's run_test, with results going to this worker's
                results directory.
        """
        self._client_at.run_test(active_test.test_name,
                                 results_dir=self._results_dir,
                                 **active_test.test_args)

    def run(self):
        """Executes tests on host machine.

        Uses subcommand to fork the process when running tests so unique
        client jobs talk to unique server jobs which prevents log files from
        simultaneous tests interweaving with each other.

        AutoservError/AutotestError failures are logged and the worker moves
        on to the next test; any other exception is logged and re-raised,
        which ends this thread. In every case the test is marked done on the
        queue and counted in _tests_run.
        """
        while True:
            active_test = self.get_test()
            if active_test is None:
                break

            logging.info('%s running %s', self._machine, active_test)
            try:
                self.run_subcommand(active_test)
            except (error.AutoservError, error.AutotestError):
                logging.exception('Error running test "%s".', active_test)
            except Exception:
                logging.exception('Exception running test "%s".', active_test)
                raise
            finally:
                self._test_queue.task_done()
                self._tests_run += 1

        logging.info('%s completed %d tests.', self._machine, self._tests_run)