OLD | NEW |
1 # Copyright (c) 2013 The Chromium Authors. All rights reserved. | 1 # Copyright (c) 2013 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 """Implements test sharding logic.""" | 5 """Implements test sharding logic.""" |
6 | 6 |
7 import logging | 7 import logging |
8 import sys | |
9 import threading | 8 import threading |
10 | 9 |
11 from pylib import android_commands | 10 from pylib import android_commands |
12 from pylib import forwarder | 11 from pylib import forwarder |
| 12 from pylib.utils import reraiser_thread |
13 | 13 |
14 import test_result | 14 import test_result |
15 | 15 |
16 | 16 |
17 class _Worker(threading.Thread): | 17 class _Test(object): |
18 """Runs tests from the test_queue using the given runner in a separate thread. | 18 """Holds a test with additional metadata.""" |
19 | 19 def __init__(self, test, tries=0): |
20 Places results in the out_results. | 20 """Initializes the _Test object. |
21 """ | |
22 def __init__(self, runner, test_queue, out_results, out_retry): | |
23 """Initializes the worker. | |
24 | 21 |
25 Args: | 22 Args: |
26 runner: A TestRunner object used to run the tests. | 23 test: the test. |
27 test_queue: A list from which to get tests to run. | 24 tries: number of tries so far. |
28 out_results: A list to add TestResults to. | |
29 out_retry: A list to add tests to retry. | |
30 """ | 25 """ |
31 super(_Worker, self).__init__() | 26 self.test = test |
32 self.daemon = True | 27 self.tries = tries |
33 self._exc_info = None | |
34 self._runner = runner | |
35 self._test_queue = test_queue | |
36 self._out_results = out_results | |
37 self._out_retry = out_retry | |
38 | 28 |
39 #override | |
40 def run(self): | |
41 """Run tests from the queue in a separate thread until it is empty. | |
42 | 29 |
43 Adds TestResults objects to the out_results list and may add tests to the | 30 class _TestCollection(object): |
44 out_retry list. | 31 """A threadsafe collection of tests. |
| 32 |
| 33 Args: |
| 34 tests: list of tests to put in the collection. |
| 35 """ |
| 36 def __init__(self, tests=[]): |
| 37 self._lock = threading.Lock() |
| 38 self._tests = [] |
| 39 self._tests_in_progress = 0 |
| 40 # Used to signal that an item is available or all items have been handled. |
| 41 self._item_avaliable_or_all_done = threading.Event() |
| 42 for t in tests: |
| 43 self.add(t) |
| 44 |
| 45 def _pop(self): |
| 46 """Pop a test from the collection. |
| 47 |
| 48 Waits until a test is available or all tests have been handled. |
| 49 |
| 50 Returns: |
| 51 A test or None if all tests have been handled. |
45 """ | 52 """ |
| 53 while True: |
| 54 # Wait for a test to be available or all tests to have been handled. |
| 55 self._item_avaliable_or_all_done.wait() |
| 56 with self._lock: |
| 57 # Check which of the two conditions triggered the signal. |
| 58 if self._tests_in_progress == 0: |
| 59 return None |
| 60 try: |
| 61 return self._tests.pop() |
| 62 except IndexError: |
| 63 # Another thread beat us to the available test, wait again. |
| 64 self._item_avaliable_or_all_done.clear() |
| 65 |
| 66 def add(self, test): |
| 67 """Add a test to the collection. |
| 68 |
| 69 Args: |
| 70 test: A test to add. |
| 71 """ |
| 72 with self._lock: |
| 73 self._tests.append(test) |
| 74 self._item_avaliable_or_all_done.set() |
| 75 self._tests_in_progress += 1 |
| 76 |
| 77 def test_completed(self): |
| 78 """Indicate that a test has been fully handled.""" |
| 79 with self._lock: |
| 80 self._tests_in_progress -= 1 |
| 81 if self._tests_in_progress == 0: |
| 82 # All tests have been handled, signal all waiting threads. |
| 83 self._item_avaliable_or_all_done.set() |
| 84 |
| 85 def __iter__(self): |
| 86 """Iterate through tests in the collection until all have been handled.""" |
| 87 while True: |
| 88 r = self._pop() |
| 89 if r is None: |
| 90 break |
| 91 yield r |
| 92 |
| 93 |
| 94 def _RunTestsFromQueue(runner, test_collection, out_results): |
| 95 """Runs tests from the test_collection until empty using the given runner. |
| 96 |
| 97 Adds TestResults objects to the out_results list and may re-add tests to the |
| 98 test_collection so that they are retried. |
| 99 |
| 100 Args: |
| 101 runner: A TestRunner object used to run the tests. |
| 102 test_collection: A _TestCollection from which to get _Test objects to run. |
| 103 out_results: A list to add TestResults to. |
| 104 """ |
| 105 for test in test_collection: |
46 try: | 106 try: |
47 while True: | 107 if not android_commands.IsDeviceAttached(runner.device): |
48 test = self._test_queue.pop() | 108 # Device is unresponsive, stop handling tests on this device. |
49 result, retry = self._runner.Run(test) | 109 msg = 'Device %s is unresponsive.' % runner.device |
50 self._out_results.append(result) | 110 logging.warning(msg) |
51 if retry: | 111 raise android_commands.errors.DeviceUnresponsiveError(msg) |
52 self._out_retry.append(retry) | 112 result, retry = runner.RunTest(test.test) |
53 except IndexError: | 113 test.tries += 1 |
54 pass | 114 if retry and test.tries <= 3: |
| 115 # Retry non-passing results, only record passing results. |
| 116 out_results.append(test_result.TestResults.FromRun(ok=result.ok)) |
| 117 logging.warning('****Retrying test, try #%s.' % test.tries) |
| 118 test_collection.add(_Test(test=retry, tries=test.tries)) |
| 119 else: |
| 120 # All tests passed or retry limit reached. Either way, record results. |
| 121 out_results.append(result) |
| 122 except android_commands.errors.DeviceUnresponsiveError: |
| 123 # Device is unresponsive, stop handling tests on this device and ensure |
| 124 # current test gets run by another device. Don't reraise this exception |
| 125 # on the main thread. |
| 126 test_collection.add(test) |
| 127 return |
55 except: | 128 except: |
56 self._exc_info = sys.exc_info() | 129 # An unhandleable exception, ensure tests get run by another device and |
| 130 # reraise this exception on the main thread. |
| 131 test_collection.add(test) |
57 raise | 132 raise |
| 133 finally: |
| 134 # Retries count as separate tasks so always mark the popped test as done. |
| 135 test_collection.test_completed() |
58 | 136 |
59 def ReraiseIfException(self): | 137 |
60 """Reraise exception if an exception was raised in the thread.""" | 138 def _SetUp(runner_factory, device, out_runners): |
61 if self._exc_info: | 139 """Creates a test runner for each device and calls SetUp() in parallel. |
62 raise self._exc_info[0], self._exc_info[1], self._exc_info[2] | 140 |
| 141 Note: if a device is unresponsive the corresponding TestRunner will not be |
| 142 added to out_runners. |
| 143 |
| 144 Args: |
| 145 runner_factory: callable that takes a device and returns a TestRunner. |
| 146 device: the device serial number to set up. |
| 147 out_runners: list to add the successfully set up TestRunner object. |
| 148 """ |
| 149 try: |
| 150 logging.warning('*****Creating shard for %s.', device) |
| 151 runner = runner_factory(device) |
| 152 runner.SetUp() |
| 153 out_runners.append(runner) |
| 154 except android_commands.errors.DeviceUnresponsiveError as e: |
| 155 logging.warning('****Failed to create shard for %s: [%s]', (device, e)) |
63 | 156 |
64 | 157 |
65 def _RunAllTests(runners, tests): | 158 def _RunAllTests(runners, tests): |
66 """Run all tests using the given TestRunners. | 159 """Run all tests using the given TestRunners. |
67 | 160 |
68 Args: | 161 Args: |
69 runners: a list of TestRunner objects. | 162 runners: a list of TestRunner objects. |
70 tests: a list of Tests to run using the given TestRunners. | 163 tests: a list of Tests to run using the given TestRunners. |
71 | 164 |
72 Returns: | 165 Returns: |
73 Tuple: (list of TestResults, list of tests to retry) | 166 A TestResults object. |
74 """ | 167 """ |
75 tests_queue = list(tests) | 168 logging.warning('****Running %s tests with %s test runners.' % |
76 workers = [] | 169 (len(tests), len(runners))) |
| 170 tests_collection = _TestCollection([_Test(t) for t in tests]) |
77 results = [] | 171 results = [] |
78 retry = [] | 172 workers = reraiser_thread.ReraiserThreadGroup([reraiser_thread.ReraiserThread( |
79 for r in runners: | 173 _RunTestsFromQueue, [r, tests_collection, results]) for r in runners]) |
80 worker = _Worker(r, tests_queue, results, retry) | 174 workers.StartAll() |
81 worker.start() | 175 workers.JoinAll() |
82 workers.append(worker) | 176 return test_result.TestResults.FromTestResults(results) |
83 while workers: | |
84 for w in workers[:]: | |
85 # Allow the main thread to periodically check for keyboard interrupts. | |
86 w.join(0.1) | |
87 if not w.isAlive(): | |
88 w.ReraiseIfException() | |
89 workers.remove(w) | |
90 return (results, retry) | |
91 | 177 |
92 | 178 |
93 def _CreateRunners(runner_factory, devices): | 179 def _CreateRunners(runner_factory, devices): |
94 """Creates a test runner for each device. | 180 """Creates a test runner for each device and calls SetUp() in parallel. |
95 | 181 |
96 Note: if a device is unresponsive the corresponding TestRunner will not be | 182 Note: if a device is unresponsive the corresponding TestRunner will not be |
97 included in the returned list. | 183 included in the returned list. |
98 | 184 |
99 Args: | 185 Args: |
100 runner_factory: callable that takes a device and returns a TestRunner. | 186 runner_factory: callable that takes a device and returns a TestRunner. |
101 devices: list of device serial numbers as strings. | 187 devices: list of device serial numbers as strings. |
102 | 188 |
103 Returns: | 189 Returns: |
104 A list of TestRunner objects. | 190 A list of TestRunner objects. |
105 """ | 191 """ |
| 192 logging.warning('****Creating %s test runners.' % len(devices)) |
106 test_runners = [] | 193 test_runners = [] |
107 for index, device in enumerate(devices): | 194 threads = reraiser_thread.ReraiserThreadGroup( |
108 logging.warning('*' * 80) | 195 [reraiser_thread.ReraiserThread(_SetUp, [runner_factory, d, test_runners]) |
109 logging.warning('Creating shard %d for %s', index, device) | 196 for d in devices]) |
110 logging.warning('*' * 80) | 197 threads.StartAll() |
111 try: | 198 threads.JoinAll() |
112 test_runners.append(runner_factory(device)) | |
113 except android_commands.errors.DeviceUnresponsiveError as e: | |
114 logging.warning('****Failed to create a shard: [%s]', e) | |
115 return test_runners | 199 return test_runners |
116 | 200 |
117 | 201 |
118 def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug', | 202 def _TearDownRunners(runners): |
119 tries=3): | 203 """Calls TearDown() for each test runner in parallel. |
| 204 Args: |
| 205 runners: a list of TestRunner objects. |
| 206 """ |
| 207 threads = reraiser_thread.ReraiserThreadGroup( |
| 208 [reraiser_thread.ReraiserThread(runner.TearDown) |
| 209 for runner in runners]) |
| 210 threads.StartAll() |
| 211 threads.JoinAll() |
| 212 |
| 213 |
| 214 def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug'): |
120 """Run all tests on attached devices, retrying tests that don't pass. | 215 """Run all tests on attached devices, retrying tests that don't pass. |
121 | 216 |
122 Args: | 217 Args: |
123 runner_factory: callable that takes a device and returns a TestRunner. | 218 runner_factory: callable that takes a device and returns a TestRunner. |
124 devices: list of attached device serial numbers as strings. | 219 devices: list of attached device serial numbers as strings. |
125 tests: list of tests to run. | 220 tests: list of tests to run. |
126 build_type: either 'Debug' or 'Release'. | 221 build_type: either 'Debug' or 'Release'. |
127 tries: number of tries before accepting failure. | |
128 | 222 |
129 Returns: | 223 Returns: |
130 A test_result.TestResults object. | 224 A test_result.TestResults object. |
131 """ | 225 """ |
132 final_results = test_result.TestResults() | |
133 results = test_result.TestResults() | |
134 forwarder.Forwarder.KillHost(build_type) | 226 forwarder.Forwarder.KillHost(build_type) |
135 try_count = 0 | 227 runners = _CreateRunners(runner_factory, devices) |
136 while tests: | 228 try: |
137 devices = set(devices).intersection(android_commands.GetAttachedDevices()) | 229 return _RunAllTests(runners, tests) |
138 if not devices: | 230 finally: |
139 # There are no visible devices attached, this is unrecoverable. | |
140 msg = 'No devices attached and visible to run tests!' | |
141 logging.critical(msg) | |
142 raise Exception(msg) | |
143 if try_count >= tries: | |
144 # We've retried too many times, return the TestResults up to this point. | |
145 results.ok = final_results.ok | |
146 final_results = results | |
147 break | |
148 try_count += 1 | |
149 runners = _CreateRunners(runner_factory, devices) | |
150 try: | 231 try: |
151 results_list, tests = _RunAllTests(runners, tests) | 232 _TearDownRunners(runners) |
152 results = test_result.TestResults.FromTestResults(results_list) | |
153 final_results.ok += results.ok | |
154 except android_commands.errors.DeviceUnresponsiveError as e: | 233 except android_commands.errors.DeviceUnresponsiveError as e: |
155 logging.warning('****Failed to run test: [%s]', e) | 234 logging.warning('****Device unresponsive during TearDown: [%s]', e) |
156 forwarder.Forwarder.KillHost(build_type) | 235 finally: |
157 return final_results | 236 forwarder.Forwarder.KillHost(build_type) |
OLD | NEW |