Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(52)

Unified Diff: bin/cros_au_test_harness.py

Issue 6348022: Revert "Passes cache location to tests and runs the tests in parallel." (Closed) Base URL: http://git.chromium.org/git/crosutils.git@master
Patch Set: Created 9 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | bin/cros_run_vm_update » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: bin/cros_au_test_harness.py
diff --git a/bin/cros_au_test_harness.py b/bin/cros_au_test_harness.py
index e230d2a2656b722506e7341920364753ac40f2f1..ab71232b7ee74d129e7ab82699e2ad70dc7aca8b 100755
--- a/bin/cros_au_test_harness.py
+++ b/bin/cros_au_test_harness.py
@@ -15,7 +15,6 @@
import optparse
import os
import re
-import subprocess
import sys
import threading
import time
@@ -24,7 +23,6 @@ import urllib
sys.path.append(os.path.join(os.path.dirname(__file__), '../lib'))
from cros_build_lib import Die
-from cros_build_lib import GetIPAddress
from cros_build_lib import Info
from cros_build_lib import ReinterpretPathForChroot
from cros_build_lib import RunCommand
@@ -33,8 +31,6 @@ from cros_build_lib import Warning
import cros_test_proxy
-global dev_server_cache
-
class UpdateException(Exception):
"""Exception thrown when _UpdateImage or _UpdateUsingPayload fail"""
@@ -473,14 +469,12 @@ class RealAUTest(unittest.TestCase, AUTest):
class VirtualAUTest(unittest.TestCase, AUTest):
"""Test harness for updating virtual machines."""
+ vm_image_path = None
# VM Constants.
_FULL_VDISK_SIZE = 6072
_FULL_STATEFULFS_SIZE = 3074
-
- # Class variables used to acquire individual VM variables per test.
- _vm_lock = threading.Lock()
- _next_port = 9222
+ _KVM_PID_FILE = '/tmp/harness_pid'
def _KillExistingVM(self, pid_file):
if os.path.exists(pid_file):
@@ -491,22 +485,10 @@ class VirtualAUTest(unittest.TestCase, AUTest):
assert not os.path.exists(pid_file)
- def _AcquireUniquePortAndPidFile(self):
- """Acquires unique ssh port and pid file for VM."""
- with VirtualAUTest._vm_lock:
- self._ssh_port = VirtualAUTest._next_port
- self._kvm_pid_file = '/tmp/kvm.%d' % self._ssh_port
- VirtualAUTest._next_port += 1
-
def setUp(self):
"""Unit test overriden method. Is called before every test."""
AUTest.setUp(self)
- self.vm_image_path = None
- self._AcquireUniquePortAndPidFile()
- self._KillExistingVM(self._kvm_pid_file)
-
- def tearDown(self):
- self._KillExistingVM(self._kvm_pid_file)
+ self._KillExistingVM(self._KVM_PID_FILE)
@classmethod
def ProcessOptions(cls, parser, options):
@@ -545,42 +527,26 @@ class VirtualAUTest(unittest.TestCase, AUTest):
self.assertTrue(os.path.exists(self.vm_image_path))
def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
- proxy_port=''):
+ proxy_port=None):
"""Updates VM image with image_path."""
stateful_change_flag = self.GetStatefulChangeFlag(stateful_change)
if src_image_path and self._first_update:
src_image_path = self.vm_image_path
self._first_update = False
- # Check image payload cache first.
- update_id = _GenerateUpdateId(target=image_path, src=src_image_path)
- cache_path = dev_server_cache[update_id]
- if cache_path:
- Info('Using cache %s' % cache_path)
- update_url = DevServerWrapper.GetDevServerURL(proxy_port, cache_path)
- cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
- '--vm_image_path=%s' % self.vm_image_path,
- '--snapshot',
- self.graphics_flag,
- '--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
- stateful_change_flag,
- '--update_url=%s' % update_url,
- ]
- else:
- cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
- '--update_image_path=%s' % image_path,
- '--vm_image_path=%s' % self.vm_image_path,
- '--snapshot',
- self.graphics_flag,
- '--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
- stateful_change_flag,
- '--src_image=%s' % src_image_path,
- '--proxy_port=%s' % proxy_port
- ]
+ cmd = ['%s/cros_run_vm_update' % self.crosutilsbin,
+ '--update_image_path=%s' % image_path,
+ '--vm_image_path=%s' % self.vm_image_path,
+ '--snapshot',
+ self.graphics_flag,
+ '--persist',
+ '--kvm_pid=%s' % self._KVM_PID_FILE,
+ stateful_change_flag,
+ '--src_image=%s' % src_image_path,
+ ]
+
+ if proxy_port:
+ cmd.append('--proxy_port=%s' % proxy_port)
if self.verbose:
try:
@@ -602,8 +568,7 @@ class VirtualAUTest(unittest.TestCase, AUTest):
'--snapshot',
self.graphics_flag,
'--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
+ '--kvm_pid=%s' % self._KVM_PID_FILE,
stateful_change_flag,
]
@@ -629,8 +594,7 @@ class VirtualAUTest(unittest.TestCase, AUTest):
'--image_path=%s' % self.vm_image_path,
'--snapshot',
'--persist',
- '--kvm_pid=%s' % self._kvm_pid_file,
- '--ssh_port=%s' % self._ssh_port,
+ '--kvm_pid=%s' % self._KVM_PID_FILE,
self.verify_suite,
]
@@ -649,15 +613,15 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest):
def setUp(self):
AUTest.setUp(self)
- def tearDown(self):
- pass
-
def _UpdateImage(self, image_path, src_image_path='', stateful_change='old',
proxy_port=None):
if src_image_path and self._first_update:
src_image_path = self.vm_image_path
self._first_update = False
+ image_path = ReinterpretPathForChroot(image_path)
+ if src_image_path:
+ src_image_path = ReinterpretPathForChroot(src_image_path)
if not self.delta_list.has_key(image_path):
self.delta_list[image_path] = set([src_image_path])
else:
@@ -671,98 +635,33 @@ class GenerateVirtualAUDeltasTest(VirtualAUTest):
class ParallelJob(threading.Thread):
- """Small wrapper for threading. Thread that releases a semaphores on exit."""
-
- def __init__(self, starting_semaphore, ending_semaphore, target, args):
- """Initializes an instance of a job.
-
- Args:
- starting_semaphore: Semaphore used by caller to wait on such that
- there isn't more than a certain number of threads running. Should
- be initialized to a value for the number of threads wanting to be run
- at a time.
- ending_semaphore: Semaphore is released every time a job ends. Should be
- initialized to 0 before starting first job. Should be acquired once for
- each job. Threading.Thread.join() has a bug where if the run function
- terminates too quickly join() will hang forever.
- target: The func to run.
- args: Args to pass to the fun.
- """
+ """Small wrapper for threading.Thread that releases a semaphore on exit."""
+ def __init__(self, semaphore, target, args):
threading.Thread.__init__(self, target=target, args=args)
self._target = target
self._args = args
- self._starting_semaphore = starting_semaphore
- self._ending_semaphore = ending_semaphore
+ self._semaphore = semaphore
self._output = None
self._completed = False
def run(self):
- """Thread override. Runs the method specified and sets output."""
try:
- self._output = self._target(*self._args)
+ threading.Thread.run(self)
finally:
- # From threading.py to avoid a refcycle.
- del self._target, self._args
- # Our own clean up.
self._Cleanup()
self._completed = True
def GetOutput(self):
- """Returns the output of the method run."""
assert self._completed, 'GetOutput called before thread was run.'
return self._output
def _Cleanup(self):
- """Releases semaphores for a waiting caller."""
- self._starting_semaphore.release()
- self._ending_semaphore.release()
+ self._semaphore.release()
def __str__(self):
return '%s(%s)' % (self._target, self._args)
-class DevServerWrapper(threading.Thread):
- """A Simple wrapper around a devserver instance."""
-
- def __init__(self):
- self.proc = None
- threading.Thread.__init__(self)
-
- def run(self):
- # Kill previous running instance of devserver if it exists.
- RunCommand(['sudo', 'pkill', '-f', 'devserver.py', ], error_ok=True,
- print_cmd=False)
- self.proc = subprocess.Popen(['sudo',
- './start_devserver',
- '--archive_dir=./static',
- '--client_prefix=ChromeOSUpdateEngine',
- '--production',
- ])
- self.proc.communicate()
-
- def Stop(self):
- """Kills the devserver instance."""
- self.proc.kill()
-
- @classmethod
- def GetDevServerURL(cls, port, sub_dir):
- """Returns the dev server url for a given port and sub directory."""
- ip_addr = GetIPAddress()
- if not port: port = 8080
- url = 'http://%(ip)s:%(port)s/%(dir)s' % {'ip': ip_addr,
- 'port': str(port),
- 'dir': sub_dir}
- return url
-
-
-def _GenerateUpdateId(target, src):
- """Returns a simple representation id of target and src paths."""
- if src:
- return '%s->%s' % (target, src)
- else:
- return target
-
-
def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args):
"""Runs set number of specified jobs in parallel.
@@ -777,39 +676,29 @@ def _RunParallelJobs(number_of_sumultaneous_jobs, jobs, jobs_args):
return (x, y)
threads = []
- job_start_semaphore = threading.Semaphore(number_of_sumultaneous_jobs)
- join_semaphore = threading.Semaphore(0)
+ job_pool_semaphore = threading.Semaphore(number_of_sumultaneous_jobs)
assert len(jobs) == len(jobs_args), 'Length of args array is wrong.'
# Create the parallel jobs.
for job, args in map(_TwoTupleize, jobs, jobs_args):
- thread = ParallelJob(job_start_semaphore, join_semaphore, target=job,
- args=args)
+ thread = ParallelJob(job_pool_semaphore, target=job, args=args)
threads.append(thread)
# We use a semaphore to ensure we don't run more jobs that required.
# After each thread finishes, it releases (increments semaphore).
# Acquire blocks of num jobs reached and continues when a thread finishes.
for next_thread in threads:
- job_start_semaphore.acquire(blocking=True)
- Info('Starting job %s' % next_thread)
+ job_pool_semaphore.acquire(blocking=True)
+ Info('Starting %s' % next_thread)
next_thread.start()
# Wait on the rest of the threads to finish.
for thread in threads:
- join_semaphore.acquire(blocking=True)
+ thread.join()
return [thread.GetOutput() for thread in threads]
-def _PrepareTestSuite(parser, options, test_class):
- """Returns a prepared test suite given by the options and test class."""
- test_class.ProcessOptions(parser, options)
- test_loader = unittest.TestLoader()
- test_loader.testMethodPrefix = options.test_prefix
- return test_loader.loadTestsFromTestCase(test_class)
-
-
def _PregenerateUpdates(parser, options):
"""Determines all deltas that will be generated and generates them.
@@ -819,80 +708,41 @@ def _PregenerateUpdates(parser, options):
parser: parser from main.
options: options from parsed parser.
Returns:
- Dictionary of Update Identifiers->Relative cache locations.
+ Array of output from generating updates.
"""
def _GenerateVMUpdate(target, src):
"""Generates an update using the devserver."""
- target = ReinterpretPathForChroot(target)
- if src:
- src = ReinterpretPathForChroot(src)
-
- return RunCommand(['sudo',
- './start_devserver',
- '--pregenerate_update',
- '--exit',
- '--image=%s' % target,
- '--src_image=%s' % src,
- '--for_vm',
- ], redirect_stdout=True, enter_chroot=True,
- print_cmd=False)
+ RunCommand(['sudo',
+ './start_devserver',
+ '--pregenerate_update',
+ '--exit',
+ '--image=%s' % target,
+ '--src_image=%s' % src,
+ '--for_vm'
+ ], enter_chroot=True)
# Get the list of deltas by mocking out update method in test class.
- test_suite = _PrepareTestSuite(parser, options, GenerateVirtualAUDeltasTest)
+ GenerateVirtualAUDeltasTest.ProcessOptions(parser, options)
+ test_loader = unittest.TestLoader()
+ test_loader.testMethodPrefix = options.test_prefix
+ test_suite = test_loader.loadTestsFromTestCase(GenerateVirtualAUDeltasTest)
test_result = unittest.TextTestRunner(verbosity=0).run(test_suite)
Info('The following delta updates are required.')
- update_ids = []
jobs = []
args = []
for target, srcs in GenerateVirtualAUDeltasTest.delta_list.items():
for src in srcs:
- update_id = _GenerateUpdateId(target=target, src=src)
- print >> sys.stderr, 'AU: %s' % update_id
- update_ids.append(update_id)
+ if src:
+ print >> sys.stderr, 'DELTA AU %s -> %s' % (src, target)
+ else:
+ print >> sys.stderr, 'FULL AU %s' % target
+
jobs.append(_GenerateVMUpdate)
args.append((target, src))
- raw_results = _RunParallelJobs(options.jobs, jobs, args)
- results = []
-
- # Parse the output.
- key_line_re = re.compile('^PREGENERATED_UPDATE=([\w/.]+)')
- for result in raw_results:
- for line in result.splitlines():
- match = key_line_re.search(line)
- if match:
- # Convert blah/blah/update.gz -> update/blah/blah.
- path_to_update_gz = match.group(1).rstrip()
- (path_to_update_dir, _, _) = path_to_update_gz.rpartition('/update.gz')
- results.append('/'.join(['update', path_to_update_dir]))
- break
-
- assert len(raw_results) == len(results), \
- 'Insufficient number cache directories returned.'
-
- # Build the dictionary from our id's and returned cache paths.
- cache_dictionary = {}
- for index, id in enumerate(update_ids):
- cache_dictionary[id] = results[index]
-
- return cache_dictionary
-
-
-def _RunTestsInParallel(parser, options, test_class):
- """Runs the tests given by the options and test_class in parallel."""
- threads = []
- args = []
- test_suite = _PrepareTestSuite(parser, options, test_class)
- for test in test_suite:
- test_name = test.id()
- test_case = unittest.TestLoader().loadTestsFromName(test_name)
- threads.append(unittest.TextTestRunner().run)
- args.append(test_case)
-
- results = _RunParallelJobs(options.jobs, threads, args)
- if not (test_result.wasSuccessful() for test_result in results):
- Die('Test harness was not successful')
+ results = _RunParallelJobs(options.jobs, jobs, args)
+ return results
def main():
@@ -927,28 +777,22 @@ def main():
if leftover_args:
parser.error('Found extra options we do not support: %s' % leftover_args)
- # Figure out the test_class.
if options.type == 'vm': test_class = VirtualAUTest
elif options.type == 'real': test_class = RealAUTest
else: parser.error('Could not parse harness type %s.' % options.type)
# TODO(sosa): Caching doesn't really make sense on non-vm images (yet).
- global dev_server_cache
- if options.type == 'vm' and options.jobs > 1:
- dev_server_cache = _PregenerateUpdates(parser, options)
- my_server = DevServerWrapper()
- my_server.start()
- try:
- _RunTestsInParallel(parser, options, test_class)
- finally:
- my_server.Stop()
-
- else:
- dev_server_cache = None
- test_suite = _PrepareTestSuite(parser, options, test_class)
- test_result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- if not test_result.wasSuccessful():
- Die('Test harness was not successful.')
+ if options.type == 'vm':
+ _PregenerateUpdates(parser, options)
+
+ # Run the test suite.
+ test_class.ProcessOptions(parser, options)
+ test_loader = unittest.TestLoader()
+ test_loader.testMethodPrefix = options.test_prefix
+ test_suite = test_loader.loadTestsFromTestCase(test_class)
+ test_result = unittest.TextTestRunner(verbosity=2).run(test_suite)
+ if not test_result.wasSuccessful():
+ Die('Test harness was not successful.')
if __name__ == '__main__':
« no previous file with comments | « no previous file | bin/cros_run_vm_update » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698