Index: build/android/pylib/base/test_dispatcher.py |
diff --git a/build/android/pylib/base/test_dispatcher.py b/build/android/pylib/base/test_dispatcher.py |
index 7b00ccd51f6b2ce9fee997ab0631a8408d58c3bd..929c408ec50245c749cddf45800e68fae69344db 100644 |
--- a/build/android/pylib/base/test_dispatcher.py |
+++ b/build/android/pylib/base/test_dispatcher.py |
@@ -24,6 +24,7 @@ import threading |
from pylib import android_commands |
from pylib import constants |
from pylib.base import base_test_result |
+from pylib.base import test_collection |
from pylib.device import device_errors |
from pylib.utils import reraiser_thread |
from pylib.utils import watchdog_timer |
@@ -65,92 +66,16 @@ class _Test(object): |
self.tries = tries |
-class _TestCollection(object): |
- """A threadsafe collection of tests. |
- |
- Args: |
- tests: List of tests to put in the collection. |
- """ |
- |
- def __init__(self, tests=None): |
- if not tests: |
- tests = [] |
- self._lock = threading.Lock() |
- self._tests = [] |
- self._tests_in_progress = 0 |
- # Used to signal that an item is available or all items have been handled. |
- self._item_available_or_all_done = threading.Event() |
- for t in tests: |
- self.add(t) |
- |
- def _pop(self): |
- """Pop a test from the collection. |
- |
- Waits until a test is available or all tests have been handled. |
- |
- Returns: |
- A test or None if all tests have been handled. |
- """ |
- while True: |
- # Wait for a test to be available or all tests to have been handled. |
- self._item_available_or_all_done.wait() |
- with self._lock: |
- # Check which of the two conditions triggered the signal. |
- if self._tests_in_progress == 0: |
- return None |
- try: |
- return self._tests.pop(0) |
- except IndexError: |
- # Another thread beat us to the available test, wait again. |
- self._item_available_or_all_done.clear() |
- |
- def add(self, test): |
- """Add an test to the collection. |
- |
- Args: |
- test: A test to add. |
- """ |
- with self._lock: |
- self._tests.append(test) |
- self._item_available_or_all_done.set() |
- self._tests_in_progress += 1 |
- |
- def test_completed(self): |
- """Indicate that a test has been fully handled.""" |
- with self._lock: |
- self._tests_in_progress -= 1 |
- if self._tests_in_progress == 0: |
- # All tests have been handled, signal all waiting threads. |
- self._item_available_or_all_done.set() |
- |
- def __iter__(self): |
- """Iterate through tests in the collection until all have been handled.""" |
- while True: |
- r = self._pop() |
- if r is None: |
- break |
- yield r |
- |
- def __len__(self): |
- """Return the number of tests currently in the collection.""" |
- return len(self._tests) |
- |
- def test_names(self): |
- """Return a list of the names of the tests currently in the collection.""" |
- with self._lock: |
- return list(t.test for t in self._tests) |
- |
- |
-def _RunTestsFromQueue(runner, test_collection, out_results, watcher, |
+def _RunTestsFromQueue(runner, collection, out_results, watcher, |
num_retries, tag_results_with_device=False): |
- """Runs tests from the test_collection until empty using the given runner. |
+ """Runs tests from the collection until empty using the given runner. |
Adds TestRunResults objects to the out_results list and may add tests to the |
out_retry list. |
Args: |
runner: A TestRunner object used to run the tests. |
- test_collection: A _TestCollection from which to get _Test objects to run. |
+ collection: A TestCollection from which to get _Test objects to run. |
out_results: A list to add TestRunResults to. |
watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout. |
num_retries: Number of retries for a test. |
@@ -174,7 +99,7 @@ def _RunTestsFromQueue(runner, test_collection, out_results, watcher, |
new_test_run_results.AddResult(test_result) |
return new_test_run_results |
- for test in test_collection: |
+ for test in collection: |
watcher.Reset() |
try: |
if runner.device_serial not in android_commands.GetAttachedDevices(): |
@@ -192,18 +117,18 @@ def _RunTestsFromQueue(runner, test_collection, out_results, watcher, |
pass_results.AddResults(result.GetPass()) |
out_results.append(pass_results) |
logging.warning('Will retry test, try #%s.' % test.tries) |
- test_collection.add(_Test(test=retry, tries=test.tries)) |
+ collection.add(_Test(test=retry, tries=test.tries)) |
else: |
# All tests passed or retry limit reached. Either way, record results. |
out_results.append(result) |
except: |
# An unhandleable exception, ensure tests get run by another device and |
# reraise this exception on the main thread. |
- test_collection.add(test) |
+ collection.add(test) |
raise |
finally: |
# Retries count as separate tasks so always mark the popped test as done. |
- test_collection.test_completed() |
+ collection.test_completed() |
def _SetUp(runner_factory, device, out_runners, threadsafe_counter): |
@@ -238,7 +163,7 @@ def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None, |
Args: |
runners: A list of TestRunner objects. |
- test_collection_factory: A callable to generate a _TestCollection object for |
+ test_collection_factory: A callable to generate a TestCollection object for |
each test runner. |
num_retries: Number of retries for a test. |
timeout: Watchdog timeout in seconds. |
@@ -384,16 +309,17 @@ def RunTests(tests, runner_factory, devices, shard=True, |
tests_expanded = ApplyMaxPerRun(tests, max_per_run) |
if shard: |
- # Generate a shared _TestCollection object for all test runners, so they |
+ # Generate a shared TestCollection object for all test runners, so they |
# draw from a common pool of tests. |
- shared_test_collection = _TestCollection([_Test(t) for t in tests_expanded]) |
+ shared_test_collection = test_collection.TestCollection( |
+ [_Test(t) for t in tests_expanded]) |
test_collection_factory = lambda: shared_test_collection |
tag_results_with_device = False |
log_string = 'sharded across devices' |
else: |
- # Generate a unique _TestCollection object for each test runner, but use |
+ # Generate a unique TestCollection object for each test runner, but use |
# the same set of tests. |
- test_collection_factory = lambda: _TestCollection( |
+ test_collection_factory = lambda: test_collection.TestCollection( |
[_Test(t) for t in tests_expanded]) |
tag_results_with_device = True |
log_string = 'replicated on each device' |