Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1978)

Unified Diff: server/server_job.py

Issue 6677157: Preparing to upstream: Created host_attributes and server_job_utils files. (Closed) Base URL: ssh://gitrw.chromium.org:9222/autotest.git@master
Patch Set: Created 9 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « server/host_attributes.py ('k') | server/server_job_utils.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/server_job.py
diff --git a/server/server_job.py b/server/server_job.py
index 77e12e1f5d1a706319db9317e27ea167dac25c48..753cfa1522a2ed9de3f987339cebf5f2adee54fa 100644
--- a/server/server_job.py
+++ b/server/server_job.py
@@ -13,8 +13,7 @@ from autotest_lib.client.bin import sysinfo
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import error, log, utils, packages
from autotest_lib.client.common_lib import logging_manager
-from autotest_lib.server import autotest, hosts, site_host_attributes
-from autotest_lib.server import test, subcommand, profilers
+from autotest_lib.server import test, subcommand, profilers, server_job_utils
from autotest_lib.server.hosts import abstract_ssh
from autotest_lib.tko import db as tko_db, status_lib, utils as tko_utils
@@ -482,8 +481,10 @@ class base_server_job(base_job.base_job):
test_queue = Queue.Queue()
test_queue_lock = threading.Lock()
- machine_workers = [machine_worker(machine, self.resultdir, test_queue,
- test_queue_lock)
+ machine_workers = [server_job_utils.machine_worker(machine,
+ self.resultdir,
+ test_queue,
+ test_queue_lock)
for machine in machines]
# To (potentially) speed up searching for valid tests create a list of
@@ -499,7 +500,7 @@ class base_server_job(base_job.base_job):
# Only queue tests which are valid on at least one machine. Record
# skipped tests in the status.log file using record_skipped_test().
for test_entry in tests:
- ti = test_item(*test_entry)
+ ti = server_job_utils.test_item(*test_entry)
machine_found = False
for ma in unique_machine_attributes:
if ti.validate(ma):
@@ -1212,175 +1213,3 @@ class warning_manager(object):
intervals = self.disabled_warnings.get(warning_type, [])
if intervals and intervals[-1][1] is None:
intervals[-1] = (intervals[-1][0], int(current_time_func()))
-
-
-class test_item(object):
- """Adds machine verification logic to the basic test tuple.
-
- Tests can either be tuples of the existing form ('testName', {args}) or the
- extended for of ('testname', {args}, ['include'], ['exclude']) where include
- and exclude are lists of attribues. A machine must have all the attributes
- in include and must not have any of the attributes in exclude to be valid
- for the test.
- """
-
- def __init__(self, test_name, test_args, include_attribs=None,
- exclude_attribs=None):
- """Creates an instance of test_item.
-
- Args:
- test_name: string, name of test to execute.
- test_args: dictionary, arguments to pass into test.
- include_attribs: attributes a machine must have to run test.
- exclude_attribs: attributes preventing a machine from running test.
- """
- self.test_name = test_name
- self.test_args = test_args
- self.inc_set = None
- if include_attribs is not None:
- self.inc_set = set(include_attribs)
- self.exc_set = None
- if exclude_attribs is not None:
- self.exc_set = set(exclude_attribs)
-
- def __str__(self):
- """Return an info string of this test."""
- params = ['%s=%s' % (k, v) for k, v in self.test_args.items()]
- msg = '%s(%s)' % (self.test_name, params)
- if self.inc_set: msg += ' include=%s' % [s for s in self.inc_set]
- if self.exc_set: msg += ' exclude=%s' % [s for s in self.exc_set]
- return msg
-
- def validate(self, machine_attributes):
- """Check if this test can run on machine with machine_attributes.
-
- If the test has include attributes, a candidate machine must have all
- the attributes to be valid.
-
- If the test has exclude attributes, a candidate machine cannot have any
- of the attributes to be valid.
-
- Args:
- machine_attributes: set, True attributes of candidate machine.
-
- Returns:
- True/False if the machine is valid for this test.
- """
- if self.inc_set is not None:
- if not self.inc_set <= machine_attributes: return False
- if self.exc_set is not None:
- if self.exc_set & machine_attributes: return False
- return True
-
-
-class machine_worker(threading.Thread):
- """Thread that runs tests on a remote host machine."""
-
- def __init__(self, machine, work_dir, test_queue, queue_lock):
- """Creates an instance of machine_worker to run tests on a remote host.
-
- Retrieves that host attributes for this machine and creates the set of
- True attributes to validate against test include/exclude attributes.
-
- Creates a directory to hold the log files for tests run and writes the
- hostname and tko parser version into keyvals file.
-
- Args:
- machine: name of remote host.
- work_dir: directory server job is using.
- test_queue: queue of tests.
- queue_lock: lock protecting test_queue.
- """
- threading.Thread.__init__(self)
- self._test_queue = test_queue
- self._test_queue_lock = queue_lock
- self._tests_run = 0
- self._machine = machine
- self._host = hosts.create_host(self._machine)
- self._client_at = autotest.Autotest(self._host)
- client_attributes = site_host_attributes.HostAttributes(machine)
- self.attribute_set = set([key for key, value in
- client_attributes.__dict__.items() if value])
- self._results_dir = os.path.join(work_dir, self._machine)
- if not os.path.exists(self._results_dir):
- os.makedirs(self._results_dir)
- machine_data = {'hostname': self._machine,
- 'status_version': str(1)}
- utils.write_keyval(self._results_dir, machine_data)
-
- def __str__(self):
- attributes = [a for a in self.attribute_set]
- return '%s attributes=%s' % (self._machine, attributes)
-
- def get_test(self):
- """Return a test from the queue to run on this host.
-
- The test queue can be non-empty, but still not contain a test that is
- valid for this machine. This function will take exclusive access to
- the queue via _test_queue_lock and repeatedly pop tests off the queue
- until finding a valid test or depleting the queue. In either case if
- invalid tests have been popped from the queue, they are pushed back
- onto the queue before returning.
-
- Returns:
- test_item, or None if no more tests exist for this machine.
- """
- good_test = None
- skipped_tests = []
-
- with self._test_queue_lock:
- while True:
- try:
- canidate_test = self._test_queue.get_nowait()
- # Check if test is valid for this machine.
- if canidate_test.validate(self.attribute_set):
- good_test = canidate_test
- break
- skipped_tests.append(canidate_test)
-
- except Queue.Empty:
- break
-
- # Return any skipped tests to the queue.
- for st in skipped_tests:
- self._test_queue.put(st)
-
- return good_test
-
- def run_subcommand(self, active_test):
- """Use subcommand to fork process and execute test."""
- sub_cmd = subcommand.subcommand(self.subcommand_wrapper, [active_test])
- sub_cmd.fork_start()
- sub_cmd.fork_waitfor()
-
- def subcommand_wrapper(self, active_test):
- """Callback for subcommand to call into with the test parameter."""
- self._client_at.run_test(active_test.test_name,
- results_dir=self._results_dir,
- **active_test.test_args)
-
- def run(self):
- """Executes tests on host machine.
-
- Uses subprocess to fork the process when running tests so unique client
- jobs talk to unique server jobs which prevents log files from
- simlutaneous tests interweaving with each other.
- """
- while True:
- active_test = self.get_test()
- if active_test is None:
- break
-
- logging.info('%s running %s', self._machine, active_test)
- try:
- self.run_subcommand(active_test)
- except (error.AutoservError, error.AutotestError):
- logging.exception('Error running test "%s".', active_test)
- except Exception:
- logging.exception('Exception running test "%s".', active_test)
- raise
- finally:
- self._test_queue.task_done()
- self._tests_run += 1
-
- logging.info('%s completed %d tests.', self._machine, self._tests_run)
« no previous file with comments | « server/host_attributes.py ('k') | server/server_job_utils.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698