Index: bin/cros_au_test_harness.py |
diff --git a/bin/cros_au_test_harness.py b/bin/cros_au_test_harness.py |
index ab71232b7ee74d129e7ab82699e2ad70dc7aca8b..e647c0fb202b806e375acc15be1c3a38c7e78418 100755 |
--- a/bin/cros_au_test_harness.py |
+++ b/bin/cros_au_test_harness.py |
@@ -15,6 +15,7 @@ |
import optparse |
import os |
import re |
+import subprocess |
import sys |
import threading |
import time |
@@ -23,6 +24,7 @@ import urllib |
sys.path.append(os.path.join(os.path.dirname(__file__), '../lib')) |
from cros_build_lib import Die |
+from cros_build_lib import GetIPAddress |
from cros_build_lib import Info |
from cros_build_lib import ReinterpretPathForChroot |
from cros_build_lib import RunCommand |
@@ -31,6 +33,8 @@ from cros_build_lib import Warning |
import cros_test_proxy |
+global dev_server_cache |
+ |
class UpdateException(Exception): |
"""Exception thrown when _UpdateImage or _UpdateUsingPayload fail""" |
@@ -469,12 +473,14 @@ class RealAUTest(unittest.TestCase, AUTest): |
class VirtualAUTest(unittest.TestCase, AUTest): |
"""Test harness for updating virtual machines.""" |
- vm_image_path = None |
# VM Constants. |
_FULL_VDISK_SIZE = 6072 |
_FULL_STATEFULFS_SIZE = 3074 |
- _KVM_PID_FILE = '/tmp/harness_pid' |
+ |
+ # Class variables used to acquire individual VM variables per test. |
+ _vm_lock = threading.Lock() |
+ _next_port = 9222 |
def _KillExistingVM(self, pid_file): |
if os.path.exists(pid_file): |
@@ -485,10 +491,22 @@ class VirtualAUTest(unittest.TestCase, AUTest): |
assert not os.path.exists(pid_file) |
+ def _AcquireUniquePortAndPidFile(self): |
+ """Acquires unique ssh port and pid file for VM.""" |
+ with VirtualAUTest._vm_lock: |
+ self._ssh_port = VirtualAUTest._next_port |
+ self._kvm_pid_file = '/tmp/kvm.%d' % self._ssh_port |
+ VirtualAUTest._next_port += 1 |
+ |
def setUp(self): |
"""Unit test overriden method. Is called before every test.""" |
AUTest.setUp(self) |
- self._KillExistingVM(self._KVM_PID_FILE) |
+ self.vm_image_path = None |
+ self._AcquireUniquePortAndPidFile() |
+ self._KillExistingVM(self._kvm_pid_file) |
+ |
+ def tearDown(self): |
+ self._KillExistingVM(self._kvm_pid_file) |
@classmethod |
def ProcessOptions(cls, parser, options): |
@@ -527,27 +545,43 @@ class VirtualAUTest(unittest.TestCase, AUTest): |
self.assertTrue(os.path.exists(self.vm_image_path)) |
def _UpdateImage(self, image_path, src_image_path='', stateful_change='old', |
- proxy_port=None): |
+ proxy_port=''): |
"""Updates VM image with image_path.""" |
stateful_change_flag = self.GetStatefulChangeFlag(stateful_change) |
if src_image_path and self._first_update: |
src_image_path = self.vm_image_path |
self._first_update = False |
- cmd = ['%s/cros_run_vm_update' % self.crosutilsbin, |
+ # Check image payload cache first. |
+ update_id = _GenerateUpdateId(target=image_path, src=src_image_path) |
+ cache_path = dev_server_cache[update_id] |
+ if cache_path: |
+ Info('Using cache %s' % cache_path) |
+ update_url = DevServerWrapper.GetDevServerURL(proxy_port, cache_path) |
+ cmd = ['%s/cros_run_vm_update' % self.crosutilsbin, |
dgarrett
2011/01/25 03:44:02
Indention is off.
|
+ '--vm_image_path=%s' % self.vm_image_path, |
+ '--snapshot', |
+ self.graphics_flag, |
+ '--persist', |
+ '--kvm_pid=%s' % self._kvm_pid_file, |
+ '--ssh_port=%s' % self._ssh_port, |
+ stateful_change_flag, |
+ '--update_url=%s' % update_url, |
+ ] |
+ else: |
+ cmd = ['%s/cros_run_vm_update' % self.crosutilsbin, |
dgarrett
2011/01/25 03:44:02
More indention is off.
|
'--update_image_path=%s' % image_path, |
'--vm_image_path=%s' % self.vm_image_path, |
'--snapshot', |
self.graphics_flag, |
'--persist', |
- '--kvm_pid=%s' % self._KVM_PID_FILE, |
+ '--kvm_pid=%s' % self._kvm_pid_file, |
+ '--ssh_port=%s' % self._ssh_port, |
stateful_change_flag, |
'--src_image=%s' % src_image_path, |
+ '--proxy_port=%s' % proxy_port |
] |
- if proxy_port: |
- cmd.append('--proxy_port=%s' % proxy_port) |
- |
if self.verbose: |
try: |
RunCommand(cmd) |
@@ -568,7 +602,8 @@ class VirtualAUTest(unittest.TestCase, AUTest): |
'--snapshot', |
self.graphics_flag, |
'--persist', |
- '--kvm_pid=%s' % self._KVM_PID_FILE, |
+ '--kvm_pid=%s' % self._kvm_pid_file, |
+ '--ssh_port=%s' % self._ssh_port, |
stateful_change_flag, |
] |
@@ -594,7 +629,8 @@ class VirtualAUTest(unittest.TestCase, AUTest): |
'--image_path=%s' % self.vm_image_path, |
'--snapshot', |
'--persist', |
- '--kvm_pid=%s' % self._KVM_PID_FILE, |
+ '--kvm_pid=%s' % self._kvm_pid_file, |
+ '--ssh_port=%s' % self._ssh_port, |
self.verify_suite, |
] |
@@ -613,15 +649,15 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest): |
def setUp(self): |
AUTest.setUp(self) |
+ def tearDown(self): |
+ pass |
+ |
def _UpdateImage(self, image_path, src_image_path='', stateful_change='old', |
proxy_port=None): |
if src_image_path and self._first_update: |
src_image_path = self.vm_image_path |
self._first_update = False |
- image_path = ReinterpretPathForChroot(image_path) |
- if src_image_path: |
- src_image_path = ReinterpretPathForChroot(src_image_path) |
if not self.delta_list.has_key(image_path): |
self.delta_list[image_path] = set([src_image_path]) |
else: |
@@ -635,33 +671,97 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest): |
class ParallelJob(threading.Thread): |
- """Small wrapper for threading.Thread that releases a semaphore on exit.""" |
- def __init__(self, semaphore, target, args): |
+ """Small wrapper for threading.Thread that releases a semaphores on exit.""" |
dgarrett
2011/01/25 03:44:02
Space after period.
|
+ |
+ def __init__(self, starting_semaphore, ending_semaphore, target, args): |
+ """Initializes an instance of a job. |
+ |
+ Args: |
+ starting_semaphore: Semaphore used by caller to wait on such that |
+ there isn't more than a certain number of threads running. Should |
+ be initialized to a value for the number of threads wanting to be run |
+ at a time. |
+ ending_semaphore: Semaphore is released every time a job ends. Should be |
dgarrett
2011/01/25 03:44:02
I think it's only one space after the ':', and sur
|
+ initialized to 0 before starting first job. Should be acquired once for |
+ each job. Threading.Thread.join() has a bug where if the run function |
+ terminates too quickly join() will hang forever. |
+ target: The func to run. |
+ args: Args to pass to the fun. |
+ """ |
threading.Thread.__init__(self, target=target, args=args) |
self._target = target |
self._args = args |
- self._semaphore = semaphore |
+ self._starting_semaphore = starting_semaphore |
+ self._ending_semaphore = ending_semaphore |
self._output = None |
self._completed = False |
def run(self): |
+ """Thread override. Runs the method specified and sets output.""" |
dgarrett
2011/01/25 03:44:02
Two spaces after period.
|
try: |
- threading.Thread.run(self) |
+ self._output = self._target(*self._args) |
finally: |
+ # From threading.py to avoid a refcycle. |
+ del self._target, self._args |
+ # Our own clean up. |
self._Cleanup() |
self._completed = True |
def GetOutput(self): |
+ """Returns the output of the method run.""" |
assert self._completed, 'GetOutput called before thread was run.' |
return self._output |
def _Cleanup(self): |
- self._semaphore.release() |
+ """Releases semaphores for a waiting caller.""" |
+ self._starting_semaphore.release() |
+ self._ending_semaphore.release() |
def __str__(self): |
return '%s(%s)' % (self._target, self._args) |
+class DevServerWrapper(threading.Thread): |
+ """A Simple wrapper around a devserver instance.""" |
+ |
+ def __init__(self): |
+ self.proc = None |
+ threading.Thread.__init__(self) |
+ |
+ def run(self): |
+ # Kill previous running instance of devserver if it exists. |
+ RunCommand(['sudo', 'pkill', '-f', 'devserver.py', ], error_ok=True, |
+ print_cmd=False) |
+ self.proc = subprocess.Popen(['./start_devserver', |
+ '--archive_dir=./static', |
+ '--client_prefix=ChromeOSUpdateEngine', |
+ '--production', |
+ ]) |
+ self.proc.communicate() |
+ |
+ def Stop(self): |
+ """Kills the devserver instance.""" |
+ self.proc.kill() |
+ |
+ @classmethod |
+ def GetDevServerURL(cls, port, sub_dir): |
+ """Returns the dev server url for a given port and sub directory.""" |
+ ip_addr = GetIPAddress() |
+ if not port: port = 8080 |
+ url = 'http://%(ip)s:%(port)s/%(dir)s' % {'ip': ip_addr, |
+ 'port': str(port), |
+ 'dir': sub_dir} |
+ return url |
+ |
+ |
+def _GenerateUpdateId(target, src): |
+ """Returns a simple representation id of target and src paths.""" |
+ if src: |
+ return '%s->%s' % (target, src) |
+ else: |
+ return target |
+ |
+ |
def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args): |
"""Runs set number of specified jobs in parallel. |
@@ -676,29 +776,39 @@ def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args): |
return (x, y) |
threads = [] |
- job_pool_semaphore = threading.Semaphore(number_of_sumultaneous_jobs) |
+ job_start_semaphore = threading.Semaphore(number_of_sumultaneous_jobs) |
+ join_semaphore = threading.Semaphore(0) |
assert len(jobs) == len(jobs_args), 'Length of args array is wrong.' |
# Create the parallel jobs. |
for job, args in map(_TwoTupleize, jobs, jobs_args): |
- thread = ParallelJob(job_pool_semaphore, target=job, args=args) |
+ thread = ParallelJob(job_start_semaphore, join_semaphore, target=job, |
+ args=args) |
threads.append(thread) |
# We use a semaphore to ensure we don't run more jobs that required. |
# After each thread finishes, it releases (increments semaphore). |
# Acquire blocks of num jobs reached and continues when a thread finishes. |
for next_thread in threads: |
- job_pool_semaphore.acquire(blocking=True) |
- Info('Starting %s' % next_thread) |
+ job_start_semaphore.acquire(blocking=True) |
+ Info('Starting job %s' % next_thread) |
next_thread.start() |
# Wait on the rest of the threads to finish. |
for thread in threads: |
- thread.join() |
+ join_semaphore.acquire(blocking=True) |
return [thread.GetOutput() for thread in threads] |
+def _PrepareTestSuite(parser, options, test_class): |
+ """Returns a prepared test suite given by the options and test class.""" |
+ test_class.ProcessOptions(parser, options) |
+ test_loader = unittest.TestLoader() |
+ test_loader.testMethodPrefix = options.test_prefix |
+ return test_loader.loadTestsFromTestCase(test_class) |
+ |
+ |
def _PregenerateUpdates(parser, options): |
"""Determines all deltas that will be generated and generates them. |
@@ -708,41 +818,80 @@ def _PregenerateUpdates(parser, options): |
parser: parser from main. |
options: options from parsed parser. |
Returns: |
- Array of output from generating updates. |
+ Dictionary of Update Identifiers->Relative cache locations. |
""" |
def _GenerateVMUpdate(target, src): |
"""Generates an update using the devserver.""" |
- RunCommand(['sudo', |
- './start_devserver', |
- '--pregenerate_update', |
- '--exit', |
- '--image=%s' % target, |
- '--src_image=%s' % src, |
- '--for_vm' |
- ], enter_chroot=True) |
+ target = ReinterpretPathForChroot(target) |
+ if src: |
+ src = ReinterpretPathForChroot(src) |
+ |
+ return RunCommand(['sudo', |
+ './start_devserver', |
+ '--pregenerate_update', |
+ '--exit', |
+ '--image=%s' % target, |
+ '--src_image=%s' % src, |
+ '--for_vm', |
+ ], redirect_stdout=True, enter_chroot=True, |
+ print_cmd=False) |
# Get the list of deltas by mocking out update method in test class. |
- GenerateVirtualAUDeltasTest.ProcessOptions(parser, options) |
- test_loader = unittest.TestLoader() |
- test_loader.testMethodPrefix = options.test_prefix |
- test_suite = test_loader.loadTestsFromTestCase(GenerateVirtualAUDeltasTest) |
+ test_suite = _PrepareTestSuite(parser, options, GenerateVirtualAUDeltasTest) |
test_result = unittest.TextTestRunner(verbosity=0).run(test_suite) |
Info('The following delta updates are required.') |
+ update_ids = [] |
jobs = [] |
args = [] |
for target, srcs in GenerateVirtualAUDeltasTest.delta_list.items(): |
for src in srcs: |
- if src: |
- print >> sys.stderr, 'DELTA AU %s -> %s' % (src, target) |
- else: |
- print >> sys.stderr, 'FULL AU %s' % target |
- |
+ update_id = _GenerateUpdateId(target=target, src=src) |
+ print >> sys.stderr, 'AU: %s' % update_id |
+ update_ids.append(update_id) |
jobs.append(_GenerateVMUpdate) |
args.append((target, src)) |
- results = _RunParallelJobs(options.jobs, jobs, args) |
- return results |
+ raw_results = _RunParallelJobs(options.jobs, jobs, args) |
+ results = [] |
+ |
+ # Parse the output. |
+ key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)') |
+ for result in raw_results: |
+ for line in result.splitlines(): |
+ match = key_line_re.search(line) |
+ if match: |
+ # Convert blah/blah/update.gz -> update/blah/blah. |
+ path_to_update_gz = match.group(1).rstrip() |
+ (path_to_update_dir, _, _) = path_to_update_gz.rpartition('/update.gz') |
+ results.append('/'.join(['update', path_to_update_dir])) |
+ break |
+ |
+ assert len(raw_results) == len(results), \ |
+ 'Insufficient number cache directories returned.' |
+ |
+ # Build the dictionary from our id's and returned cache paths. |
+ cache_dictionary = {} |
+ for index, id in enumerate(update_ids): |
+ cache_dictionary[id] = results[index] |
+ |
+ return cache_dictionary |
+ |
+ |
+def _RunTestsInParallel(parser, options, test_class): |
+ """Runs the tests given by the options and test_class in parallel.""" |
+ threads = [] |
+ args = [] |
+ test_suite = _PrepareTestSuite(parser, options, test_class) |
+ for test in test_suite: |
+ test_name = test.id() |
+ test_case = unittest.TestLoader().loadTestsFromName(test_name) |
+ threads.append(unittest.TextTestRunner().run) |
+ args.append(test_case) |
+ |
+ results = _RunParallelJobs(options.jobs, threads, args) |
+ if not (test_result.wasSuccessful() for test_result in results): |
+ Die('Test harness was not successful') |
def main(): |
@@ -777,22 +926,28 @@ def main(): |
if leftover_args: |
parser.error('Found extra options we do not support: %s' % leftover_args) |
+ # Figure out the test_class. |
if options.type == 'vm': test_class = VirtualAUTest |
elif options.type == 'real': test_class = RealAUTest |
else: parser.error('Could not parse harness type %s.' % options.type) |
# TODO(sosa): Caching doesn't really make sense on non-vm images (yet). |
- if options.type == 'vm': |
- _PregenerateUpdates(parser, options) |
- |
- # Run the test suite. |
- test_class.ProcessOptions(parser, options) |
- test_loader = unittest.TestLoader() |
- test_loader.testMethodPrefix = options.test_prefix |
- test_suite = test_loader.loadTestsFromTestCase(test_class) |
- test_result = unittest.TextTestRunner(verbosity=2).run(test_suite) |
- if not test_result.wasSuccessful(): |
- Die('Test harness was not successful.') |
+ global dev_server_cache |
+ if options.type == 'vm' and options.jobs > 1: |
+ dev_server_cache = _PregenerateUpdates(parser, options) |
+ my_server = DevServerWrapper() |
+ my_server.start() |
+ try: |
+ _RunTestsInParallel(parser, options, test_class) |
+ finally: |
+ my_server.Stop() |
+ |
+ else: |
+ dev_server_cache = None |
+ test_suite = _PrepareTestSuite(parser, options, test_class) |
+ test_result = unittest.TextTestRunner(verbosity=2).run(test_suite) |
+ if not test_result.wasSuccessful(): |
+ Die('Test harness was not successful.') |
if __name__ == '__main__': |