Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2719)

Unified Diff: build/android/pylib/base/shard.py

Issue 12317059: [Andoid] Threaded TestRunner creation and SetUp and TearDown calls. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rebase Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « build/android/pylib/base/new_base_test_runner.py ('k') | build/android/pylib/base/shard_unittest.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: build/android/pylib/base/shard.py
diff --git a/build/android/pylib/base/shard.py b/build/android/pylib/base/shard.py
index b4e1281ef979537043fb42c15609ba6a158162c8..fdca5249a06b06f920d7b9ccf7a4d9e9fe026c60 100644
--- a/build/android/pylib/base/shard.py
+++ b/build/android/pylib/base/shard.py
@@ -5,61 +5,154 @@
"""Implements test sharding logic."""
import logging
-import sys
import threading
from pylib import android_commands
from pylib import forwarder
+from pylib.utils import reraiser_thread
import test_result
-class _Worker(threading.Thread):
- """Runs tests from the test_queue using the given runner in a separate thread.
+class _Test(object):
+ """Holds a test with additional metadata."""
+ def __init__(self, test, tries=0):
+ """Initializes the _Test object.
- Places results in the out_results.
+ Args:
+ test: the test.
+ tries: number of tries so far.
+ """
+ self.test = test
+ self.tries = tries
+
+
+class _TestCollection(object):
+ """A threadsafe collection of tests.
+
+ Args:
+ tests: list of tests to put in the collection.
"""
- def __init__(self, runner, test_queue, out_results, out_retry):
- """Initializes the worker.
+ def __init__(self, tests=[]):
+ self._lock = threading.Lock()
+ self._tests = []
+ self._tests_in_progress = 0
+ # Used to signal that an item is avaliable or all items have been handled.
+ self._item_avaliable_or_all_done = threading.Event()
+ for t in tests:
+ self.add(t)
+
+ def _pop(self):
+ """Pop a test from the collection.
+
+ Waits until a test is avaliable or all tests have been handled.
+
+ Returns:
+ A test or None if all tests have been handled.
+ """
+ while True:
+ # Wait for a test to be avaliable or all tests to have been handled.
+ self._item_avaliable_or_all_done.wait()
+ with self._lock:
+ # Check which of the two conditions triggered the signal.
+ if self._tests_in_progress == 0:
+ return None
+ try:
+ return self._tests.pop()
+ except IndexError:
+ # Another thread beat us to the avaliable test, wait again.
+ self._item_avaliable_or_all_done.clear()
+
+ def add(self, test):
+ """Add an test to the collection.
Args:
- runner: A TestRunner object used to run the tests.
- test_queue: A list from which to get tests to run.
- out_results: A list to add TestResults to.
- out_retry: A list to add tests to retry.
- """
- super(_Worker, self).__init__()
- self.daemon = True
- self._exc_info = None
- self._runner = runner
- self._test_queue = test_queue
- self._out_results = out_results
- self._out_retry = out_retry
-
- #override
- def run(self):
- """Run tests from the queue in a seperate thread until it is empty.
-
- Adds TestResults objects to the out_results list and may add tests to the
- out_retry list.
+ item: A test to add.
"""
+ with self._lock:
+ self._tests.append(test)
+ self._item_avaliable_or_all_done.set()
+ self._tests_in_progress += 1
+
+ def test_completed(self):
+ """Indicate that a test has been fully handled."""
+ with self._lock:
+ self._tests_in_progress -= 1
+ if self._tests_in_progress == 0:
+ # All tests have been handled, signal all waiting threads.
+ self._item_avaliable_or_all_done.set()
+
+ def __iter__(self):
+ """Iterate through tests in the collection until all have been handled."""
+ while True:
+ r = self._pop()
+ if r is None:
+ break
+ yield r
+
+
+def _RunTestsFromQueue(runner, test_collection, out_results):
+ """Runs tests from the test_collection until empty using the given runner.
+
+ Adds TestResults objects to the out_results list and may add tests to the
+ out_retry list.
+
+ Args:
+ runner: A TestRunner object used to run the tests.
+ test_collection: A _TestCollection from which to get _Test objects to run.
+ out_results: A list to add TestResults to.
+ """
+ for test in test_collection:
try:
- while True:
- test = self._test_queue.pop()
- result, retry = self._runner.Run(test)
- self._out_results.append(result)
- if retry:
- self._out_retry.append(retry)
- except IndexError:
- pass
+ if not android_commands.IsDeviceAttached(runner.device):
+ # Device is unresponsive, stop handling tests on this device.
+ msg = 'Device %s is unresponsive.' % runner.device
+ logging.warning(msg)
+ raise android_commands.errors.DeviceUnresponsiveError(msg)
+ result, retry = runner.RunTest(test.test)
+ test.tries += 1
+ if retry and test.tries <= 3:
+ # Retry non-passing results, only record passing results.
+ out_results.append(test_result.TestResults.FromRun(ok=result.ok))
+ logging.warning('****Retrying test, try #%s.' % test.tries)
+ test_collection.add(_Test(test=retry, tries=test.tries))
+ else:
+ # All tests passed or retry limit reached. Either way, record results.
+ out_results.append(result)
+ except android_commands.errors.DeviceUnresponsiveError:
+ # Device is unresponsive, stop handling tests on this device and ensure
+ # current test gets runs by another device. Don't reraise this exception
+ # on the main thread.
+ test_collection.add(test)
+ return
except:
- self._exc_info = sys.exc_info()
+ # An unhandleable exception, ensure tests get run by another device and
+ # reraise this exception on the main thread.
+ test_collection.add(test)
raise
+ finally:
+ # Retries count as separate tasks so always mark the popped test as done.
+ test_collection.test_completed()
+
+
+def _SetUp(runner_factory, device, out_runners):
+ """Creates a test runner for each device and calls SetUp() in parallel.
- def ReraiseIfException(self):
- """Reraise exception if an exception was raised in the thread."""
- if self._exc_info:
- raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
+ Note: if a device is unresponsive the corresponding TestRunner will not be
+ added to out_runners.
+
+ Args:
+ runner_factory: callable that takes a device and returns a TestRunner.
+ device: the device serial number to set up.
+ out_runners: list to add the successfully set up TestRunner object.
+ """
+ try:
+ logging.warning('*****Creating shard for %s.', device)
+ runner = runner_factory(device)
+ runner.SetUp()
+ out_runners.append(runner)
+ except android_commands.errors.DeviceUnresponsiveError as e:
+ logging.warning('****Failed to create shard for %s: [%s]', (device, e))
def _RunAllTests(runners, tests):
@@ -70,28 +163,21 @@ def _RunAllTests(runners, tests):
tests: a list of Tests to run using the given TestRunners.
Returns:
- Tuple: (list of TestResults, list of tests to retry)
+ A TestResults object.
"""
- tests_queue = list(tests)
- workers = []
+ logging.warning('****Running %s tests with %s test runners.' %
+ (len(tests), len(runners)))
+ tests_collection = _TestCollection([_Test(t) for t in tests])
results = []
- retry = []
- for r in runners:
- worker = _Worker(r, tests_queue, results, retry)
- worker.start()
- workers.append(worker)
- while workers:
- for w in workers[:]:
- # Allow the main thread to periodically check for keyboard interrupts.
- w.join(0.1)
- if not w.isAlive():
- w.ReraiseIfException()
- workers.remove(w)
- return (results, retry)
+ workers = reraiser_thread.ReraiserThreadGroup([reraiser_thread.ReraiserThread(
+ _RunTestsFromQueue, [r, tests_collection, results]) for r in runners])
+ workers.StartAll()
+ workers.JoinAll()
+ return test_result.TestResults.FromTestResults(results)
def _CreateRunners(runner_factory, devices):
- """Creates a test runner for each device.
+ """Creates a test runner for each device and calls SetUp() in parallel.
Note: if a device is unresponsive the corresponding TestRunner will not be
included in the returned list.
@@ -103,20 +189,29 @@ def _CreateRunners(runner_factory, devices):
Returns:
A list of TestRunner objects.
"""
+ logging.warning('****Creating %s test runners.' % len(devices))
test_runners = []
- for index, device in enumerate(devices):
- logging.warning('*' * 80)
- logging.warning('Creating shard %d for %s', index, device)
- logging.warning('*' * 80)
- try:
- test_runners.append(runner_factory(device))
- except android_commands.errors.DeviceUnresponsiveError as e:
- logging.warning('****Failed to create a shard: [%s]', e)
+ threads = reraiser_thread.ReraiserThreadGroup(
+ [reraiser_thread.ReraiserThread(_SetUp, [runner_factory, d, test_runners])
+ for d in devices])
+ threads.StartAll()
+ threads.JoinAll()
return test_runners
-def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug',
- tries=3):
+def _TearDownRunners(runners):
+ """Calls TearDown() for each test runner in parallel.
+ Args:
+ runners: a list of TestRunner objects.
+ """
+ threads = reraiser_thread.ReraiserThreadGroup(
+ [reraiser_thread.ReraiserThread(runner.TearDown)
+ for runner in runners])
+ threads.StartAll()
+ threads.JoinAll()
+
+
+def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug'):
"""Run all tests on attached devices, retrying tests that don't pass.
Args:
@@ -124,34 +219,18 @@ def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug',
devices: list of attached device serial numbers as strings.
tests: list of tests to run.
build_type: either 'Debug' or 'Release'.
- tries: number of tries before accepting failure.
Returns:
A test_result.TestResults object.
"""
- final_results = test_result.TestResults()
- results = test_result.TestResults()
forwarder.Forwarder.KillHost(build_type)
- try_count = 0
- while tests:
- devices = set(devices).intersection(android_commands.GetAttachedDevices())
- if not devices:
- # There are no visible devices attached, this is unrecoverable.
- msg = 'No devices attached and visible to run tests!'
- logging.critical(msg)
- raise Exception(msg)
- if try_count >= tries:
- # We've retried too many times, return the TestResults up to this point.
- results.ok = final_results.ok
- final_results = results
- break
- try_count += 1
- runners = _CreateRunners(runner_factory, devices)
+ runners = _CreateRunners(runner_factory, devices)
+ try:
+ return _RunAllTests(runners, tests)
+ finally:
try:
- results_list, tests = _RunAllTests(runners, tests)
- results = test_result.TestResults.FromTestResults(results_list)
- final_results.ok += results.ok
+ _TearDownRunners(runners)
except android_commands.errors.DeviceUnresponsiveError as e:
- logging.warning('****Failed to run test: [%s]', e)
- forwarder.Forwarder.KillHost(build_type)
- return final_results
+ logging.warning('****Device unresponsive during TearDown: [%s]', e)
+ finally:
+ forwarder.Forwarder.KillHost(build_type)
« no previous file with comments | « build/android/pylib/base/new_base_test_runner.py ('k') | build/android/pylib/base/shard_unittest.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698