Index: build/android/pylib/base/shard.py |
diff --git a/build/android/pylib/base/shard.py b/build/android/pylib/base/shard.py |
index b4e1281ef979537043fb42c15609ba6a158162c8..7e45f89151d674cb34a9a633088decbb8b873214 100644 |
--- a/build/android/pylib/base/shard.py |
+++ b/build/android/pylib/base/shard.py |
@@ -5,61 +5,55 @@ |
"""Implements test sharding logic.""" |
import logging |
-import sys |
-import threading |
from pylib import android_commands |
from pylib import forwarder |
+from pylib.utils import reraiser_thread |
import test_result |
-class _Worker(threading.Thread): |
- """Runs tests from the test_queue using the given runner in a separate thread. |
+def _RunTestsFromQueue(runner, test_queue, out_results, out_retry): |
+ """Runs tests from the test_queue until empty using the given runner. |
- Places results in the out_results. |
+ Adds TestResults objects to the out_results list and may add tests to the |
+ out_retry list. |
+ |
+ Args: |
+ runner: A TestRunner object used to run the tests. |
+ test_queue: A list from which to get tests to run. |
+ out_results: A list to add TestResults to. |
+ out_retry: A list to add tests to retry. |
""" |
- def __init__(self, runner, test_queue, out_results, out_retry): |
- """Initializes the worker. |
- |
- Args: |
- runner: A TestRunner object used to run the tests. |
- test_queue: A list from which to get tests to run. |
- out_results: A list to add TestResults to. |
- out_retry: A list to add tests to retry. |
- """ |
- super(_Worker, self).__init__() |
- self.daemon = True |
- self._exc_info = None |
- self._runner = runner |
- self._test_queue = test_queue |
- self._out_results = out_results |
- self._out_retry = out_retry |
- |
- #override |
- def run(self): |
- """Run tests from the queue in a seperate thread until it is empty. |
- |
- Adds TestResults objects to the out_results list and may add tests to the |
- out_retry list. |
- """ |
- try: |
- while True: |
- test = self._test_queue.pop() |
- result, retry = self._runner.Run(test) |
- self._out_results.append(result) |
- if retry: |
- self._out_retry.append(retry) |
- except IndexError: |
- pass |
- except: |
- self._exc_info = sys.exc_info() |
- raise |
- |
- def ReraiseIfException(self): |
- """Reraise exception if an exception was raised in the thread.""" |
- if self._exc_info: |
- raise self._exc_info[0], self._exc_info[1], self._exc_info[2] |
+ try: |
+ while True: |
+ test = test_queue.pop() |
+ result, retry = runner.RunTest(test) |
+ out_results.append(result) |
+ if retry: |
+ out_retry.append(retry) |
+ except IndexError: |
+ pass |
+ |
+ |
+def _SetUp(runner_factory, device, out_runners): |
+ """Creates a test runner for each device and calls SetUp() in parallel. |
+ |
+ Note: if a device is unresponsive the corresponding TestRunner will not be |
+ added to out_runners. |
+ |
+ Args: |
+ runner_factory: callable that takes a device and returns a TestRunner. |
+ device: the device serial number to set up. |
+ out_runners: list to add the successfully set up TestRunner object. |
+ """ |
+ try: |
+ logging.warning('****Creating shard for %s', device) |
+ runner = runner_factory(device) |
+ runner.SetUp() |
+ out_runners.append(runner) |
+ except android_commands.errors.DeviceUnresponsiveError as e: |
+ logging.warning('****Failed to create shard for %s: [%s]', (device, e)) |
def _RunAllTests(runners, tests): |
@@ -73,25 +67,17 @@ def _RunAllTests(runners, tests): |
Tuple: (list of TestResults, list of tests to retry) |
""" |
tests_queue = list(tests) |
- workers = [] |
results = [] |
retry = [] |
- for r in runners: |
- worker = _Worker(r, tests_queue, results, retry) |
- worker.start() |
- workers.append(worker) |
- while workers: |
- for w in workers[:]: |
- # Allow the main thread to periodically check for keyboard interrupts. |
- w.join(0.1) |
- if not w.isAlive(): |
- w.ReraiseIfException() |
- workers.remove(w) |
+ workers = reraiser_thread.ReraiserThreadGroup([reraiser_thread.ReraiserThread( |
+ _RunTestsFromQueue, [r, tests_queue, results, retry]) for r in runners]) |
+ workers.StartAll() |
+ workers.JoinAll() |
return (results, retry) |
def _CreateRunners(runner_factory, devices): |
- """Creates a test runner for each device. |
+ """Creates a test runner for each device and calls SetUp() in parallel. |
Note: if a device is unresponsive the corresponding TestRunner will not be |
included in the returned list. |
@@ -104,17 +90,26 @@ def _CreateRunners(runner_factory, devices): |
A list of TestRunner objects. |
""" |
test_runners = [] |
- for index, device in enumerate(devices): |
- logging.warning('*' * 80) |
- logging.warning('Creating shard %d for %s', index, device) |
- logging.warning('*' * 80) |
- try: |
- test_runners.append(runner_factory(device)) |
- except android_commands.errors.DeviceUnresponsiveError as e: |
- logging.warning('****Failed to create a shard: [%s]', e) |
+ threads = reraiser_thread.ReraiserThreadGroup( |
+ [reraiser_thread.ReraiserThread(_SetUp, [runner_factory, d, test_runners]) |
+ for d in devices]) |
+ threads.StartAll() |
+ threads.JoinAll() |
frankf
2013/02/22 02:42:55
For all these JoinAll invokations, we need to thin
craigdh
2013/02/22 17:24:20
I thought about it a bit. Here's what's currently
|
return test_runners |
+def _TearDownRunners(runners): |
+ """Calls TearDown() for each test runner in parallel. |
+ Args: |
+ runners: a list of TestRunner objects. |
+ """ |
+ threads = reraiser_thread.ReraiserThreadGroup( |
+ [reraiser_thread.ReraiserThread(runner.TearDown) |
+ for runner in runners]) |
+ threads.StartAll() |
+ threads.JoinAll() |
+ |
+ |
def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug', |
tries=3): |
"""Run all tests on attached devices, retrying tests that don't pass. |
@@ -133,25 +128,29 @@ def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug', |
results = test_result.TestResults() |
forwarder.Forwarder.KillHost(build_type) |
try_count = 0 |
- while tests: |
- devices = set(devices).intersection(android_commands.GetAttachedDevices()) |
- if not devices: |
- # There are no visible devices attached, this is unrecoverable. |
- msg = 'No devices attached and visible to run tests!' |
- logging.critical(msg) |
- raise Exception(msg) |
- if try_count >= tries: |
- # We've retried too many times, return the TestResults up to this point. |
- results.ok = final_results.ok |
- final_results = results |
- break |
- try_count += 1 |
- runners = _CreateRunners(runner_factory, devices) |
- try: |
- results_list, tests = _RunAllTests(runners, tests) |
- results = test_result.TestResults.FromTestResults(results_list) |
- final_results.ok += results.ok |
- except android_commands.errors.DeviceUnresponsiveError as e: |
- logging.warning('****Failed to run test: [%s]', e) |
+ runners = _CreateRunners(runner_factory, set(devices)) |
nilesh
2013/02/22 02:00:29
I think the numbers of runners we create should al
craigdh
2013/02/22 17:24:20
There's an issue, though, if devices fail. I think
|
+ try: |
+ while tests: |
+ devices = set(devices).intersection(android_commands.GetAttachedDevices()) |
+ runners = [r for r in runners if r.device in devices] |
+ if not devices: |
+ # There are no visible devices attached, this is unrecoverable. |
+ msg = 'No devices attached and visible to run tests!' |
+ logging.critical(msg) |
+ raise Exception(msg) |
+ if try_count >= tries: |
+ # We've retried too many times, return the TestResults up to this point. |
+ results.ok = final_results.ok |
+ final_results = results |
+ break |
+ try_count += 1 |
+ try: |
+ results_list, tests = _RunAllTests(runners, tests) |
+ results = test_result.TestResults.FromTestResults(results_list) |
+ final_results.ok += results.ok |
+ except android_commands.errors.DeviceUnresponsiveError as e: |
+ logging.warning('****Failed to run test: [%s]', e) |
+ finally: |
+ _TearDownRunners(runners) |
forwarder.Forwarder.KillHost(build_type) |
return final_results |