| Index: build/android/pylib/base/test_dispatcher.py
|
| diff --git a/build/android/pylib/base/test_dispatcher.py b/build/android/pylib/base/test_dispatcher.py
|
| index 7b00ccd51f6b2ce9fee997ab0631a8408d58c3bd..929c408ec50245c749cddf45800e68fae69344db 100644
|
| --- a/build/android/pylib/base/test_dispatcher.py
|
| +++ b/build/android/pylib/base/test_dispatcher.py
|
| @@ -24,6 +24,7 @@ import threading
|
| from pylib import android_commands
|
| from pylib import constants
|
| from pylib.base import base_test_result
|
| +from pylib.base import test_collection
|
| from pylib.device import device_errors
|
| from pylib.utils import reraiser_thread
|
| from pylib.utils import watchdog_timer
|
| @@ -65,92 +66,16 @@ class _Test(object):
|
| self.tries = tries
|
|
|
|
|
| -class _TestCollection(object):
|
| - """A threadsafe collection of tests.
|
| -
|
| - Args:
|
| - tests: List of tests to put in the collection.
|
| - """
|
| -
|
| - def __init__(self, tests=None):
|
| - if not tests:
|
| - tests = []
|
| - self._lock = threading.Lock()
|
| - self._tests = []
|
| - self._tests_in_progress = 0
|
| - # Used to signal that an item is available or all items have been handled.
|
| - self._item_available_or_all_done = threading.Event()
|
| - for t in tests:
|
| - self.add(t)
|
| -
|
| - def _pop(self):
|
| - """Pop a test from the collection.
|
| -
|
| - Waits until a test is available or all tests have been handled.
|
| -
|
| - Returns:
|
| - A test or None if all tests have been handled.
|
| - """
|
| - while True:
|
| - # Wait for a test to be available or all tests to have been handled.
|
| - self._item_available_or_all_done.wait()
|
| - with self._lock:
|
| - # Check which of the two conditions triggered the signal.
|
| - if self._tests_in_progress == 0:
|
| - return None
|
| - try:
|
| - return self._tests.pop(0)
|
| - except IndexError:
|
| - # Another thread beat us to the available test, wait again.
|
| - self._item_available_or_all_done.clear()
|
| -
|
| - def add(self, test):
|
| - """Add an test to the collection.
|
| -
|
| - Args:
|
| - test: A test to add.
|
| - """
|
| - with self._lock:
|
| - self._tests.append(test)
|
| - self._item_available_or_all_done.set()
|
| - self._tests_in_progress += 1
|
| -
|
| - def test_completed(self):
|
| - """Indicate that a test has been fully handled."""
|
| - with self._lock:
|
| - self._tests_in_progress -= 1
|
| - if self._tests_in_progress == 0:
|
| - # All tests have been handled, signal all waiting threads.
|
| - self._item_available_or_all_done.set()
|
| -
|
| - def __iter__(self):
|
| - """Iterate through tests in the collection until all have been handled."""
|
| - while True:
|
| - r = self._pop()
|
| - if r is None:
|
| - break
|
| - yield r
|
| -
|
| - def __len__(self):
|
| - """Return the number of tests currently in the collection."""
|
| - return len(self._tests)
|
| -
|
| - def test_names(self):
|
| - """Return a list of the names of the tests currently in the collection."""
|
| - with self._lock:
|
| - return list(t.test for t in self._tests)
|
| -
|
| -
|
| -def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
|
| +def _RunTestsFromQueue(runner, collection, out_results, watcher,
|
| num_retries, tag_results_with_device=False):
|
| - """Runs tests from the test_collection until empty using the given runner.
|
| + """Runs tests from the collection until empty using the given runner.
|
|
|
| Adds TestRunResults objects to the out_results list and may add tests to the
|
| out_retry list.
|
|
|
| Args:
|
| runner: A TestRunner object used to run the tests.
|
| - test_collection: A _TestCollection from which to get _Test objects to run.
|
| + collection: A TestCollection from which to get _Test objects to run.
|
| out_results: A list to add TestRunResults to.
|
| watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
|
| num_retries: Number of retries for a test.
|
| @@ -174,7 +99,7 @@ def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
|
| new_test_run_results.AddResult(test_result)
|
| return new_test_run_results
|
|
|
| - for test in test_collection:
|
| + for test in collection:
|
| watcher.Reset()
|
| try:
|
| if runner.device_serial not in android_commands.GetAttachedDevices():
|
| @@ -192,18 +117,18 @@ def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
|
| pass_results.AddResults(result.GetPass())
|
| out_results.append(pass_results)
|
| logging.warning('Will retry test, try #%s.' % test.tries)
|
| - test_collection.add(_Test(test=retry, tries=test.tries))
|
| + collection.add(_Test(test=retry, tries=test.tries))
|
| else:
|
| # All tests passed or retry limit reached. Either way, record results.
|
| out_results.append(result)
|
| except:
|
| # An unhandleable exception, ensure tests get run by another device and
|
| # reraise this exception on the main thread.
|
| - test_collection.add(test)
|
| + collection.add(test)
|
| raise
|
| finally:
|
| # Retries count as separate tasks so always mark the popped test as done.
|
| - test_collection.test_completed()
|
| + collection.test_completed()
|
|
|
|
|
| def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
|
| @@ -238,7 +163,7 @@ def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
|
|
|
| Args:
|
| runners: A list of TestRunner objects.
|
| - test_collection_factory: A callable to generate a _TestCollection object for
|
| + test_collection_factory: A callable to generate a TestCollection object for
|
| each test runner.
|
| num_retries: Number of retries for a test.
|
| timeout: Watchdog timeout in seconds.
|
| @@ -384,16 +309,17 @@ def RunTests(tests, runner_factory, devices, shard=True,
|
|
|
| tests_expanded = ApplyMaxPerRun(tests, max_per_run)
|
| if shard:
|
| - # Generate a shared _TestCollection object for all test runners, so they
|
| + # Generate a shared TestCollection object for all test runners, so they
|
| # draw from a common pool of tests.
|
| - shared_test_collection = _TestCollection([_Test(t) for t in tests_expanded])
|
| + shared_test_collection = test_collection.TestCollection(
|
| + [_Test(t) for t in tests_expanded])
|
| test_collection_factory = lambda: shared_test_collection
|
| tag_results_with_device = False
|
| log_string = 'sharded across devices'
|
| else:
|
| - # Generate a unique _TestCollection object for each test runner, but use
|
| + # Generate a unique TestCollection object for each test runner, but use
|
| # the same set of tests.
|
| - test_collection_factory = lambda: _TestCollection(
|
| + test_collection_factory = lambda: test_collection.TestCollection(
|
| [_Test(t) for t in tests_expanded])
|
| tag_results_with_device = True
|
| log_string = 'replicated on each device'
|
|
|