| Index: build/android/pylib/base/shard.py
|
| diff --git a/build/android/pylib/base/shard.py b/build/android/pylib/base/shard.py
|
| index b4e1281ef979537043fb42c15609ba6a158162c8..fdca5249a06b06f920d7b9ccf7a4d9e9fe026c60 100644
|
| --- a/build/android/pylib/base/shard.py
|
| +++ b/build/android/pylib/base/shard.py
|
| @@ -5,61 +5,154 @@
|
| """Implements test sharding logic."""
|
|
|
| import logging
|
| -import sys
|
| import threading
|
|
|
| from pylib import android_commands
|
| from pylib import forwarder
|
| +from pylib.utils import reraiser_thread
|
|
|
| import test_result
|
|
|
|
|
| -class _Worker(threading.Thread):
|
| - """Runs tests from the test_queue using the given runner in a separate thread.
|
| +class _Test(object):
|
| + """Holds a test with additional metadata."""
|
| + def __init__(self, test, tries=0):
|
| + """Initializes the _Test object.
|
|
|
| - Places results in the out_results.
|
| + Args:
|
| + test: the test.
|
| + tries: number of tries so far.
|
| + """
|
| + self.test = test
|
| + self.tries = tries
|
| +
|
| +
|
| +class _TestCollection(object):
|
| + """A threadsafe collection of tests.
|
| +
|
| + Args:
|
| + tests: list of tests to put in the collection.
|
| """
|
| - def __init__(self, runner, test_queue, out_results, out_retry):
|
| - """Initializes the worker.
|
| + def __init__(self, tests=()):
|
| + self._lock = threading.Lock()
|
| + self._tests = []
|
| + self._tests_in_progress = 0
|
| + # Used to signal that an item is available or all items have been handled.
|
| + self._item_avaliable_or_all_done = threading.Event()
|
| + for t in tests:
|
| + self.add(t)
|
| +
|
| + def _pop(self):
|
| + """Pop a test from the collection.
|
| +
|
| + Waits until a test is available or all tests have been handled.
|
| +
|
| + Returns:
|
| + A test or None if all tests have been handled.
|
| + """
|
| + while True:
|
| + # Wait for a test to be available or all tests to have been handled.
|
| + self._item_avaliable_or_all_done.wait()
|
| + with self._lock:
|
| + # Check which of the two conditions triggered the signal.
|
| + if self._tests_in_progress == 0:
|
| + return None
|
| + try:
|
| + return self._tests.pop()
|
| + except IndexError:
|
| + # Another thread beat us to the available test, wait again.
|
| + self._item_avaliable_or_all_done.clear()
|
| +
|
| + def add(self, test):
|
| + """Add a test to the collection.
|
|
|
| Args:
|
| - runner: A TestRunner object used to run the tests.
|
| - test_queue: A list from which to get tests to run.
|
| - out_results: A list to add TestResults to.
|
| - out_retry: A list to add tests to retry.
|
| - """
|
| - super(_Worker, self).__init__()
|
| - self.daemon = True
|
| - self._exc_info = None
|
| - self._runner = runner
|
| - self._test_queue = test_queue
|
| - self._out_results = out_results
|
| - self._out_retry = out_retry
|
| -
|
| - #override
|
| - def run(self):
|
| - """Run tests from the queue in a seperate thread until it is empty.
|
| -
|
| - Adds TestResults objects to the out_results list and may add tests to the
|
| - out_retry list.
|
| + test: A test to add.
|
| """
|
| + with self._lock:
|
| + self._tests.append(test)
|
| + self._item_avaliable_or_all_done.set()
|
| + self._tests_in_progress += 1
|
| +
|
| + def test_completed(self):
|
| + """Indicate that a test has been fully handled."""
|
| + with self._lock:
|
| + self._tests_in_progress -= 1
|
| + if self._tests_in_progress == 0:
|
| + # All tests have been handled, signal all waiting threads.
|
| + self._item_avaliable_or_all_done.set()
|
| +
|
| + def __iter__(self):
|
| + """Iterate through tests in the collection until all have been handled."""
|
| + while True:
|
| + r = self._pop()
|
| + if r is None:
|
| + break
|
| + yield r
|
| +
|
| +
|
| +def _RunTestsFromQueue(runner, test_collection, out_results):
|
| + """Runs tests from the test_collection until empty using the given runner.
|
| +
|
| + Adds TestResults objects to the out_results list and may add tests back to
|
| + test_collection to be retried.
|
| +
|
| + Args:
|
| + runner: A TestRunner object used to run the tests.
|
| + test_collection: A _TestCollection from which to get _Test objects to run.
|
| + out_results: A list to add TestResults to.
|
| + """
|
| + for test in test_collection:
|
| try:
|
| - while True:
|
| - test = self._test_queue.pop()
|
| - result, retry = self._runner.Run(test)
|
| - self._out_results.append(result)
|
| - if retry:
|
| - self._out_retry.append(retry)
|
| - except IndexError:
|
| - pass
|
| + if not android_commands.IsDeviceAttached(runner.device):
|
| + # Device is unresponsive, stop handling tests on this device.
|
| + msg = 'Device %s is unresponsive.' % runner.device
|
| + logging.warning(msg)
|
| + raise android_commands.errors.DeviceUnresponsiveError(msg)
|
| + result, retry = runner.RunTest(test.test)
|
| + test.tries += 1
|
| + if retry and test.tries <= 3:
|
| + # Retry non-passing results, only record passing results.
|
| + out_results.append(test_result.TestResults.FromRun(ok=result.ok))
|
| + logging.warning('****Retrying test, try #%s.' % test.tries)
|
| + test_collection.add(_Test(test=retry, tries=test.tries))
|
| + else:
|
| + # All tests passed or retry limit reached. Either way, record results.
|
| + out_results.append(result)
|
| + except android_commands.errors.DeviceUnresponsiveError:
|
| + # Device is unresponsive, stop handling tests on this device and ensure
|
| + # current test gets run by another device. Don't reraise this exception
|
| + # on the main thread.
|
| + test_collection.add(test)
|
| + return
|
| except:
|
| - self._exc_info = sys.exc_info()
|
| + # An unhandleable exception, ensure tests get run by another device and
|
| + # reraise this exception on the main thread.
|
| + test_collection.add(test)
|
| raise
|
| + finally:
|
| + # Retries count as separate tasks so always mark the popped test as done.
|
| + test_collection.test_completed()
|
| +
|
| +
|
| +def _SetUp(runner_factory, device, out_runners):
|
| + """Creates a test runner for the given device and calls SetUp() on it.
|
|
|
| - def ReraiseIfException(self):
|
| - """Reraise exception if an exception was raised in the thread."""
|
| - if self._exc_info:
|
| - raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
|
| + Note: if a device is unresponsive the corresponding TestRunner will not be
|
| + added to out_runners.
|
| +
|
| + Args:
|
| + runner_factory: callable that takes a device and returns a TestRunner.
|
| + device: the device serial number to set up.
|
| + out_runners: list to add the successfully set up TestRunner object.
|
| + """
|
| + try:
|
| + logging.warning('*****Creating shard for %s.', device)
|
| + runner = runner_factory(device)
|
| + runner.SetUp()
|
| + out_runners.append(runner)
|
| + except android_commands.errors.DeviceUnresponsiveError as e:
|
| + logging.warning('****Failed to create shard for %s: [%s]', device, e)
|
|
|
|
|
| def _RunAllTests(runners, tests):
|
| @@ -70,28 +163,21 @@ def _RunAllTests(runners, tests):
|
| tests: a list of Tests to run using the given TestRunners.
|
|
|
| Returns:
|
| - Tuple: (list of TestResults, list of tests to retry)
|
| + A TestResults object.
|
| """
|
| - tests_queue = list(tests)
|
| - workers = []
|
| + logging.warning('****Running %s tests with %s test runners.' %
|
| + (len(tests), len(runners)))
|
| + tests_collection = _TestCollection([_Test(t) for t in tests])
|
| results = []
|
| - retry = []
|
| - for r in runners:
|
| - worker = _Worker(r, tests_queue, results, retry)
|
| - worker.start()
|
| - workers.append(worker)
|
| - while workers:
|
| - for w in workers[:]:
|
| - # Allow the main thread to periodically check for keyboard interrupts.
|
| - w.join(0.1)
|
| - if not w.isAlive():
|
| - w.ReraiseIfException()
|
| - workers.remove(w)
|
| - return (results, retry)
|
| + workers = reraiser_thread.ReraiserThreadGroup([reraiser_thread.ReraiserThread(
|
| + _RunTestsFromQueue, [r, tests_collection, results]) for r in runners])
|
| + workers.StartAll()
|
| + workers.JoinAll()
|
| + return test_result.TestResults.FromTestResults(results)
|
|
|
|
|
| def _CreateRunners(runner_factory, devices):
|
| - """Creates a test runner for each device.
|
| + """Creates a test runner for each device and calls SetUp() in parallel.
|
|
|
| Note: if a device is unresponsive the corresponding TestRunner will not be
|
| included in the returned list.
|
| @@ -103,20 +189,29 @@ def _CreateRunners(runner_factory, devices):
|
| Returns:
|
| A list of TestRunner objects.
|
| """
|
| + logging.warning('****Creating %s test runners.' % len(devices))
|
| test_runners = []
|
| - for index, device in enumerate(devices):
|
| - logging.warning('*' * 80)
|
| - logging.warning('Creating shard %d for %s', index, device)
|
| - logging.warning('*' * 80)
|
| - try:
|
| - test_runners.append(runner_factory(device))
|
| - except android_commands.errors.DeviceUnresponsiveError as e:
|
| - logging.warning('****Failed to create a shard: [%s]', e)
|
| + threads = reraiser_thread.ReraiserThreadGroup(
|
| + [reraiser_thread.ReraiserThread(_SetUp, [runner_factory, d, test_runners])
|
| + for d in devices])
|
| + threads.StartAll()
|
| + threads.JoinAll()
|
| return test_runners
|
|
|
|
|
| -def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug',
|
| - tries=3):
|
| +def _TearDownRunners(runners):
|
| + """Calls TearDown() for each test runner in parallel.
|
| + Args:
|
| + runners: a list of TestRunner objects.
|
| + """
|
| + threads = reraiser_thread.ReraiserThreadGroup(
|
| + [reraiser_thread.ReraiserThread(runner.TearDown)
|
| + for runner in runners])
|
| + threads.StartAll()
|
| + threads.JoinAll()
|
| +
|
| +
|
| +def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug'):
|
| """Run all tests on attached devices, retrying tests that don't pass.
|
|
|
| Args:
|
| @@ -124,34 +219,18 @@ def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug',
|
| devices: list of attached device serial numbers as strings.
|
| tests: list of tests to run.
|
| build_type: either 'Debug' or 'Release'.
|
| - tries: number of tries before accepting failure.
|
|
|
| Returns:
|
| A test_result.TestResults object.
|
| """
|
| - final_results = test_result.TestResults()
|
| - results = test_result.TestResults()
|
| forwarder.Forwarder.KillHost(build_type)
|
| - try_count = 0
|
| - while tests:
|
| - devices = set(devices).intersection(android_commands.GetAttachedDevices())
|
| - if not devices:
|
| - # There are no visible devices attached, this is unrecoverable.
|
| - msg = 'No devices attached and visible to run tests!'
|
| - logging.critical(msg)
|
| - raise Exception(msg)
|
| - if try_count >= tries:
|
| - # We've retried too many times, return the TestResults up to this point.
|
| - results.ok = final_results.ok
|
| - final_results = results
|
| - break
|
| - try_count += 1
|
| - runners = _CreateRunners(runner_factory, devices)
|
| + runners = _CreateRunners(runner_factory, devices)
|
| + try:
|
| + return _RunAllTests(runners, tests)
|
| + finally:
|
| try:
|
| - results_list, tests = _RunAllTests(runners, tests)
|
| - results = test_result.TestResults.FromTestResults(results_list)
|
| - final_results.ok += results.ok
|
| + _TearDownRunners(runners)
|
| except android_commands.errors.DeviceUnresponsiveError as e:
|
| - logging.warning('****Failed to run test: [%s]', e)
|
| - forwarder.Forwarder.KillHost(build_type)
|
| - return final_results
|
| + logging.warning('****Device unresponsive during TearDown: [%s]', e)
|
| + finally:
|
| + forwarder.Forwarder.KillHost(build_type)
|
|
|