Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1456)

Unified Diff: server/server_job_utils.py

Issue 6677157: Preparing to upstream: Created host_attributes and server_job_utils files. (Closed) Base URL: ssh://gitrw.chromium.org:9222/autotest.git@master
Patch Set: Created 9 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « server/server_job.py ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/server_job_utils.py
diff --git a/server/server_job_utils.py b/server/server_job_utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..fab956e2fc29fae5c64305d5039f54677aa11e9b
--- /dev/null
+++ b/server/server_job_utils.py
@@ -0,0 +1,190 @@
+# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""
+Utility classes used by server_job.distribute_across_machines().
+
+test_item: extends the basic test tuple to add include/exclude attributes.
+
+machine_worker: is a thread that manages running tests on a host. It
+ verifies test are valid for a host using the test attributes from test_item
+ and the host attributes from host_attributes.
+"""
+
+
+import logging, os, Queue, threading
+from autotest_lib.client.common_lib import error, utils
+from autotest_lib.server import autotest, hosts, host_attributes, subcommand
+
+
+class test_item(object):
+ """Adds machine verification logic to the basic test tuple.
+
+ Tests can either be tuples of the existing form ('testName', {args}) or the
+ extended for of ('testname', {args}, ['include'], ['exclude']) where include
+ and exclude are lists of attributes. A machine must have all the attributes
+ in include and must not have any of the attributes in exclude to be valid
+ for the test.
+ """
+
+ def __init__(self, test_name, test_args, include_attribs=None,
+ exclude_attribs=None):
+ """Creates an instance of test_item.
+
+ Args:
+ test_name: string, name of test to execute.
+ test_args: dictionary, arguments to pass into test.
+ include_attribs: attributes a machine must have to run test.
+ exclude_attribs: attributes preventing a machine from running test.
+ """
+ self.test_name = test_name
+ self.test_args = test_args
+ self.inc_set = None
+ if include_attribs is not None:
+ self.inc_set = set(include_attribs)
+ self.exc_set = None
+ if exclude_attribs is not None:
+ self.exc_set = set(exclude_attribs)
+
+ def __str__(self):
+ """Return an info string of this test."""
+ params = ['%s=%s' % (k, v) for k, v in self.test_args.items()]
+ msg = '%s(%s)' % (self.test_name, params)
+ if self.inc_set: msg += ' include=%s' % [s for s in self.inc_set]
+ if self.exc_set: msg += ' exclude=%s' % [s for s in self.exc_set]
+ return msg
+
+ def validate(self, machine_attributes):
+ """Check if this test can run on machine with machine_attributes.
+
+ If the test has include attributes, a candidate machine must have all
+ the attributes to be valid.
+
+ If the test has exclude attributes, a candidate machine cannot have any
+ of the attributes to be valid.
+
+ Args:
+ machine_attributes: set, True attributes of candidate machine.
+
+ Returns:
+ True/False if the machine is valid for this test.
+ """
+ if self.inc_set is not None:
+ if not self.inc_set <= machine_attributes: return False
+ if self.exc_set is not None:
+ if self.exc_set & machine_attributes: return False
+ return True
+
+
+class machine_worker(threading.Thread):
+ """Thread that runs tests on a remote host machine."""
+
+ def __init__(self, machine, work_dir, test_queue, queue_lock):
+ """Creates an instance of machine_worker to run tests on a remote host.
+
+ Retrieves that host attributes for this machine and creates the set of
+ True attributes to validate against test include/exclude attributes.
+
+ Creates a directory to hold the log files for tests run and writes the
+ hostname and tko parser version into keyvals file.
+
+ Args:
+ machine: name of remote host.
+ work_dir: directory server job is using.
+ test_queue: queue of tests.
+ queue_lock: lock protecting test_queue.
+ """
+ threading.Thread.__init__(self)
+ self._test_queue = test_queue
+ self._test_queue_lock = queue_lock
+ self._tests_run = 0
+ self._machine = machine
+ self._host = hosts.create_host(self._machine)
+ self._client_at = autotest.Autotest(self._host)
+ client_attributes = host_attributes.host_attributes(machine)
+ self.attribute_set = set([key for key, value in
+ client_attributes.__dict__.items() if value])
+ self._results_dir = os.path.join(work_dir, self._machine)
+ if not os.path.exists(self._results_dir):
+ os.makedirs(self._results_dir)
+ machine_data = {'hostname': self._machine,
+ 'status_version': str(1)}
+ utils.write_keyval(self._results_dir, machine_data)
+
+ def __str__(self):
+ attributes = [a for a in self.attribute_set]
+ return '%s attributes=%s' % (self._machine, attributes)
+
+ def get_test(self):
+ """Return a test from the queue to run on this host.
+
+ The test queue can be non-empty, but still not contain a test that is
+ valid for this machine. This function will take exclusive access to
+ the queue via _test_queue_lock and repeatedly pop tests off the queue
+ until finding a valid test or depleting the queue. In either case if
+ invalid tests have been popped from the queue, they are pushed back
+ onto the queue before returning.
+
+ Returns:
+ test_item, or None if no more tests exist for this machine.
+ """
+ good_test = None
+ skipped_tests = []
+
+ with self._test_queue_lock:
+ while True:
+ try:
+ canidate_test = self._test_queue.get_nowait()
+ # Check if test is valid for this machine.
+ if canidate_test.validate(self.attribute_set):
+ good_test = canidate_test
+ break
+ skipped_tests.append(canidate_test)
+
+ except Queue.Empty:
+ break
+
+ # Return any skipped tests to the queue.
+ for st in skipped_tests:
+ self._test_queue.put(st)
+
+ return good_test
+
+ def run_subcommand(self, active_test):
+ """Use subcommand to fork process and execute test."""
+ sub_cmd = subcommand.subcommand(self.subcommand_wrapper, [active_test])
+ sub_cmd.fork_start()
+ sub_cmd.fork_waitfor()
+
+ def subcommand_wrapper(self, active_test):
+ """Callback for subcommand to call into with the test parameter."""
+ self._client_at.run_test(active_test.test_name,
+ results_dir=self._results_dir,
+ **active_test.test_args)
+
+ def run(self):
+ """Executes tests on host machine.
+
+ Uses subprocess to fork the process when running tests so unique client
+ jobs talk to unique server jobs which prevents log files from
+ simultaneous tests interweaving with each other.
+ """
+ while True:
+ active_test = self.get_test()
+ if active_test is None:
+ break
+
+ logging.info('%s running %s', self._machine, active_test)
+ try:
+ self.run_subcommand(active_test)
+ except (error.AutoservError, error.AutotestError):
+ logging.exception('Error running test "%s".', active_test)
+ except Exception:
+ logging.exception('Exception running test "%s".', active_test)
+ raise
+ finally:
+ self._test_queue.task_done()
+ self._tests_run += 1
+
+ logging.info('%s completed %d tests.', self._machine, self._tests_run)
« no previous file with comments | « server/server_job.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698