Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4047)

Unified Diff: build/android/pylib/base/shard.py

Issue 12317059: [Andoid] Threaded TestRunner creation and SetUp and TearDown calls. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: nit Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « build/android/pylib/base/new_base_test_runner.py ('k') | build/android/pylib/base/shard_unittest.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: build/android/pylib/base/shard.py
diff --git a/build/android/pylib/base/shard.py b/build/android/pylib/base/shard.py
index b4e1281ef979537043fb42c15609ba6a158162c8..628629e466a58a9746513606f8738d9a704933db 100644
--- a/build/android/pylib/base/shard.py
+++ b/build/android/pylib/base/shard.py
@@ -5,61 +5,142 @@
"""Implements test sharding logic."""
import logging
-import sys
import threading
from pylib import android_commands
from pylib import forwarder
+from pylib.utils import reraiser_thread
import test_result
-class _Worker(threading.Thread):
- """Runs tests from the test_queue using the given runner in a separate thread.
+class _Test(object):
+ """Holds a test with additional metadata."""
+ def __init__(self, test, tries=0):
+ """Initializes the _Test object.
- Places results in the out_results.
+ Args:
+ test: the test.
+ tries: number of tries so far.
+ """
+ self.test = test
+ self.tries = tries
+
+
+class _TestQueue(object):
+ """A queue that implements specific blocking semantics.
+
+ Args:
+ items: items to put in the queue.
"""
- def __init__(self, runner, test_queue, out_results, out_retry):
- """Initializes the worker.
+ def __init__(self, items=[]):
+ self._lock = threading.Lock()
+ self._items = list(items)
+ self._incomplete_count = len(self._items)
frankf 2013/02/26 00:57:07 incomplete_count -> tests_in_progress
craigdh 2013/02/26 01:23:16 Done.
+ self._can_pop = threading.Event()
frankf 2013/02/26 00:57:07 This event is signalling two things 1) when an ite
craigdh 2013/02/26 01:23:16 Improved naming and added comments.
+ self._can_pop.set()
frankf 2013/02/26 00:57:07 what if items is empty?
craigdh 2013/02/26 01:23:16 It got cleared if someone went to pop and found th
- Args:
- runner: A TestRunner object used to run the tests.
- test_queue: A list from which to get tests to run.
- out_results: A list to add TestResults to.
- out_retry: A list to add tests to retry.
+ def pop(self):
+ """Pop an item from the queue.
+
+ Waits until an item is avaliable or all items have been handled.
+
+ Returns:
+ An item or None if all items have been handled.
"""
- super(_Worker, self).__init__()
- self.daemon = True
- self._exc_info = None
- self._runner = runner
- self._test_queue = test_queue
- self._out_results = out_results
- self._out_retry = out_retry
-
- #override
- def run(self):
- """Run tests from the queue in a seperate thread until it is empty.
-
- Adds TestResults objects to the out_results list and may add tests to the
- out_retry list.
+ while True:
+ self._can_pop.wait()
+ with self._lock:
+ if self._incomplete_count == 0:
+ return None
+ try:
+ return self._items.pop()
+ except IndexError:
+ self._can_pop.clear()
+
+ def add(self, item):
+ """Add an item to the queue.
+
+ Args:
+ item: An item to add.
"""
+ with self._lock:
+ self._items.append(item)
+ self._can_pop.set()
+ self._incomplete_count += 1
+
+ def task_done(self):
+ """Indicate that a queue item has been fully handled."""
+ with self._lock:
+ self._incomplete_count -= 1
+ if self._incomplete_count == 0:
+ self._can_pop.set()
+ assert self._incomplete_count >= 0
+
+ def __iter__(self):
+ """Iterate through items in the queue until all items have been handled."""
+ while True:
+ r = self.pop()
+ if r is None:
+ break
+ yield r
+
+
+def _RunTestsFromQueue(runner, test_queue, out_results):
+ """Runs tests from the test_queue until empty using the given runner.
+
+ Adds TestResults objects to the out_results list and may add tests to the
+ out_retry list.
+
+ Args:
+ runner: A TestRunner object used to run the tests.
+ test_queue: A _TestQueue from which to get _Test objects to run.
+ out_results: A list to add TestResults to.
+ """
+ for test in test_queue:
+ if not android_commands.IsDeviceAttached(runner.device):
+ raise android_commands.errors.DeviceUnresponsiveError(
+ 'Device %s is unresponsive.' % runner.device)
try:
- while True:
- test = self._test_queue.pop()
- result, retry = self._runner.Run(test)
- self._out_results.append(result)
- if retry:
- self._out_retry.append(retry)
- except IndexError:
- pass
+ result, retry = runner.RunTest(test.test)
+ # TODO(frankf): Don't break TestResults encapsulation.
+ out_results.append(test_result.TestResults.FromRun(ok=result.ok))
+ if retry:
+ if test.tries == 2:
+ # Out of retries, store results.
+ result.ok = []
+ out_results.append(result)
+ else:
+ # Retry, don't store results.
+ logging.warning('****Retrying test, retry #%s.' % (test.tries + 1))
+ test_queue.add(_Test(test=retry, tries=test.tries + 1))
+ except android_commands.errors.DeviceUnresponsiveError:
+ test_queue.add(test)
except:
- self._exc_info = sys.exc_info()
+ test_queue.add(test)
raise
+ finally:
+ test_queue.task_done()
- def ReraiseIfException(self):
- """Reraise exception if an exception was raised in the thread."""
- if self._exc_info:
- raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
+
+def _SetUp(runner_factory, device, out_runners):
+ """Creates a test runner for each device and calls SetUp() in parallel.
+
+ Note: if a device is unresponsive the corresponding TestRunner will not be
+ added to out_runners.
+
+ Args:
+ runner_factory: callable that takes a device and returns a TestRunner.
+ device: the device serial number to set up.
+ out_runners: list to add the successfully set up TestRunner object.
+ """
+ try:
+ logging.warning('*****Creating shard for %s.', device)
+ runner = runner_factory(device)
+ runner.SetUp()
+ out_runners.append(runner)
+ except android_commands.errors.DeviceUnresponsiveError as e:
+ logging.warning('****Failed to create shard for %s: [%s]', (device, e))
def _RunAllTests(runners, tests):
@@ -70,28 +151,21 @@ def _RunAllTests(runners, tests):
tests: a list of Tests to run using the given TestRunners.
Returns:
- Tuple: (list of TestResults, list of tests to retry)
+ A TestResults object.
"""
- tests_queue = list(tests)
- workers = []
+ logging.warning('****Running %s tests with %s test runners.' %
+ (len(tests), len(runners)))
+ tests_queue = _TestQueue([_Test(t) for t in tests])
results = []
- retry = []
- for r in runners:
- worker = _Worker(r, tests_queue, results, retry)
- worker.start()
- workers.append(worker)
- while workers:
- for w in workers[:]:
- # Allow the main thread to periodically check for keyboard interrupts.
- w.join(0.1)
- if not w.isAlive():
- w.ReraiseIfException()
- workers.remove(w)
- return (results, retry)
+ workers = reraiser_thread.ReraiserThreadGroup([reraiser_thread.ReraiserThread(
+ _RunTestsFromQueue, [r, tests_queue, results]) for r in runners])
+ workers.StartAll()
+ workers.JoinAll()
+ return test_result.TestResults.FromTestResults(results)
def _CreateRunners(runner_factory, devices):
- """Creates a test runner for each device.
+ """Creates a test runner for each device and calls SetUp() in parallel.
Note: if a device is unresponsive the corresponding TestRunner will not be
included in the returned list.
@@ -103,20 +177,29 @@ def _CreateRunners(runner_factory, devices):
Returns:
A list of TestRunner objects.
"""
+ logging.warning('****Creating %s test runners.' % len(devices))
test_runners = []
- for index, device in enumerate(devices):
- logging.warning('*' * 80)
- logging.warning('Creating shard %d for %s', index, device)
- logging.warning('*' * 80)
- try:
- test_runners.append(runner_factory(device))
- except android_commands.errors.DeviceUnresponsiveError as e:
- logging.warning('****Failed to create a shard: [%s]', e)
+ threads = reraiser_thread.ReraiserThreadGroup(
+ [reraiser_thread.ReraiserThread(_SetUp, [runner_factory, d, test_runners])
+ for d in devices])
+ threads.StartAll()
+ threads.JoinAll()
return test_runners
-def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug',
- tries=3):
+def _TearDownRunners(runners):
+ """Calls TearDown() for each test runner in parallel.
+ Args:
+ runners: a list of TestRunner objects.
+ """
+ threads = reraiser_thread.ReraiserThreadGroup(
+ [reraiser_thread.ReraiserThread(runner.TearDown)
+ for runner in runners])
+ threads.StartAll()
+ threads.JoinAll()
+
+
+def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug'):
"""Run all tests on attached devices, retrying tests that don't pass.
Args:
@@ -124,34 +207,18 @@ def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug',
devices: list of attached device serial numbers as strings.
tests: list of tests to run.
build_type: either 'Debug' or 'Release'.
- tries: number of tries before accepting failure.
Returns:
A test_result.TestResults object.
"""
- final_results = test_result.TestResults()
- results = test_result.TestResults()
forwarder.Forwarder.KillHost(build_type)
- try_count = 0
- while tests:
- devices = set(devices).intersection(android_commands.GetAttachedDevices())
- if not devices:
- # There are no visible devices attached, this is unrecoverable.
- msg = 'No devices attached and visible to run tests!'
- logging.critical(msg)
- raise Exception(msg)
- if try_count >= tries:
- # We've retried too many times, return the TestResults up to this point.
- results.ok = final_results.ok
- final_results = results
- break
- try_count += 1
- runners = _CreateRunners(runner_factory, devices)
+ runners = _CreateRunners(runner_factory, devices)
+ try:
+ return _RunAllTests(runners, tests)
+ finally:
try:
- results_list, tests = _RunAllTests(runners, tests)
- results = test_result.TestResults.FromTestResults(results_list)
- final_results.ok += results.ok
+ _TearDownRunners(runners)
except android_commands.errors.DeviceUnresponsiveError as e:
- logging.warning('****Failed to run test: [%s]', e)
- forwarder.Forwarder.KillHost(build_type)
- return final_results
+ logging.warning('****Device unresponsive during TearDown: [%s]', e)
+ finally:
+ forwarder.Forwarder.KillHost(build_type)
« no previous file with comments | « build/android/pylib/base/new_base_test_runner.py ('k') | build/android/pylib/base/shard_unittest.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698