Chromium Code Reviews| Index: bin/cros_au_test_harness.py |
| diff --git a/bin/cros_au_test_harness.py b/bin/cros_au_test_harness.py |
| index ab71232b7ee74d129e7ab82699e2ad70dc7aca8b..b85fbb7ddb298c5b2cf6f0da5d31aef9b166f453 100755 |
| --- a/bin/cros_au_test_harness.py |
| +++ b/bin/cros_au_test_harness.py |
| @@ -15,6 +15,7 @@ |
| import optparse |
| import os |
| import re |
| +import subprocess |
| import sys |
| import threading |
| import time |
| @@ -23,6 +24,7 @@ import urllib |
| sys.path.append(os.path.join(os.path.dirname(__file__), '../lib')) |
| from cros_build_lib import Die |
| +from cros_build_lib import GetIPAddress |
| from cros_build_lib import Info |
| from cros_build_lib import ReinterpretPathForChroot |
| from cros_build_lib import RunCommand |
| @@ -31,6 +33,8 @@ from cros_build_lib import Warning |
| import cros_test_proxy |
| +global dev_server_cache |
| + |
| class UpdateException(Exception): |
| """Exception thrown when _UpdateImage or _UpdateUsingPayload fail""" |
| @@ -469,12 +473,14 @@ class RealAUTest(unittest.TestCase, AUTest): |
| class VirtualAUTest(unittest.TestCase, AUTest): |
| """Test harness for updating virtual machines.""" |
| - vm_image_path = None |
| # VM Constants. |
| _FULL_VDISK_SIZE = 6072 |
| _FULL_STATEFULFS_SIZE = 3074 |
| - _KVM_PID_FILE = '/tmp/harness_pid' |
| + |
| + # Class variables used to acquire individual VM variables per test. |
| + _vm_lock = threading.Lock() |
| + _next_port = 9222 |
| def _KillExistingVM(self, pid_file): |
| if os.path.exists(pid_file): |
| @@ -485,10 +491,22 @@ class VirtualAUTest(unittest.TestCase, AUTest): |
| assert not os.path.exists(pid_file) |
| + def _AcquireUniquePortAndPidFile(self): |
| + """Acquires unique ssh port and pid file for VM.""" |
| + with VirtualAUTest._vm_lock: |
| + self._ssh_port = VirtualAUTest._next_port |
| + self._kvm_pid_file = '/tmp/kvm.%d' % self._ssh_port |
| + VirtualAUTest._next_port += 1 |
| + |
| def setUp(self): |
| """Unit test overriden method. Is called before every test.""" |
| AUTest.setUp(self) |
| - self._KillExistingVM(self._KVM_PID_FILE) |
| + self.vm_image_path = None |
| + self._AcquireUniquePortAndPidFile() |
| + self._KillExistingVM(self._kvm_pid_file) |
| + |
| + def tearDown(self): |
| + self._KillExistingVM(self._kvm_pid_file) |
| @classmethod |
| def ProcessOptions(cls, parser, options): |
| @@ -527,26 +545,42 @@ class VirtualAUTest(unittest.TestCase, AUTest): |
| self.assertTrue(os.path.exists(self.vm_image_path)) |
| def _UpdateImage(self, image_path, src_image_path='', stateful_change='old', |
| - proxy_port=None): |
| + proxy_port=''): |
| """Updates VM image with image_path.""" |
| stateful_change_flag = self.GetStatefulChangeFlag(stateful_change) |
| if src_image_path and self._first_update: |
| src_image_path = self.vm_image_path |
| self._first_update = False |
| - cmd = ['%s/cros_run_vm_update' % self.crosutilsbin, |
| - '--update_image_path=%s' % image_path, |
| - '--vm_image_path=%s' % self.vm_image_path, |
| - '--snapshot', |
| - self.graphics_flag, |
| - '--persist', |
| - '--kvm_pid=%s' % self._KVM_PID_FILE, |
| - stateful_change_flag, |
| - '--src_image=%s' % src_image_path, |
| - ] |
| - |
| - if proxy_port: |
| - cmd.append('--proxy_port=%s' % proxy_port) |
| + # Check image payload cache first. |
| + update_id = _GenerateUpdateId(target=image_path, src=src_image_path) |
| + cache_path = dev_server_cache[update_id] |
| + if cache_path: |
| + Info('Using cache %s' % cache_path) |
| + update_url = DevServerWrapper.GetDevServerURL(proxy_port, cache_path) |
| + cmd = ['%s/cros_run_vm_update' % self.crosutilsbin, |
| + '--vm_image_path=%s' % self.vm_image_path, |
| + '--snapshot', |
| + self.graphics_flag, |
| + '--persist', |
| + '--kvm_pid=%s' % self._kvm_pid_file, |
| + '--ssh_port=%s' % self._ssh_port, |
| + stateful_change_flag, |
| + '--update_url=%s' % update_url, |
| + ] |
| + else: |
| + cmd = ['%s/cros_run_vm_update' % self.crosutilsbin, |
| + '--update_image_path=%s' % image_path, |
| + '--vm_image_path=%s' % self.vm_image_path, |
| + '--snapshot', |
| + self.graphics_flag, |
| + '--persist', |
| + '--kvm_pid=%s' % self._kvm_pid_file, |
| + '--ssh_port=%s' % self._ssh_port, |
| + stateful_change_flag, |
| + '--src_image=%s' % src_image_path, |
| + '--proxy_port=%s' % proxy_port |
| + ] |
| if self.verbose: |
| try: |
| @@ -568,7 +602,8 @@ class VirtualAUTest(unittest.TestCase, AUTest): |
| '--snapshot', |
| self.graphics_flag, |
| '--persist', |
| - '--kvm_pid=%s' % self._KVM_PID_FILE, |
| + '--kvm_pid=%s' % self._kvm_pid_file, |
| + '--ssh_port=%s' % self._ssh_port, |
| stateful_change_flag, |
| ] |
| @@ -594,7 +629,8 @@ class VirtualAUTest(unittest.TestCase, AUTest): |
| '--image_path=%s' % self.vm_image_path, |
| '--snapshot', |
| '--persist', |
| - '--kvm_pid=%s' % self._KVM_PID_FILE, |
| + '--kvm_pid=%s' % self._kvm_pid_file, |
| + '--ssh_port=%s' % self._ssh_port, |
| self.verify_suite, |
| ] |
| @@ -613,15 +649,15 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest): |
| def setUp(self): |
| AUTest.setUp(self) |
| + def tearDown(self): |
| + pass |
| + |
| def _UpdateImage(self, image_path, src_image_path='', stateful_change='old', |
| proxy_port=None): |
| if src_image_path and self._first_update: |
| src_image_path = self.vm_image_path |
| self._first_update = False |
| - image_path = ReinterpretPathForChroot(image_path) |
| - if src_image_path: |
| - src_image_path = ReinterpretPathForChroot(src_image_path) |
| if not self.delta_list.has_key(image_path): |
| self.delta_list[image_path] = set([src_image_path]) |
| else: |
| @@ -635,33 +671,98 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest): |
| class ParallelJob(threading.Thread): |
| - """Small wrapper for threading.Thread that releases a semaphore on exit.""" |
| - def __init__(self, semaphore, target, args): |
| + """Small wrapper for threading. Thread that releases a semaphores on exit.""" |
| + |
| + def __init__(self, starting_semaphore, ending_semaphore, target, args): |
| + """Initializes an instance of a job. |
| + |
| + Args: |
| + starting_semaphore: Semaphore used by caller to wait on such that |
| + there isn't more than a certain number of threads running. Should |
| + be initialized to a value for the number of threads wanting to be run |
| + at a time. |
| + ending_semaphore: Semaphore is released every time a job ends. Should be |
| + initialized to 0 before starting first job. Should be acquired once for |
| + each job. Threading.Thread.join() has a bug where if the run function |
| + terminates too quickly join() will hang forever. |
| + target: The func to run. |
| + args: Args to pass to the fun. |
| + """ |
| threading.Thread.__init__(self, target=target, args=args) |
| self._target = target |
| self._args = args |
| - self._semaphore = semaphore |
| + self._starting_semaphore = starting_semaphore |
| + self._ending_semaphore = ending_semaphore |
| self._output = None |
| self._completed = False |
| def run(self): |
| + """Thread override. Runs the method specified and sets output.""" |
| try: |
| - threading.Thread.run(self) |
| + self._output = self._target(*self._args) |
| finally: |
| + # From threading.py to avoid a refcycle. |
| + del self._target, self._args |
| + # Our own clean up. |
| self._Cleanup() |
| self._completed = True |
| def GetOutput(self): |
| + """Returns the output of the method run.""" |
| assert self._completed, 'GetOutput called before thread was run.' |
| return self._output |
| def _Cleanup(self): |
| - self._semaphore.release() |
| + """Releases semaphores for a waiting caller.""" |
| + self._starting_semaphore.release() |
| + self._ending_semaphore.release() |
| def __str__(self): |
| return '%s(%s)' % (self._target, self._args) |
| +class DevServerWrapper(threading.Thread): |
| + """A Simple wrapper around a devserver instance.""" |
| + |
| + def __init__(self): |
| + self.proc = None |
| + threading.Thread.__init__(self) |
| + |
| + def run(self): |
| + # Kill previous running instance of devserver if it exists. |
| + RunCommand(['sudo', 'pkill', '-f', 'devserver.py'], error_ok=True, |
| + print_cmd=False) |
| + RunCommand(['sudo', |
|
dgarrett
2011/01/26 19:19:16
Should this also be error_ok, to prevent nasty log
sosa
2011/01/26 19:22:18
I thought maybe too ... but it seems that cherrypy
|
| + './start_devserver', |
| + '--archive_dir=./static', |
| + '--client_prefix=ChromeOSUpdateEngine', |
| + '--production', |
| + ], enter_chroot=True, print_cmd=False) |
| + |
| + def Stop(self): |
| + """Kills the devserver instance.""" |
| + RunCommand(['sudo', 'pkill', '-f', 'devserver.py'], error_ok=True, |
| + print_cmd=False) |
| + |
| + @classmethod |
| + def GetDevServerURL(cls, port, sub_dir): |
| + """Returns the dev server url for a given port and sub directory.""" |
| + ip_addr = GetIPAddress() |
| + if not port: port = 8080 |
| + url = 'http://%(ip)s:%(port)s/%(dir)s' % {'ip': ip_addr, |
| + 'port': str(port), |
| + 'dir': sub_dir} |
| + return url |
| + |
| + |
| +def _GenerateUpdateId(target, src): |
| + """Returns a simple representation id of target and src paths.""" |
| + if src: |
| + return '%s->%s' % (target, src) |
| + else: |
| + return target |
| + |
| + |
| def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args): |
| """Runs set number of specified jobs in parallel. |
| @@ -676,29 +777,39 @@ def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args): |
| return (x, y) |
| threads = [] |
| - job_pool_semaphore = threading.Semaphore(number_of_sumultaneous_jobs) |
| + job_start_semaphore = threading.Semaphore(number_of_sumultaneous_jobs) |
| + join_semaphore = threading.Semaphore(0) |
| assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' |
| # Create the parallel jobs. |
| for job, args in map(_TwoTupleize, jobs, jobs_args): |
| - thread = ParallelJob(job_pool_semaphore, target=job, args=args) |
| + thread = ParallelJob(job_start_semaphore, join_semaphore, target=job, |
| + args=args) |
| threads.append(thread) |
| # We use a semaphore to ensure we don't run more jobs that required. |
| # After each thread finishes, it releases (increments semaphore). |
| # Acquire blocks of num jobs reached and continues when a thread finishes. |
| for next_thread in threads: |
| - job_pool_semaphore.acquire(blocking=True) |
| - Info('Starting %s' % next_thread) |
| + job_start_semaphore.acquire(blocking=True) |
| + Info('Starting job %s' % next_thread) |
| next_thread.start() |
| # Wait on the rest of the threads to finish. |
| for thread in threads: |
| - thread.join() |
| + join_semaphore.acquire(blocking=True) |
| return [thread.GetOutput() for thread in threads] |
| +def _PrepareTestSuite(parser, options, test_class): |
| + """Returns a prepared test suite given by the options and test class.""" |
| + test_class.ProcessOptions(parser, options) |
| + test_loader = unittest.TestLoader() |
| + test_loader.testMethodPrefix = options.test_prefix |
| + return test_loader.loadTestsFromTestCase(test_class) |
| + |
| + |
| def _PregenerateUpdates(parser, options): |
| """Determines all deltas that will be generated and generates them. |
| @@ -708,41 +819,80 @@ def _PregenerateUpdates(parser, options): |
| parser: parser from main. |
| options: options from parsed parser. |
| Returns: |
| - Array of output from generating updates. |
| + Dictionary of Update Identifiers->Relative cache locations. |
| """ |
| def _GenerateVMUpdate(target, src): |
| """Generates an update using the devserver.""" |
| - RunCommand(['sudo', |
| - './start_devserver', |
| - '--pregenerate_update', |
| - '--exit', |
| - '--image=%s' % target, |
| - '--src_image=%s' % src, |
| - '--for_vm' |
| - ], enter_chroot=True) |
| + target = ReinterpretPathForChroot(target) |
| + if src: |
| + src = ReinterpretPathForChroot(src) |
| + |
| + return RunCommand(['sudo', |
| + './start_devserver', |
| + '--pregenerate_update', |
| + '--exit', |
| + '--image=%s' % target, |
| + '--src_image=%s' % src, |
| + '--for_vm', |
| + ], redirect_stdout=True, enter_chroot=True, |
| + print_cmd=False) |
| # Get the list of deltas by mocking out update method in test class. |
| - GenerateVirtualAUDeltasTest.ProcessOptions(parser, options) |
| - test_loader = unittest.TestLoader() |
| - test_loader.testMethodPrefix = options.test_prefix |
| - test_suite = test_loader.loadTestsFromTestCase(GenerateVirtualAUDeltasTest) |
| + test_suite = _PrepareTestSuite(parser, options, GenerateVirtualAUDeltasTest) |
| test_result = unittest.TextTestRunner(verbosity=0).run(test_suite) |
| Info('The following delta updates are required.') |
| + update_ids = [] |
| jobs = [] |
| args = [] |
| for target, srcs in GenerateVirtualAUDeltasTest.delta_list.items(): |
| for src in srcs: |
| - if src: |
| - print >> sys.stderr, 'DELTA AU %s -> %s' % (src, target) |
| - else: |
| - print >> sys.stderr, 'FULL AU %s' % target |
| - |
| + update_id = _GenerateUpdateId(target=target, src=src) |
| + print >> sys.stderr, 'AU: %s' % update_id |
| + update_ids.append(update_id) |
| jobs.append(_GenerateVMUpdate) |
| args.append((target, src)) |
| - results = _RunParallelJobs(options.jobs, jobs, args) |
| - return results |
| + raw_results = _RunParallelJobs(options.jobs, jobs, args) |
| + results = [] |
| + |
| + # Parse the output. |
| + key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)') |
| + for result in raw_results: |
| + for line in result.splitlines(): |
| + match = key_line_re.search(line) |
| + if match: |
| + # Convert blah/blah/update.gz -> update/blah/blah. |
| + path_to_update_gz = match.group(1).rstrip() |
| + (path_to_update_dir, _, _) = path_to_update_gz.rpartition('/update.gz') |
| + results.append('/'.join(['update', path_to_update_dir])) |
| + break |
| + |
| + assert len(raw_results) == len(results), \ |
| + 'Insufficient number cache directories returned.' |
| + |
| + # Build the dictionary from our id's and returned cache paths. |
| + cache_dictionary = {} |
| + for index, id in enumerate(update_ids): |
| + cache_dictionary[id] = results[index] |
| + |
| + return cache_dictionary |
| + |
| + |
| +def _RunTestsInParallel(parser, options, test_class): |
| + """Runs the tests given by the options and test_class in parallel.""" |
| + threads = [] |
| + args = [] |
| + test_suite = _PrepareTestSuite(parser, options, test_class) |
| + for test in test_suite: |
| + test_name = test.id() |
| + test_case = unittest.TestLoader().loadTestsFromName(test_name) |
| + threads.append(unittest.TextTestRunner().run) |
| + args.append(test_case) |
| + |
| + results = _RunParallelJobs(options.jobs, threads, args) |
| + if not (test_result.wasSuccessful() for test_result in results): |
| + Die('Test harness was not successful') |
| def main(): |
| @@ -777,22 +927,28 @@ def main(): |
| if leftover_args: |
| parser.error('Found extra options we do not support: %s' % leftover_args) |
| + # Figure out the test_class. |
| if options.type == 'vm': test_class = VirtualAUTest |
| elif options.type == 'real': test_class = RealAUTest |
| else: parser.error('Could not parse harness type %s.' % options.type) |
| # TODO(sosa): Caching doesn't really make sense on non-vm images (yet). |
| - if options.type == 'vm': |
| - _PregenerateUpdates(parser, options) |
| - |
| - # Run the test suite. |
| - test_class.ProcessOptions(parser, options) |
| - test_loader = unittest.TestLoader() |
| - test_loader.testMethodPrefix = options.test_prefix |
| - test_suite = test_loader.loadTestsFromTestCase(test_class) |
| - test_result = unittest.TextTestRunner(verbosity=2).run(test_suite) |
| - if not test_result.wasSuccessful(): |
| - Die('Test harness was not successful.') |
| + global dev_server_cache |
| + if options.type == 'vm' and options.jobs > 1: |
| + dev_server_cache = _PregenerateUpdates(parser, options) |
| + my_server = DevServerWrapper() |
| + my_server.start() |
| + try: |
| + _RunTestsInParallel(parser, options, test_class) |
| + finally: |
| + my_server.Stop() |
| + |
| + else: |
| + dev_server_cache = None |
| + test_suite = _PrepareTestSuite(parser, options, test_class) |
| + test_result = unittest.TextTestRunner(verbosity=2).run(test_suite) |
| + if not test_result.wasSuccessful(): |
| + Die('Test harness was not successful.') |
| if __name__ == '__main__': |