| Index: build/android/pylib/base/shard.py | 
| diff --git a/build/android/pylib/base/shard.py b/build/android/pylib/base/shard.py | 
| index b4e1281ef979537043fb42c15609ba6a158162c8..fdca5249a06b06f920d7b9ccf7a4d9e9fe026c60 100644 | 
| --- a/build/android/pylib/base/shard.py | 
| +++ b/build/android/pylib/base/shard.py | 
| @@ -5,61 +5,154 @@ | 
| """Implements test sharding logic.""" | 
|  | 
| import logging | 
| -import sys | 
| import threading | 
|  | 
| from pylib import android_commands | 
| from pylib import forwarder | 
| +from pylib.utils import reraiser_thread | 
|  | 
| import test_result | 
|  | 
|  | 
| -class _Worker(threading.Thread): | 
| -  """Runs tests from the test_queue using the given runner in a separate thread. | 
| +class _Test(object): | 
| +  """Holds a test with additional metadata.""" | 
| +  def __init__(self, test, tries=0): | 
| +    """Initializes the _Test object. | 
|  | 
| -  Places results in the out_results. | 
| +    Args: | 
| +      test: the test, handed to the runner unchanged when it is run. | 
| +      tries: number of times this test has already been run. | 
| +    """ | 
| +    self.test = test | 
| +    self.tries = tries | 
| + | 
| + | 
| +class _TestCollection(object): | 
| +  """A threadsafe collection of tests. | 
| + | 
| +  Args: | 
| +    tests: iterable of tests to put in the collection. | 
| """ | 
| -  def __init__(self, runner, test_queue, out_results, out_retry): | 
| -    """Initializes the worker. | 
| +  def __init__(self, tests=()): | 
| +    self._lock = threading.Lock() | 
| +    self._tests = [] | 
| +    self._tests_in_progress = 0 | 
| +    # Used to signal that an item is available or all items have been handled. | 
| +    self._item_avaliable_or_all_done = threading.Event() | 
| +    for t in tests: | 
| +      self.add(t) | 
| + | 
| +  def _pop(self): | 
| +    """Pop a test from the collection. | 
| + | 
| +    Waits until a test is available or all tests have been handled. | 
| + | 
| +    Returns: | 
| +      A test or None if all tests have been handled. | 
| +    """ | 
| +    while True: | 
| +      with self._lock: | 
| +        # Nothing queued or in flight; also covers an empty collection. | 
| +        if self._tests_in_progress == 0: | 
| +          return None | 
| +        try: | 
| +          return self._tests.pop() | 
| +        except IndexError: | 
| +          # No test is queued right now, reset the signal before waiting. | 
| +          self._item_avaliable_or_all_done.clear() | 
| +      # Wait for a test to be available or all tests to have been handled. | 
| +      self._item_avaliable_or_all_done.wait() | 
| + | 
| +  def add(self, test): | 
| +    """Add a test to the collection. | 
|  | 
| Args: | 
| -      runner: A TestRunner object used to run the tests. | 
| -      test_queue: A list from which to get tests to run. | 
| -      out_results: A list to add TestResults to. | 
| -      out_retry: A list to add tests to retry. | 
| -    """ | 
| -    super(_Worker, self).__init__() | 
| -    self.daemon = True | 
| -    self._exc_info = None | 
| -    self._runner = runner | 
| -    self._test_queue = test_queue | 
| -    self._out_results = out_results | 
| -    self._out_retry = out_retry | 
| - | 
| -  #override | 
| -  def run(self): | 
| -    """Run tests from the queue in a seperate thread until it is empty. | 
| - | 
| -    Adds TestResults objects to the out_results list and may add tests to the | 
| -    out_retry list. | 
| +      test: A test to add. | 
| """ | 
| +    with self._lock: | 
| +      self._tests.append(test) | 
| +      self._item_avaliable_or_all_done.set() | 
| +      self._tests_in_progress += 1 | 
| + | 
| +  def test_completed(self): | 
| +    """Indicate that a test has been fully handled.""" | 
| +    with self._lock: | 
| +      self._tests_in_progress -= 1 | 
| +      if self._tests_in_progress == 0: | 
| +        # All tests have been handled, signal all waiting threads. | 
| +        self._item_avaliable_or_all_done.set() | 
| + | 
| +  def __iter__(self): | 
| +    """Iterate through tests in the collection until all have been handled.""" | 
| +    while True: | 
| +      r = self._pop() | 
| +      if r is None: | 
| +        break | 
| +      yield r | 
| + | 
| + | 
| +def _RunTestsFromQueue(runner, test_collection, out_results): | 
| +  """Runs tests from the test_collection until empty using the given runner. | 
| + | 
| +  Adds TestResults objects to the out_results list and may add tests back to | 
| +  the test_collection to be retried. | 
| + | 
| +  Args: | 
| +    runner: A TestRunner object used to run the tests. | 
| +    test_collection: A _TestCollection from which to get _Test objects to run. | 
| +    out_results: A list to add TestResults to. | 
| +  """ | 
| +  for test in test_collection: | 
| try: | 
| -      while True: | 
| -        test = self._test_queue.pop() | 
| -        result, retry = self._runner.Run(test) | 
| -        self._out_results.append(result) | 
| -        if retry: | 
| -          self._out_retry.append(retry) | 
| -    except IndexError: | 
| -      pass | 
| +      if not android_commands.IsDeviceAttached(runner.device): | 
| +        # Device is unresponsive, stop handling tests on this device. | 
| +        msg = 'Device %s is unresponsive.' % runner.device | 
| +        logging.warning(msg) | 
| +        raise android_commands.errors.DeviceUnresponsiveError(msg) | 
| +      result, retry = runner.RunTest(test.test) | 
| +      test.tries += 1 | 
| +      if retry and test.tries <= 3: | 
| +        # Retry non-passing results, only record passing results. | 
| +        out_results.append(test_result.TestResults.FromRun(ok=result.ok)) | 
| +        logging.warning('****Retrying test, try #%s.' % test.tries) | 
| +        test_collection.add(_Test(test=retry, tries=test.tries)) | 
| +      else: | 
| +        # All tests passed or retry limit reached. Either way, record results. | 
| +        out_results.append(result) | 
| +    except android_commands.errors.DeviceUnresponsiveError: | 
| +      # Device is unresponsive, stop handling tests on this device and ensure | 
| +      # current test gets run by another device. Don't reraise this exception | 
| +      # on the main thread. | 
| +      test_collection.add(test) | 
| +      return | 
| except: | 
| -      self._exc_info = sys.exc_info() | 
| +      # An unhandleable exception, ensure tests get run by another device and | 
| +      # reraise this exception on the main thread. | 
| +      test_collection.add(test) | 
| raise | 
| +    finally: | 
| +      # Retries count as separate tasks so always mark the popped test as done. | 
| +      test_collection.test_completed() | 
| + | 
| + | 
| +def _SetUp(runner_factory, device, out_runners): | 
| +  """Creates a test runner for the given device and calls SetUp() on it. | 
|  | 
| -  def ReraiseIfException(self): | 
| -    """Reraise exception if an exception was raised in the thread.""" | 
| -    if self._exc_info: | 
| -      raise self._exc_info[0], self._exc_info[1], self._exc_info[2] | 
| +  Note: if a device is unresponsive the corresponding TestRunner will not be | 
| +    added to out_runners. | 
| + | 
| +  Args: | 
| +    runner_factory: callable that takes a device and returns a TestRunner. | 
| +    device: the device serial number to set up. | 
| +    out_runners: list to add the successfully set up TestRunner object. | 
| +  """ | 
| +  try: | 
| +    logging.warning('*****Creating shard for %s.', device) | 
| +    runner = runner_factory(device) | 
| +    runner.SetUp() | 
| +    out_runners.append(runner) | 
| +  except android_commands.errors.DeviceUnresponsiveError as e: | 
| +    logging.warning('****Failed to create shard for %s: [%s]', device, e) | 
|  | 
|  | 
| def _RunAllTests(runners, tests): | 
| @@ -70,28 +163,21 @@ def _RunAllTests(runners, tests): | 
| tests: a list of Tests to run using the given TestRunners. | 
|  | 
| Returns: | 
| -    Tuple: (list of TestResults, list of tests to retry) | 
| +    A TestResults object. | 
| """ | 
| -  tests_queue = list(tests) | 
| -  workers = [] | 
| +  logging.warning('****Running %s tests with %s test runners.' % | 
| +                  (len(tests), len(runners))) | 
| +  tests_collection = _TestCollection([_Test(t) for t in tests]) | 
| results = [] | 
| -  retry = [] | 
| -  for r in runners: | 
| -    worker = _Worker(r, tests_queue, results, retry) | 
| -    worker.start() | 
| -    workers.append(worker) | 
| -  while workers: | 
| -    for w in workers[:]: | 
| -      # Allow the main thread to periodically check for keyboard interrupts. | 
| -      w.join(0.1) | 
| -      if not w.isAlive(): | 
| -        w.ReraiseIfException() | 
| -        workers.remove(w) | 
| -  return (results, retry) | 
| +  workers = reraiser_thread.ReraiserThreadGroup([reraiser_thread.ReraiserThread( | 
| +      _RunTestsFromQueue, [r, tests_collection, results]) for r in runners]) | 
| +  workers.StartAll() | 
| +  workers.JoinAll() | 
| +  return test_result.TestResults.FromTestResults(results) | 
|  | 
|  | 
| def _CreateRunners(runner_factory, devices): | 
| -  """Creates a test runner for each device. | 
| +  """Creates a test runner for each device and calls SetUp() in parallel. | 
|  | 
| Note: if a device is unresponsive the corresponding TestRunner will not be | 
| included in the returned list. | 
| @@ -103,20 +189,29 @@ def _CreateRunners(runner_factory, devices): | 
| Returns: | 
| A list of TestRunner objects. | 
| """ | 
| +  logging.warning('****Creating %s test runners.' % len(devices)) | 
| test_runners = [] | 
| -  for index, device in enumerate(devices): | 
| -    logging.warning('*' * 80) | 
| -    logging.warning('Creating shard %d for %s', index, device) | 
| -    logging.warning('*' * 80) | 
| -    try: | 
| -      test_runners.append(runner_factory(device)) | 
| -    except android_commands.errors.DeviceUnresponsiveError as e: | 
| -      logging.warning('****Failed to create a shard: [%s]', e) | 
| +  threads = reraiser_thread.ReraiserThreadGroup( | 
| +      [reraiser_thread.ReraiserThread(_SetUp, [runner_factory, d, test_runners]) | 
| +       for d in devices]) | 
| +  threads.StartAll() | 
| +  threads.JoinAll() | 
| return test_runners | 
|  | 
|  | 
| -def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug', | 
| -                     tries=3): | 
| +def _TearDownRunners(runners): | 
| +  """Calls TearDown() on each test runner, one ReraiserThread per runner. | 
| + | 
| +  Args: | 
| +    runners: a list of TestRunner objects to tear down. | 
| +  """ | 
| +  group = reraiser_thread.ReraiserThreadGroup( | 
| +      [reraiser_thread.ReraiserThread(r.TearDown) for r in runners]) | 
| +  group.StartAll() | 
| +  group.JoinAll() | 
| + | 
| + | 
| +def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug'): | 
| """Run all tests on attached devices, retrying tests that don't pass. | 
|  | 
| Args: | 
| @@ -124,34 +219,18 @@ def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug', | 
| devices: list of attached device serial numbers as strings. | 
| tests: list of tests to run. | 
| build_type: either 'Debug' or 'Release'. | 
| -    tries: number of tries before accepting failure. | 
|  | 
| Returns: | 
| A test_result.TestResults object. | 
| """ | 
| -  final_results = test_result.TestResults() | 
| -  results = test_result.TestResults() | 
| forwarder.Forwarder.KillHost(build_type) | 
| -  try_count = 0 | 
| -  while tests: | 
| -    devices = set(devices).intersection(android_commands.GetAttachedDevices()) | 
| -    if not devices: | 
| -      # There are no visible devices attached, this is unrecoverable. | 
| -      msg = 'No devices attached and visible to run tests!' | 
| -      logging.critical(msg) | 
| -      raise Exception(msg) | 
| -    if try_count >= tries: | 
| -      # We've retried too many times, return the TestResults up to this point. | 
| -      results.ok = final_results.ok | 
| -      final_results = results | 
| -      break | 
| -    try_count += 1 | 
| -    runners = _CreateRunners(runner_factory, devices) | 
| +  runners = _CreateRunners(runner_factory, devices) | 
| +  try: | 
| +    return _RunAllTests(runners, tests) | 
| +  finally: | 
| try: | 
| -      results_list, tests = _RunAllTests(runners, tests) | 
| -      results = test_result.TestResults.FromTestResults(results_list) | 
| -      final_results.ok += results.ok | 
| +      _TearDownRunners(runners) | 
| except android_commands.errors.DeviceUnresponsiveError as e: | 
| -      logging.warning('****Failed to run test: [%s]', e) | 
| -  forwarder.Forwarder.KillHost(build_type) | 
| -  return final_results | 
| +      logging.warning('****Device unresponsive during TearDown: [%s]', e) | 
| +    finally: | 
| +      forwarder.Forwarder.KillHost(build_type) | 
|  |