Index: bin/cros_au_test_harness.py
diff --git a/bin/cros_au_test_harness.py b/bin/cros_au_test_harness.py
index e230d2a2656b722506e7341920364753ac40f2f1..ab71232b7ee74d129e7ab82699e2ad70dc7aca8b 100755
--- a/bin/cros_au_test_harness.py
+++ b/bin/cros_au_test_harness.py
@@ -15,7 +15,6 @@
import optparse
import os
import re
-import subprocess
import sys
import threading
import time
@@ -24,7 +23,6 @@ import urllib
sys.path.append(os.path.join(os.path.dirname(__file__), '../lib'))
from cros_build_lib import Die
-from cros_build_lib import GetIPAddress
from cros_build_lib import Info
from cros_build_lib import ReinterpretPathForChroot
from cros_build_lib import RunCommand
@@ -33,8 +31,6 @@ from cros_build_lib import Warning
import cros_test_proxy
-global dev_server_cache
-
class UpdateException(Exception):
"""Exception thrown when _UpdateImage or _UpdateUsingPayload fail"""
@@ -473,14 +469,12 @@ class RealAUTest(unittest.TestCase, AUTest):
class VirtualAUTest(unittest.TestCase, AUTest):
"""Test harness for updating virtual machines."""
+ vm_image_path = None
# VM Constants.
_FULL_VDISK_SIZE = 6072
_FULL_STATEFULFS_SIZE = 3074
-
- # Class variables used to acquire individual VM variables per test.
- _vm_lock = threading.Lock()
- _next_port = 9222
+ _KVM_PID_FILE = '/tmp/harness_pid'
def _KillExistingVM(self, pid_file):
if os.path.exists(pid_file):
@@ -491,22 +485,10 @@ class VirtualAUTest(unittest.TestCase, AUTest):
assert not os.path.exists(pid_file)
- def _AcquireUniquePortAndPidFile(self):
- """Acquires unique ssh port and pid file for VM."""
- with VirtualAUTest._vm_lock:
- self._ssh_port = VirtualAUTest._next_port
- self._kvm_pid_file = '/tmp/kvm.%d' % self._ssh_port
- VirtualAUTest._next_port += 1
-
def setUp(self):
"""Unit test overriden method. Is called before every test.""" |
AUTest.setUp(self)
- self.vm_image_path = None
- self._AcquireUniquePortAndPidFile()
- self._KillExistingVM(self._kvm_pid_file)
-
- def tearDown(self):
- self._KillExistingVM(self._kvm_pid_file)
+ self._KillExistingVM(self._KVM_PID_FILE)
@classmethod
def ProcessOptions(cls, parser, options):
@@ -545,42 +527,26 @@ class VirtualAUTest(unittest.TestCase, AUTest):
self.assertTrue(os.path.exists(self.vm_image_path))
def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
- proxy_port=''):
+ proxy_port=None):
"""Updates VM image with image_path."""
stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
if src_image_path and self._first_update:
src_image_path = self.vm_image_path
self._first_update = False
- # Check image payload cache first.
- update_id = _GenerateUpdateId(target=image_path, src=src_image_path)
- cache_path = dev_server_cache[update_id]
- if cache_path:
- Info('Using cache %s' % cache_path)
- update_url = DevServerWrapper.GetDevServerURL(proxy_port, cache_path)
- cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
- '--vm_image_path=%s' % self.vm_image_path,
- '--snapshot',
- self.graphics_flag,
- '--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
- stateful_change_flag,
- '--update_url=%s' % update_url,
- ]
- else:
- cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
- '--update_image_path=%s' % image_path,
- '--vm_image_path=%s' % self.vm_image_path,
- '--snapshot',
- self.graphics_flag,
- '--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
- stateful_change_flag,
- '--src_image=%s' % src_image_path,
- '--proxy_port=%s' % proxy_port
- ]
+ cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
+ '--update_image_path=%s' % image_path,
+ '--vm_image_path=%s' % self.vm_image_path,
+ '--snapshot',
+ self.graphics_flag,
+ '--persist',
+ '--kvm_pid=%s' % self._KVM_PID_FILE,
+ stateful_change_flag,
+ '--src_image=%s' % src_image_path,
+ ]
+
+ if proxy_port:
+ cmd.append('--proxy_port=%s' % proxy_port)
if self.verbose:
try:
@@ -602,8 +568,7 @@ class VirtualAUTest(unittest.TestCase, AUTest):
'--snapshot',
self.graphics_flag,
'--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
+ '--kvm_pid=%s' % self._KVM_PID_FILE,
stateful_change_flag,
]
@@ -629,8 +594,7 @@ class VirtualAUTest(unittest.TestCase, AUTest):
'--image_path=%s' % self.vm_image_path,
'--snapshot',
'--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
+ '--kvm_pid=%s' % self._KVM_PID_FILE,
self.verify_suite,
]
@@ -649,15 +613,15 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest):
def setUp(self):
AUTest.setUp(self)
- def tearDown(self):
- pass
-
def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
proxy_port=None):
if src_image_path and self._first_update:
src_image_path = self.vm_image_path
self._first_update = False
+ image_path = ReinterpretPathForChroot(image_path)
+ if src_image_path:
+ src_image_path = ReinterpretPathForChroot(src_image_path)
if not self.delta_list.has_key(image_path):
self.delta_list[image_path] = set([src_image_path])
else:
@@ -671,98 +635,33 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest):
class ParallelJob(threading.Thread):
- """Small wrapper for threading. Thread that releases a semaphores on exit."""
-
- def __init__(self, starting_semaphore, ending_semaphore, target, args):
- """Initializes an instance of a job.
-
- Args:
- starting_semaphore: Semaphore used by caller to wait on such that
- there isn't more than a certain number of threads running. Should
- be initialized to a value for the number of threads wanting to be run
- at a time.
- ending_semaphore: Semaphore is released every time a job ends. Should be
- initialized to 0 before starting first job. Should be acquired once for
- each job. Threading.Thread.join() has a bug where if the run function
- terminates too quickly join() will hang forever.
- target: The func to run.
- args: Args to pass to the fun.
- """
+ """Small wrapper for threading.Thread that releases a semaphore on exit."""
+ def __init__(self, semaphore, target, args):
threading.Thread.__init__(self, target=target, args=args)
self._target = target
self._args = args
- self._starting_semaphore = starting_semaphore
- self._ending_semaphore = ending_semaphore
+ self._semaphore = semaphore
self._output = None
self._completed = False
def run(self):
- """Thread override. Runs the method specified and sets output."""
try:
- self._output = self._target(*self._args)
+ threading.Thread.run(self)
finally:
- # From threading.py to avoid a refcycle.
- del self._target, self._args
- # Our own clean up.
self._Cleanup()
self._completed = True
def GetOutput(self):
- """Returns the output of the method run."""
assert self._completed, 'GetOutput called before thread was run.'
return self._output
def _Cleanup(self):
- """Releases semaphores for a waiting caller."""
- self._starting_semaphore.release()
- self._ending_semaphore.release()
+ self._semaphore.release()
def __str__(self):
return '%s(%s)' % (self._target, self._args)
-class DevServerWrapper(threading.Thread):
- """A Simple wrapper around a devserver instance."""
-
- def __init__(self):
- self.proc = None
- threading.Thread.__init__(self)
-
- def run(self):
- # Kill previous running instance of devserver if it exists.
- RunCommand(['sudo', 'pkill', '-f', 'devserver.py', ], error_ok=True,
- print_cmd=False)
- self.proc = subprocess.Popen(['sudo',
- './start_devserver',
- '--archive_dir=./static',
- '--client_prefix=ChromeOSUpdateEngine',
- '--production',
- ])
- self.proc.communicate()
-
- def Stop(self):
- """Kills the devserver instance."""
- self.proc.kill()
-
- @classmethod
- def GetDevServerURL(cls, port, sub_dir):
- """Returns the dev server url for a given port and sub directory."""
- ip_addr = GetIPAddress()
- if not port: port = 8080
- url = 'http://%(ip)s:%(port)s/%(dir)s' % {'ip': ip_addr,
- 'port': str(port),
- 'dir': sub_dir}
- return url
-
-
-def _GenerateUpdateId(target, src):
- """Returns a simple representation id of target and src paths."""
- if src:
- return '%s->%s' % (target, src)
- else:
- return target
-
-
def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args):
"""Runs set number of specified jobs in parallel. |
@@ -777,39 +676,29 @@ def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args):
return (x, y)
threads = []
- job_start_semaphore = threading.Semaphore(number_of_sumultaneous_jobs)
- join_semaphore = threading.Semaphore(0)
+ job_pool_semaphore = threading.Semaphore(number_of_sumultaneous_jobs)
assert len(jobs) == len(jobs_args), 'Length of args array is wrong.'
# Create the parallel jobs.
for job, args in map(_TwoTupleize, jobs, jobs_args):
- thread = ParallelJob(job_start_semaphore, join_semaphore, target=job,
- args=args)
+ thread = ParallelJob(job_pool_semaphore, target=job, args=args)
threads.append(thread)
# We use a semaphore to ensure we don't run more jobs than required.
# After each thread finishes, it releases (increments) the semaphore.
# Acquire blocks if the max number of jobs is reached and continues when a thread finishes.
for next_thread in threads:
- job_start_semaphore.acquire(blocking=True)
- Info('Starting job %s' % next_thread)
+ job_pool_semaphore.acquire(blocking=True)
+ Info('Starting %s' % next_thread)
next_thread.start()
# Wait on the rest of the threads to finish.
for thread in threads:
- join_semaphore.acquire(blocking=True)
+ thread.join()
return [thread.GetOutput() for thread in threads]
-def _PrepareTestSuite(parser, options, test_class):
- """Returns a prepared test suite given by the options and test class."""
- test_class.ProcessOptions(parser, options)
- test_loader = unittest.TestLoader()
- test_loader.testMethodPrefix = options.test_prefix
- return test_loader.loadTestsFromTestCase(test_class)
-
-
def _PregenerateUpdates(parser, options):
"""Determines all deltas that will be generated and generates them.
@@ -819,80 +708,41 @@ def _PregenerateUpdates(parser, options):
parser: parser from main.
options: options from parsed parser.
Returns:
- Dictionary of Update Identifiers->Relative cache locations.
+ List of outputs from generating updates.
""" |
def _GenerateVMUpdate(target, src): |
"""Generates an update using the devserver.""" |
- target = ReinterpretPathForChroot(target) |
- if src: |
- src = ReinterpretPathForChroot(src) |
- |
- return RunCommand(['sudo', |
- './start_devserver', |
- '--pregenerate_update', |
- '--exit', |
- '--image=%s' % target, |
- '--src_image=%s' % src, |
- '--for_vm', |
- ], redirect_stdout=True, enter_chroot=True, |
- print_cmd=False) |
+ RunCommand(['sudo', |
+ './start_devserver', |
+ '--pregenerate_update', |
+ '--exit', |
+ '--image=%s' % target, |
+ '--src_image=%s' % src, |
+ '--for_vm' |
+ ], enter_chroot=True) |
# Get the list of deltas by mocking out update method in test class. |
- test_suite = _PrepareTestSuite(parser, options, GenerateVirtualAUDeltasTest) |
+ GenerateVirtualAUDeltasTest.ProcessOptions(parser, options) |
+ test_loader = unittest.TestLoader() |
+ test_loader.testMethodPrefix = options.test_prefix |
+ test_suite = test_loader.loadTestsFromTestCase(GenerateVirtualAUDeltasTest) |
test_result = unittest.TextTestRunner(verbosity=0).run(test_suite) |
Info('The following delta updates are required.') |
- update_ids = [] |
jobs = [] |
args = [] |
for target, srcs in GenerateVirtualAUDeltasTest.delta_list.items(): |
for src in srcs: |
- update_id = _GenerateUpdateId(target=target, src=src) |
- print >> sys.stderr, 'AU: %s' % update_id |
- update_ids.append(update_id) |
+ if src: |
+ print >> sys.stderr, 'DELTA AU %s -> %s' % (src, target) |
+ else: |
+ print >> sys.stderr, 'FULL AU %s' % target |
+ |
jobs.append(_GenerateVMUpdate) |
args.append((target, src)) |
- raw_results = _RunParallelJobs(options.jobs, jobs, args) |
- results = [] |
- |
- # Parse the output.
- key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)')
- for result in raw_results:
- for line in result.splitlines():
- match = key_line_re.search(line)
- if match:
- # Convert blah/blah/update.gz -> update/blah/blah.
- path_to_update_gz = match.group(1).rstrip()
- (path_to_update_dir, _, _) = path_to_update_gz.rpartition('/update.gz')
- results.append('/'.join(['update', path_to_update_dir]))
- break
-
- assert len(raw_results) == len(results), \
- 'Insufficient number cache directories returned.'
-
- # Build the dictionary from our id's and returned cache paths.
- cache_dictionary = {}
- for index, id in enumerate(update_ids):
- cache_dictionary[id] = results[index]
-
- return cache_dictionary
-
-
-def _RunTestsInParallel(parser, options, test_class):
- """Runs the tests given by the options and test_class in parallel."""
- threads = []
- args = []
- test_suite = _PrepareTestSuite(parser, options, test_class)
- for test in test_suite:
- test_name = test.id()
- test_case = unittest.TestLoader().loadTestsFromName(test_name)
- threads.append(unittest.TextTestRunner().run)
- args.append(test_case)
-
- results = _RunParallelJobs(options.jobs, threads, args)
- if not (test_result.wasSuccessful() for test_result in results):
- Die('Test harness was not successful')
+ results = _RunParallelJobs(options.jobs, jobs, args)
+ return results
def main():
@@ -927,28 +777,22 @@ def main():
if leftover_args:
parser.error('Found extra options we do not support: %s' % leftover_args)
- # Figure out the test_class.
if options.type == 'vm': test_class = VirtualAUTest
elif options.type == 'real': test_class = RealAUTest
else: parser.error('Could not parse harness type %s.' % options.type)
# TODO(sosa): Caching doesn't really make sense on non-vm images (yet).
- global dev_server_cache
- if options.type == 'vm' and options.jobs > 1:
- dev_server_cache = _PregenerateUpdates(parser, options)
- my_server = DevServerWrapper()
- my_server.start()
- try:
- _RunTestsInParallel(parser, options, test_class)
- finally:
- my_server.Stop()
-
- else:
- dev_server_cache = None
- test_suite = _PrepareTestSuite(parser, options, test_class)
- test_result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- if not test_result.wasSuccessful():
- Die('Test harness was not successful.')
+ if options.type == 'vm':
+ _PregenerateUpdates(parser, options)
+
+ # Run the test suite.
+ test_class.ProcessOptions(parser, options)
+ test_loader = unittest.TestLoader()
+ test_loader.testMethodPrefix = options.test_prefix
+ test_suite = test_loader.loadTestsFromTestCase(test_class)
+ test_result = unittest.TextTestRunner(verbosity=2).run(test_suite)
+ if not test_result.wasSuccessful():
+ Die('Test harness was not successful.')
if __name__ == '__main__':