Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(922)

Unified Diff: bin/cros_au_test_harness.py

Issue 6277015: Passes cache location to tests and runs the tests in parallel. (Closed) Base URL: http://git.chromium.org/git/crosutils.git@master
Patch Set: Introduced a bug in refactoring Created 9 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | bin/cros_run_vm_update » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: bin/cros_au_test_harness.py
diff --git a/bin/cros_au_test_harness.py b/bin/cros_au_test_harness.py
index ab71232b7ee74d129e7ab82699e2ad70dc7aca8b..e647c0fb202b806e375acc15be1c3a38c7e78418 100755
--- a/bin/cros_au_test_harness.py
+++ b/bin/cros_au_test_harness.py
@@ -15,6 +15,7 @@
import optparse
import os
import re
+import subprocess
import sys
import threading
import time
@@ -23,6 +24,7 @@ import urllib
sys.path.append(os.path.join(os.path.dirname(__file__), '../lib'))
from cros_build_lib import Die
+from cros_build_lib import GetIPAddress
from cros_build_lib import Info
from cros_build_lib import ReinterpretPathForChroot
from cros_build_lib import RunCommand
@@ -31,6 +33,8 @@ from cros_build_lib import Warning
import cros_test_proxy
+global dev_server_cache
+
class UpdateException(Exception):
"""Exception thrown when _UpdateImage or _UpdateUsingPayload fail"""
@@ -469,12 +473,14 @@ class RealAUTest(unittest.TestCase, AUTest):
class VirtualAUTest(unittest.TestCase, AUTest):
"""Test harness for updating virtual machines."""
- vm_image_path = None
# VM Constants.
_FULL_VDISK_SIZE = 6072
_FULL_STATEFULFS_SIZE = 3074
- _KVM_PID_FILE = '/tmp/harness_pid'
+
+ # Class variables used to acquire individual VM variables per test.
+ _vm_lock = threading.Lock()
+ _next_port = 9222
def _KillExistingVM(self, pid_file):
if os.path.exists(pid_file):
@@ -485,10 +491,22 @@ class VirtualAUTest(unittest.TestCase, AUTest):
assert not os.path.exists(pid_file)
+ def _AcquireUniquePortAndPidFile(self):
+ """Acquires unique ssh port and pid file for VM."""
+ with VirtualAUTest._vm_lock:
+ self._ssh_port = VirtualAUTest._next_port
+ self._kvm_pid_file = '/tmp/kvm.%d' % self._ssh_port
+ VirtualAUTest._next_port += 1
+
def setUp(self):
"""Unit test overriden method. Is called before every test."""
AUTest.setUp(self)
- self._KillExistingVM(self._KVM_PID_FILE)
+ self.vm_image_path = None
+ self._AcquireUniquePortAndPidFile()
+ self._KillExistingVM(self._kvm_pid_file)
+
+ def tearDown(self):
+ self._KillExistingVM(self._kvm_pid_file)
@classmethod
def ProcessOptions(cls, parser, options):
@@ -527,27 +545,43 @@ class VirtualAUTest(unittest.TestCase, AUTest):
self.assertTrue(os.path.exists(self.vm_image_path))
def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
- proxy_port=None):
+ proxy_port=''):
"""Updates VM image with image_path."""
stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
if src_image_path and self._first_update:
src_image_path = self.vm_image_path
self._first_update = False
- cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
+ # Check image payload cache first.
+ update_id = _GenerateUpdateId(target=image_path, src=src_image_path)
+ cache_path = dev_server_cache[update_id]
+ if cache_path:
+ Info('Using cache %s' % cache_path)
+ update_url = DevServerWrapper.GetDevServerURL(proxy_port, cache_path)
+ cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
dgarrett 2011/01/25 03:44:02 Indention is off.
+ '--vm_image_path=%s' % self.vm_image_path,
+ '--snapshot',
+ self.graphics_flag,
+ '--persist',
+ '--kvm_pid=%s' % self._kvm_pid_file,
+ '--ssh_port=%s' % self._ssh_port,
+ stateful_change_flag,
+ '--update_url=%s' % update_url,
+ ]
+ else:
+ cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
dgarrett 2011/01/25 03:44:02 More indention is off.
'--update_image_path=%s' % image_path,
'--vm_image_path=%s' % self.vm_image_path,
'--snapshot',
self.graphics_flag,
'--persist',
- '--kvm_pid=%s' % self._KVM_PID_FILE,
+ '--kvm_pid=%s' % self._kvm_pid_file,
+ '--ssh_port=%s' % self._ssh_port,
stateful_change_flag,
'--src_image=%s' % src_image_path,
+ '--proxy_port=%s' % proxy_port
]
- if proxy_port:
- cmd.append('--proxy_port=%s' % proxy_port)
-
if self.verbose:
try:
RunCommand(cmd)
@@ -568,7 +602,8 @@ class VirtualAUTest(unittest.TestCase, AUTest):
'--snapshot',
self.graphics_flag,
'--persist',
- '--kvm_pid=%s' % self._KVM_PID_FILE,
+ '--kvm_pid=%s' % self._kvm_pid_file,
+ '--ssh_port=%s' % self._ssh_port,
stateful_change_flag,
]
@@ -594,7 +629,8 @@ class VirtualAUTest(unittest.TestCase, AUTest):
'--image_path=%s' % self.vm_image_path,
'--snapshot',
'--persist',
- '--kvm_pid=%s' % self._KVM_PID_FILE,
+ '--kvm_pid=%s' % self._kvm_pid_file,
+ '--ssh_port=%s' % self._ssh_port,
self.verify_suite,
]
@@ -613,15 +649,15 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest):
def setUp(self):
AUTest.setUp(self)
+ def tearDown(self):
+ pass
+
def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
proxy_port=None):
if src_image_path and self._first_update:
src_image_path = self.vm_image_path
self._first_update = False
- image_path = ReinterpretPathForChroot(image_path)
- if src_image_path:
- src_image_path = ReinterpretPathForChroot(src_image_path)
if not self.delta_list.has_key(image_path):
self.delta_list[image_path] = set([src_image_path])
else:
@@ -635,33 +671,97 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest):
class ParallelJob(threading.Thread):
- """Small wrapper for threading.Thread that releases a semaphore on exit."""
- def __init__(self, semaphore, target, args):
+ """Small wrapper for threading.Thread that releases a semaphores on exit."""
dgarrett 2011/01/25 03:44:02 Space after period.
+
+ def __init__(self, starting_semaphore, ending_semaphore, target, args):
+ """Initializes an instance of a job.
+
+ Args:
+ starting_semaphore: Semaphore used by caller to wait on such that
+ there isn't more than a certain number of threads running. Should
+ be initialized to a value for the number of threads wanting to be run
+ at a time.
+ ending_semaphore: Semaphore is released every time a job ends. Should be
dgarrett 2011/01/25 03:44:02 I think it's only one space after the ':', and sur
+ initialized to 0 before starting first job. Should be acquired once for
+ each job. Threading.Thread.join() has a bug where if the run function
+ terminates too quickly join() will hang forever.
+ target: The func to run.
+ args: Args to pass to the fun.
+ """
threading.Thread.__init__(self, target=target, args=args)
self._target = target
self._args = args
- self._semaphore = semaphore
+ self._starting_semaphore = starting_semaphore
+ self._ending_semaphore = ending_semaphore
self._output = None
self._completed = False
def run(self):
+ """Thread override. Runs the method specified and sets output."""
dgarrett 2011/01/25 03:44:02 Two spaces after period.
try:
- threading.Thread.run(self)
+ self._output = self._target(*self._args)
finally:
+ # From threading.py to avoid a refcycle.
+ del self._target, self._args
+ # Our own clean up.
self._Cleanup()
self._completed = True
def GetOutput(self):
+ """Returns the output of the method run."""
assert self._completed, 'GetOutput called before thread was run.'
return self._output
def _Cleanup(self):
- self._semaphore.release()
+ """Releases semaphores for a waiting caller."""
+ self._starting_semaphore.release()
+ self._ending_semaphore.release()
def __str__(self):
return '%s(%s)' % (self._target, self._args)
+class DevServerWrapper(threading.Thread):
+ """A Simple wrapper around a devserver instance."""
+
+ def __init__(self):
+ self.proc = None
+ threading.Thread.__init__(self)
+
+ def run(self):
+ # Kill previous running instance of devserver if it exists.
+ RunCommand(['sudo', 'pkill', '-f', 'devserver.py', ], error_ok=True,
+ print_cmd=False)
+ self.proc = subprocess.Popen(['./start_devserver',
+ '--archive_dir=./static',
+ '--client_prefix=ChromeOSUpdateEngine',
+ '--production',
+ ])
+ self.proc.communicate()
+
+ def Stop(self):
+ """Kills the devserver instance."""
+ self.proc.kill()
+
+ @classmethod
+ def GetDevServerURL(cls, port, sub_dir):
+ """Returns the dev server url for a given port and sub directory."""
+ ip_addr = GetIPAddress()
+ if not port: port = 8080
+ url = 'http://%(ip)s:%(port)s/%(dir)s' % {'ip': ip_addr,
+ 'port': str(port),
+ 'dir': sub_dir}
+ return url
+
+
+def _GenerateUpdateId(target, src):
+ """Returns a simple representation id of target and src paths."""
+ if src:
+ return '%s->%s' % (target, src)
+ else:
+ return target
+
+
def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args):
"""Runs set number of specified jobs in parallel.
@@ -676,29 +776,39 @@ def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args):
return (x, y)
threads = []
- job_pool_semaphore = threading.Semaphore(number_of_sumultaneous_jobs)
+ job_start_semaphore = threading.Semaphore(number_of_sumultaneous_jobs)
+ join_semaphore = threading.Semaphore(0)
assert len(jobs) == len(jobs_args), 'Length of args array is wrong.'
# Create the parallel jobs.
for job, args in map(_TwoTupleize, jobs, jobs_args):
- thread = ParallelJob(job_pool_semaphore, target=job, args=args)
+ thread = ParallelJob(job_start_semaphore, join_semaphore, target=job,
+ args=args)
threads.append(thread)
# We use a semaphore to ensure we don't run more jobs that required.
# After each thread finishes, it releases (increments semaphore).
# Acquire blocks of num jobs reached and continues when a thread finishes.
for next_thread in threads:
- job_pool_semaphore.acquire(blocking=True)
- Info('Starting %s' % next_thread)
+ job_start_semaphore.acquire(blocking=True)
+ Info('Starting job %s' % next_thread)
next_thread.start()
# Wait on the rest of the threads to finish.
for thread in threads:
- thread.join()
+ join_semaphore.acquire(blocking=True)
return [thread.GetOutput() for thread in threads]
+def _PrepareTestSuite(parser, options, test_class):
+ """Returns a prepared test suite given by the options and test class."""
+ test_class.ProcessOptions(parser, options)
+ test_loader = unittest.TestLoader()
+ test_loader.testMethodPrefix = options.test_prefix
+ return test_loader.loadTestsFromTestCase(test_class)
+
+
def _PregenerateUpdates(parser, options):
"""Determines all deltas that will be generated and generates them.
@@ -708,41 +818,80 @@ def _PregenerateUpdates(parser, options):
parser: parser from main.
options: options from parsed parser.
Returns:
- Array of output from generating updates.
+ Dictionary of Update Identifiers->Relative cache locations.
"""
def _GenerateVMUpdate(target, src):
"""Generates an update using the devserver."""
- RunCommand(['sudo',
- './start_devserver',
- '--pregenerate_update',
- '--exit',
- '--image=%s' % target,
- '--src_image=%s' % src,
- '--for_vm'
- ], enter_chroot=True)
+ target = ReinterpretPathForChroot(target)
+ if src:
+ src = ReinterpretPathForChroot(src)
+
+ return RunCommand(['sudo',
+ './start_devserver',
+ '--pregenerate_update',
+ '--exit',
+ '--image=%s' % target,
+ '--src_image=%s' % src,
+ '--for_vm',
+ ], redirect_stdout=True, enter_chroot=True,
+ print_cmd=False)
# Get the list of deltas by mocking out update method in test class.
- GenerateVirtualAUDeltasTest.ProcessOptions(parser, options)
- test_loader = unittest.TestLoader()
- test_loader.testMethodPrefix = options.test_prefix
- test_suite = test_loader.loadTestsFromTestCase(GenerateVirtualAUDeltasTest)
+ test_suite = _PrepareTestSuite(parser, options, GenerateVirtualAUDeltasTest)
test_result = unittest.TextTestRunner(verbosity=0).run(test_suite)
Info('The following delta updates are required.')
+ update_ids = []
jobs = []
args = []
for target, srcs in GenerateVirtualAUDeltasTest.delta_list.items():
for src in srcs:
- if src:
- print >> sys.stderr, 'DELTA AU %s -> %s' % (src, target)
- else:
- print >> sys.stderr, 'FULL AU %s' % target
-
+ update_id = _GenerateUpdateId(target=target, src=src)
+ print >> sys.stderr, 'AU: %s' % update_id
+ update_ids.append(update_id)
jobs.append(_GenerateVMUpdate)
args.append((target, src))
- results = _RunParallelJobs(options.jobs, jobs, args)
- return results
+ raw_results = _RunParallelJobs(options.jobs, jobs, args)
+ results = []
+
+ # Parse the output.
+ key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)')
+ for result in raw_results:
+ for line in result.splitlines():
+ match = key_line_re.search(line)
+ if match:
+ # Convert blah/blah/update.gz -> update/blah/blah.
+ path_to_update_gz = match.group(1).rstrip()
+ (path_to_update_dir, _, _) = path_to_update_gz.rpartition('/update.gz')
+ results.append('/'.join(['update', path_to_update_dir]))
+ break
+
+ assert len(raw_results) == len(results), \
+ 'Insufficient number cache directories returned.'
+
+ # Build the dictionary from our id's and returned cache paths.
+ cache_dictionary = {}
+ for index, id in enumerate(update_ids):
+ cache_dictionary[id] = results[index]
+
+ return cache_dictionary
+
+
+def _RunTestsInParallel(parser, options, test_class):
+ """Runs the tests given by the options and test_class in parallel."""
+ threads = []
+ args = []
+ test_suite = _PrepareTestSuite(parser, options, test_class)
+ for test in test_suite:
+ test_name = test.id()
+ test_case = unittest.TestLoader().loadTestsFromName(test_name)
+ threads.append(unittest.TextTestRunner().run)
+ args.append(test_case)
+
+ results = _RunParallelJobs(options.jobs, threads, args)
+ if not (test_result.wasSuccessful() for test_result in results):
+ Die('Test harness was not successful')
def main():
@@ -777,22 +926,28 @@ def main():
if leftover_args:
parser.error('Found extra options we do not support: %s' % leftover_args)
+ # Figure out the test_class.
if options.type == 'vm': test_class = VirtualAUTest
elif options.type == 'real': test_class = RealAUTest
else: parser.error('Could not parse harness type %s.' % options.type)
# TODO(sosa): Caching doesn't really make sense on non-vm images (yet).
- if options.type == 'vm':
- _PregenerateUpdates(parser, options)
-
- # Run the test suite.
- test_class.ProcessOptions(parser, options)
- test_loader = unittest.TestLoader()
- test_loader.testMethodPrefix = options.test_prefix
- test_suite = test_loader.loadTestsFromTestCase(test_class)
- test_result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- if not test_result.wasSuccessful():
- Die('Test harness was not successful.')
+ global dev_server_cache
+ if options.type == 'vm' and options.jobs > 1:
+ dev_server_cache = _PregenerateUpdates(parser, options)
+ my_server = DevServerWrapper()
+ my_server.start()
+ try:
+ _RunTestsInParallel(parser, options, test_class)
+ finally:
+ my_server.Stop()
+
+ else:
+ dev_server_cache = None
+ test_suite = _PrepareTestSuite(parser, options, test_class)
+ test_result = unittest.TextTestRunner(verbosity=2).run(test_suite)
+ if not test_result.wasSuccessful():
+ Die('Test harness was not successful.')
if __name__ == '__main__':
« no previous file with comments | « no previous file | bin/cros_run_vm_update » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698