Index: build/android/pylib/base/shard.py |
diff --git a/build/android/pylib/base/shard.py b/build/android/pylib/base/shard.py |
index b4e1281ef979537043fb42c15609ba6a158162c8..fdca5249a06b06f920d7b9ccf7a4d9e9fe026c60 100644 |
--- a/build/android/pylib/base/shard.py |
+++ b/build/android/pylib/base/shard.py |
@@ -5,61 +5,154 @@ |
"""Implements test sharding logic.""" |
import logging |
-import sys |
import threading |
from pylib import android_commands |
from pylib import forwarder |
+from pylib.utils import reraiser_thread |
import test_result |
-class _Worker(threading.Thread): |
- """Runs tests from the test_queue using the given runner in a separate thread. |
+class _Test(object): |
+ """Holds a test with additional metadata.""" |
+ def __init__(self, test, tries=0): |
+ """Initializes the _Test object. |
- Places results in the out_results. |
+ Args: |
+ test: the test. |
+ tries: number of tries so far. |
+ """ |
+ self.test = test |
+ self.tries = tries |
+ |
+ |
+class _TestCollection(object): |
+ """A threadsafe collection of tests. |
+ |
+ Args: |
+ tests: list of tests to put in the collection. |
""" |
- def __init__(self, runner, test_queue, out_results, out_retry): |
- """Initializes the worker. |
+ def __init__(self, tests=[]): |
+ self._lock = threading.Lock() |
+ self._tests = [] |
+ self._tests_in_progress = 0 |
+ # Used to signal that an item is avaliable or all items have been handled. |
+ self._item_avaliable_or_all_done = threading.Event() |
+ for t in tests: |
+ self.add(t) |
+ |
+ def _pop(self): |
+ """Pop a test from the collection. |
+ |
+ Waits until a test is avaliable or all tests have been handled. |
+ |
+ Returns: |
+ A test or None if all tests have been handled. |
+ """ |
+ while True: |
+ # Wait for a test to be avaliable or all tests to have been handled. |
+ self._item_avaliable_or_all_done.wait() |
+ with self._lock: |
+ # Check which of the two conditions triggered the signal. |
+ if self._tests_in_progress == 0: |
+ return None |
+ try: |
+ return self._tests.pop() |
+ except IndexError: |
+ # Another thread beat us to the avaliable test, wait again. |
+ self._item_avaliable_or_all_done.clear() |
+ |
+ def add(self, test): |
+ """Add an test to the collection. |
Args: |
- runner: A TestRunner object used to run the tests. |
- test_queue: A list from which to get tests to run. |
- out_results: A list to add TestResults to. |
- out_retry: A list to add tests to retry. |
- """ |
- super(_Worker, self).__init__() |
- self.daemon = True |
- self._exc_info = None |
- self._runner = runner |
- self._test_queue = test_queue |
- self._out_results = out_results |
- self._out_retry = out_retry |
- |
- #override |
- def run(self): |
- """Run tests from the queue in a seperate thread until it is empty. |
- |
- Adds TestResults objects to the out_results list and may add tests to the |
- out_retry list. |
+ item: A test to add. |
""" |
+ with self._lock: |
+ self._tests.append(test) |
+ self._item_avaliable_or_all_done.set() |
+ self._tests_in_progress += 1 |
+ |
+ def test_completed(self): |
+ """Indicate that a test has been fully handled.""" |
+ with self._lock: |
+ self._tests_in_progress -= 1 |
+ if self._tests_in_progress == 0: |
+ # All tests have been handled, signal all waiting threads. |
+ self._item_avaliable_or_all_done.set() |
+ |
+ def __iter__(self): |
+ """Iterate through tests in the collection until all have been handled.""" |
+ while True: |
+ r = self._pop() |
+ if r is None: |
+ break |
+ yield r |
+ |
+ |
+def _RunTestsFromQueue(runner, test_collection, out_results): |
+ """Runs tests from the test_collection until empty using the given runner. |
+ |
+ Adds TestResults objects to the out_results list and may add tests to the |
+ out_retry list. |
+ |
+ Args: |
+ runner: A TestRunner object used to run the tests. |
+ test_collection: A _TestCollection from which to get _Test objects to run. |
+ out_results: A list to add TestResults to. |
+ """ |
+ for test in test_collection: |
try: |
- while True: |
- test = self._test_queue.pop() |
- result, retry = self._runner.Run(test) |
- self._out_results.append(result) |
- if retry: |
- self._out_retry.append(retry) |
- except IndexError: |
- pass |
+ if not android_commands.IsDeviceAttached(runner.device): |
+ # Device is unresponsive, stop handling tests on this device. |
+ msg = 'Device %s is unresponsive.' % runner.device |
+ logging.warning(msg) |
+ raise android_commands.errors.DeviceUnresponsiveError(msg) |
+ result, retry = runner.RunTest(test.test) |
+ test.tries += 1 |
+ if retry and test.tries <= 3: |
+ # Retry non-passing results, only record passing results. |
+ out_results.append(test_result.TestResults.FromRun(ok=result.ok)) |
+ logging.warning('****Retrying test, try #%s.' % test.tries) |
+ test_collection.add(_Test(test=retry, tries=test.tries)) |
+ else: |
+ # All tests passed or retry limit reached. Either way, record results. |
+ out_results.append(result) |
+ except android_commands.errors.DeviceUnresponsiveError: |
+ # Device is unresponsive, stop handling tests on this device and ensure |
+ # current test gets runs by another device. Don't reraise this exception |
+ # on the main thread. |
+ test_collection.add(test) |
+ return |
except: |
- self._exc_info = sys.exc_info() |
+ # An unhandleable exception, ensure tests get run by another device and |
+ # reraise this exception on the main thread. |
+ test_collection.add(test) |
raise |
+ finally: |
+ # Retries count as separate tasks so always mark the popped test as done. |
+ test_collection.test_completed() |
+ |
+ |
+def _SetUp(runner_factory, device, out_runners): |
+ """Creates a test runner for each device and calls SetUp() in parallel. |
- def ReraiseIfException(self): |
- """Reraise exception if an exception was raised in the thread.""" |
- if self._exc_info: |
- raise self._exc_info[0], self._exc_info[1], self._exc_info[2] |
+ Note: if a device is unresponsive the corresponding TestRunner will not be |
+ added to out_runners. |
+ |
+ Args: |
+ runner_factory: callable that takes a device and returns a TestRunner. |
+ device: the device serial number to set up. |
+ out_runners: list to add the successfully set up TestRunner object. |
+ """ |
+ try: |
+ logging.warning('*****Creating shard for %s.', device) |
+ runner = runner_factory(device) |
+ runner.SetUp() |
+ out_runners.append(runner) |
+ except android_commands.errors.DeviceUnresponsiveError as e: |
+ logging.warning('****Failed to create shard for %s: [%s]', (device, e)) |
def _RunAllTests(runners, tests): |
@@ -70,28 +163,21 @@ def _RunAllTests(runners, tests): |
tests: a list of Tests to run using the given TestRunners. |
Returns: |
- Tuple: (list of TestResults, list of tests to retry) |
+ A TestResults object. |
""" |
- tests_queue = list(tests) |
- workers = [] |
+ logging.warning('****Running %s tests with %s test runners.' % |
+ (len(tests), len(runners))) |
+ tests_collection = _TestCollection([_Test(t) for t in tests]) |
results = [] |
- retry = [] |
- for r in runners: |
- worker = _Worker(r, tests_queue, results, retry) |
- worker.start() |
- workers.append(worker) |
- while workers: |
- for w in workers[:]: |
- # Allow the main thread to periodically check for keyboard interrupts. |
- w.join(0.1) |
- if not w.isAlive(): |
- w.ReraiseIfException() |
- workers.remove(w) |
- return (results, retry) |
+ workers = reraiser_thread.ReraiserThreadGroup([reraiser_thread.ReraiserThread( |
+ _RunTestsFromQueue, [r, tests_collection, results]) for r in runners]) |
+ workers.StartAll() |
+ workers.JoinAll() |
+ return test_result.TestResults.FromTestResults(results) |
def _CreateRunners(runner_factory, devices): |
- """Creates a test runner for each device. |
+ """Creates a test runner for each device and calls SetUp() in parallel. |
Note: if a device is unresponsive the corresponding TestRunner will not be |
included in the returned list. |
@@ -103,20 +189,29 @@ def _CreateRunners(runner_factory, devices): |
Returns: |
A list of TestRunner objects. |
""" |
+ logging.warning('****Creating %s test runners.' % len(devices)) |
test_runners = [] |
- for index, device in enumerate(devices): |
- logging.warning('*' * 80) |
- logging.warning('Creating shard %d for %s', index, device) |
- logging.warning('*' * 80) |
- try: |
- test_runners.append(runner_factory(device)) |
- except android_commands.errors.DeviceUnresponsiveError as e: |
- logging.warning('****Failed to create a shard: [%s]', e) |
+ threads = reraiser_thread.ReraiserThreadGroup( |
+ [reraiser_thread.ReraiserThread(_SetUp, [runner_factory, d, test_runners]) |
+ for d in devices]) |
+ threads.StartAll() |
+ threads.JoinAll() |
return test_runners |
-def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug', |
- tries=3): |
+def _TearDownRunners(runners): |
+ """Calls TearDown() for each test runner in parallel. |
+ Args: |
+ runners: a list of TestRunner objects. |
+ """ |
+ threads = reraiser_thread.ReraiserThreadGroup( |
+ [reraiser_thread.ReraiserThread(runner.TearDown) |
+ for runner in runners]) |
+ threads.StartAll() |
+ threads.JoinAll() |
+ |
+ |
+def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug'): |
"""Run all tests on attached devices, retrying tests that don't pass. |
Args: |
@@ -124,34 +219,18 @@ def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug', |
devices: list of attached device serial numbers as strings. |
tests: list of tests to run. |
build_type: either 'Debug' or 'Release'. |
- tries: number of tries before accepting failure. |
Returns: |
A test_result.TestResults object. |
""" |
- final_results = test_result.TestResults() |
- results = test_result.TestResults() |
forwarder.Forwarder.KillHost(build_type) |
- try_count = 0 |
- while tests: |
- devices = set(devices).intersection(android_commands.GetAttachedDevices()) |
- if not devices: |
- # There are no visible devices attached, this is unrecoverable. |
- msg = 'No devices attached and visible to run tests!' |
- logging.critical(msg) |
- raise Exception(msg) |
- if try_count >= tries: |
- # We've retried too many times, return the TestResults up to this point. |
- results.ok = final_results.ok |
- final_results = results |
- break |
- try_count += 1 |
- runners = _CreateRunners(runner_factory, devices) |
+ runners = _CreateRunners(runner_factory, devices) |
+ try: |
+ return _RunAllTests(runners, tests) |
+ finally: |
try: |
- results_list, tests = _RunAllTests(runners, tests) |
- results = test_result.TestResults.FromTestResults(results_list) |
- final_results.ok += results.ok |
+ _TearDownRunners(runners) |
except android_commands.errors.DeviceUnresponsiveError as e: |
- logging.warning('****Failed to run test: [%s]', e) |
- forwarder.Forwarder.KillHost(build_type) |
- return final_results |
+ logging.warning('****Device unresponsive during TearDown: [%s]', e) |
+ finally: |
+ forwarder.Forwarder.KillHost(build_type) |