Chromium Code Reviews| Index: build/android/pylib/base/shard.py |
| diff --git a/build/android/pylib/base/shard.py b/build/android/pylib/base/shard.py |
| index b4e1281ef979537043fb42c15609ba6a158162c8..628629e466a58a9746513606f8738d9a704933db 100644 |
| --- a/build/android/pylib/base/shard.py |
| +++ b/build/android/pylib/base/shard.py |
| @@ -5,61 +5,142 @@ |
| """Implements test sharding logic.""" |
| import logging |
| -import sys |
| import threading |
| from pylib import android_commands |
| from pylib import forwarder |
| +from pylib.utils import reraiser_thread |
| import test_result |
| -class _Worker(threading.Thread): |
| - """Runs tests from the test_queue using the given runner in a separate thread. |
| +class _Test(object): |
| + """Holds a test with additional metadata.""" |
| + def __init__(self, test, tries=0): |
| + """Initializes the _Test object. |
| - Places results in the out_results. |
| + Args: |
| + test: the test. |
| + tries: number of tries so far. |
| + """ |
| + self.test = test |
| + self.tries = tries |
| + |
| + |
| +class _TestQueue(object): |
| + """A queue that implements specific blocking semantics. |
| + |
| + Args: |
| + items: items to put in the queue. |
| """ |
| - def __init__(self, runner, test_queue, out_results, out_retry): |
| - """Initializes the worker. |
| + def __init__(self, items=[]): |
| + self._lock = threading.Lock() |
| + self._items = list(items) |
| + self._incomplete_count = len(self._items) |
|
frankf
2013/02/26 00:57:07
incomplete_count -> tests_in_progress
craigdh
2013/02/26 01:23:16
Done.
|
| + self._can_pop = threading.Event() |
|
frankf
2013/02/26 00:57:07
This event is signalling two things 1) when an ite
craigdh
2013/02/26 01:23:16
Improved naming and added comments.
|
| + self._can_pop.set() |
|
frankf
2013/02/26 00:57:07
what if items is empty?
craigdh
2013/02/26 01:23:16
It got cleared if someone went to pop and found th
|
| - Args: |
| - runner: A TestRunner object used to run the tests. |
| - test_queue: A list from which to get tests to run. |
| - out_results: A list to add TestResults to. |
| - out_retry: A list to add tests to retry. |
| + def pop(self): |
| + """Pop an item from the queue. |
| + |
| + Waits until an item is available or all items have been handled. |
| + |
| + Returns: |
| + An item or None if all items have been handled. |
| """ |
| - super(_Worker, self).__init__() |
| - self.daemon = True |
| - self._exc_info = None |
| - self._runner = runner |
| - self._test_queue = test_queue |
| - self._out_results = out_results |
| - self._out_retry = out_retry |
| - |
| - #override |
| - def run(self): |
| - """Run tests from the queue in a seperate thread until it is empty. |
| - |
| - Adds TestResults objects to the out_results list and may add tests to the |
| - out_retry list. |
| + while True: |
| + self._can_pop.wait() |
| + with self._lock: |
| + if self._incomplete_count == 0: |
| + return None |
| + try: |
| + return self._items.pop() |
| + except IndexError: |
| + self._can_pop.clear() |
| + |
| + def add(self, item): |
| + """Add an item to the queue. |
| + |
| + Args: |
| + item: An item to add. |
| """ |
| + with self._lock: |
| + self._items.append(item) |
| + self._can_pop.set() |
| + self._incomplete_count += 1 |
| + |
| + def task_done(self): |
| + """Indicate that a queue item has been fully handled.""" |
| + with self._lock: |
| + self._incomplete_count -= 1 |
| + if self._incomplete_count == 0: |
| + self._can_pop.set() |
| + assert self._incomplete_count >= 0 |
| + |
| + def __iter__(self): |
| + """Iterate through items in the queue until all items have been handled.""" |
| + while True: |
| + r = self.pop() |
| + if r is None: |
| + break |
| + yield r |
| + |
| + |
| +def _RunTestsFromQueue(runner, test_queue, out_results): |
| + """Runs tests from the test_queue until empty using the given runner. |
| + |
| + Adds TestResults objects to the out_results list and may re-add tests to the |
| + test_queue for retry. |
| + |
| + Args: |
| + runner: A TestRunner object used to run the tests. |
| + test_queue: A _TestQueue from which to get _Test objects to run. |
| + out_results: A list to add TestResults to. |
| + """ |
| + for test in test_queue: |
| + if not android_commands.IsDeviceAttached(runner.device): |
| + raise android_commands.errors.DeviceUnresponsiveError( |
| + 'Device %s is unresponsive.' % runner.device) |
| try: |
| - while True: |
| - test = self._test_queue.pop() |
| - result, retry = self._runner.Run(test) |
| - self._out_results.append(result) |
| - if retry: |
| - self._out_retry.append(retry) |
| - except IndexError: |
| - pass |
| + result, retry = runner.RunTest(test.test) |
| + # TODO(frankf): Don't break TestResults encapsulation. |
| + out_results.append(test_result.TestResults.FromRun(ok=result.ok)) |
| + if retry: |
| + if test.tries == 2: |
| + # Out of retries, store results. |
| + result.ok = [] |
| + out_results.append(result) |
| + else: |
| + # Retry, don't store results. |
| + logging.warning('****Retrying test, retry #%s.' % (test.tries + 1)) |
| + test_queue.add(_Test(test=retry, tries=test.tries + 1)) |
| + except android_commands.errors.DeviceUnresponsiveError: |
| + test_queue.add(test) |
| except: |
| - self._exc_info = sys.exc_info() |
| + test_queue.add(test) |
| raise |
| + finally: |
| + test_queue.task_done() |
| - def ReraiseIfException(self): |
| - """Reraise exception if an exception was raised in the thread.""" |
| - if self._exc_info: |
| - raise self._exc_info[0], self._exc_info[1], self._exc_info[2] |
| + |
| +def _SetUp(runner_factory, device, out_runners): |
| +  """Creates a test runner for a single device and calls SetUp() on it. |
| + |
| +  _CreateRunners wraps one call of this function per device in a |
| +  ReraiserThread, which is how set-up is spread across devices. |
| + |
| +  Note: if the device is unresponsive the corresponding TestRunner will not be |
| +  added to out_runners. |
| + |
| +  Args: |
| +    runner_factory: callable that takes a device and returns a TestRunner. |
| +    device: the device serial number to set up. |
| +    out_runners: list to add the successfully set up TestRunner object. |
| +  """ |
| +  try: |
| +    logging.warning('*****Creating shard for %s.', device) |
| +    runner = runner_factory(device) |
| +    runner.SetUp() |
| +    # Only runners whose SetUp() completed are handed back to the caller. |
| +    out_runners.append(runner) |
| +  except android_commands.errors.DeviceUnresponsiveError as e: |
| +    # Pass the format arguments separately: a single (device, e) tuple does |
| +    # not satisfy the two %s placeholders, so the message fails to format. |
| +    logging.warning('****Failed to create shard for %s: [%s]', device, e) |
| def _RunAllTests(runners, tests): |
| @@ -70,28 +151,21 @@ def _RunAllTests(runners, tests): |
| tests: a list of Tests to run using the given TestRunners. |
| Returns: |
| - Tuple: (list of TestResults, list of tests to retry) |
| + A TestResults object. |
| """ |
| - tests_queue = list(tests) |
| - workers = [] |
| + logging.warning('****Running %s tests with %s test runners.' % |
| + (len(tests), len(runners))) |
| + tests_queue = _TestQueue([_Test(t) for t in tests]) |
| results = [] |
| - retry = [] |
| - for r in runners: |
| - worker = _Worker(r, tests_queue, results, retry) |
| - worker.start() |
| - workers.append(worker) |
| - while workers: |
| - for w in workers[:]: |
| - # Allow the main thread to periodically check for keyboard interrupts. |
| - w.join(0.1) |
| - if not w.isAlive(): |
| - w.ReraiseIfException() |
| - workers.remove(w) |
| - return (results, retry) |
| + workers = reraiser_thread.ReraiserThreadGroup([reraiser_thread.ReraiserThread( |
| + _RunTestsFromQueue, [r, tests_queue, results]) for r in runners]) |
| + workers.StartAll() |
| + workers.JoinAll() |
| + return test_result.TestResults.FromTestResults(results) |
| def _CreateRunners(runner_factory, devices): |
| - """Creates a test runner for each device. |
| + """Creates a test runner for each device and calls SetUp() in parallel. |
| Note: if a device is unresponsive the corresponding TestRunner will not be |
| included in the returned list. |
| @@ -103,20 +177,29 @@ def _CreateRunners(runner_factory, devices): |
| Returns: |
| A list of TestRunner objects. |
| """ |
| + logging.warning('****Creating %s test runners.' % len(devices)) |
| test_runners = [] |
| - for index, device in enumerate(devices): |
| - logging.warning('*' * 80) |
| - logging.warning('Creating shard %d for %s', index, device) |
| - logging.warning('*' * 80) |
| - try: |
| - test_runners.append(runner_factory(device)) |
| - except android_commands.errors.DeviceUnresponsiveError as e: |
| - logging.warning('****Failed to create a shard: [%s]', e) |
| + threads = reraiser_thread.ReraiserThreadGroup( |
| + [reraiser_thread.ReraiserThread(_SetUp, [runner_factory, d, test_runners]) |
| + for d in devices]) |
| + threads.StartAll() |
| + threads.JoinAll() |
| return test_runners |
| -def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug', |
| - tries=3): |
| +def _TearDownRunners(runners): |
| +  """Tears down all of the given test runners. |
| + |
| +  One ReraiserThread is created per runner with runner.TearDown as its |
| +  callable; the group of threads is started and then joined. |
| + |
| +  Args: |
| +    runners: a list of TestRunner objects. |
| +  """ |
| +  teardown_threads = [reraiser_thread.ReraiserThread(r.TearDown) |
| +                      for r in runners] |
| +  group = reraiser_thread.ReraiserThreadGroup(teardown_threads) |
| +  group.StartAll() |
| +  group.JoinAll() |
| + |
| + |
| +def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug'): |
| """Run all tests on attached devices, retrying tests that don't pass. |
| Args: |
| @@ -124,34 +207,18 @@ def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug', |
| devices: list of attached device serial numbers as strings. |
| tests: list of tests to run. |
| build_type: either 'Debug' or 'Release'. |
| - tries: number of tries before accepting failure. |
| Returns: |
| A test_result.TestResults object. |
| """ |
| - final_results = test_result.TestResults() |
| - results = test_result.TestResults() |
| forwarder.Forwarder.KillHost(build_type) |
| - try_count = 0 |
| - while tests: |
| - devices = set(devices).intersection(android_commands.GetAttachedDevices()) |
| - if not devices: |
| - # There are no visible devices attached, this is unrecoverable. |
| - msg = 'No devices attached and visible to run tests!' |
| - logging.critical(msg) |
| - raise Exception(msg) |
| - if try_count >= tries: |
| - # We've retried too many times, return the TestResults up to this point. |
| - results.ok = final_results.ok |
| - final_results = results |
| - break |
| - try_count += 1 |
| - runners = _CreateRunners(runner_factory, devices) |
| + runners = _CreateRunners(runner_factory, devices) |
| + try: |
| + return _RunAllTests(runners, tests) |
| + finally: |
| try: |
| - results_list, tests = _RunAllTests(runners, tests) |
| - results = test_result.TestResults.FromTestResults(results_list) |
| - final_results.ok += results.ok |
| + _TearDownRunners(runners) |
| except android_commands.errors.DeviceUnresponsiveError as e: |
| - logging.warning('****Failed to run test: [%s]', e) |
| - forwarder.Forwarder.KillHost(build_type) |
| - return final_results |
| + logging.warning('****Device unresponsive during TearDown: [%s]', e) |
| + finally: |
| + forwarder.Forwarder.KillHost(build_type) |