Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(334)

Side by Side Diff: build/android/pylib/base/test_dispatcher.py

Issue 2605793002: [android] Convert linker tests to platform mode. (Closed)
Patch Set: mikecase comment Created 3 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Dispatches tests, either sharding or replicating them.
6
7 Performs the following steps:
8 * Create a test collection factory, using the given tests
9 - If sharding: test collection factory returns the same shared test collection
10 to all test runners
11 - If replciating: test collection factory returns a unique test collection to
12 each test runner, with the same set of tests in each.
13 * Create a test runner for each device.
14 * Run each test runner in its own thread, grabbing tests from the test
15 collection until there are no tests left.
16 """
17
18 # TODO(jbudorick) Deprecate and remove this class after any relevant parts have
19 # been ported to the new environment / test instance model.
20
21 import logging
22 import threading
23
24 from devil.android import device_errors
25 from devil.utils import reraiser_thread
26 from devil.utils import watchdog_timer
27 from pylib import constants
28 from pylib.base import base_test_result
29 from pylib.base import test_collection
30
31
32 DEFAULT_TIMEOUT = 7 * 60 # seven minutes
33
34
35 class _ThreadSafeCounter(object):
36 """A threadsafe counter."""
37
38 def __init__(self):
39 self._lock = threading.Lock()
40 self._value = 0
41
42 def GetAndIncrement(self):
43 """Get the current value and increment it atomically.
44
45 Returns:
46 The value before incrementing.
47 """
48 with self._lock:
49 pre_increment = self._value
50 self._value += 1
51 return pre_increment
52
53
54 class _Test(object):
55 """Holds a test with additional metadata."""
56
57 def __init__(self, test, tries=0):
58 """Initializes the _Test object.
59
60 Args:
61 test: The test.
62 tries: Number of tries so far.
63 """
64 self.test = test
65 self.tries = tries
66
67
68 def _RunTestsFromQueue(runner, collection, out_results, watcher,
69 num_retries, tag_results_with_device=False):
70 """Runs tests from the collection until empty using the given runner.
71
72 Adds TestRunResults objects to the out_results list and may add tests to the
73 out_retry list.
74
75 Args:
76 runner: A TestRunner object used to run the tests.
77 collection: A TestCollection from which to get _Test objects to run.
78 out_results: A list to add TestRunResults to.
79 watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
80 num_retries: Number of retries for a test.
81 tag_results_with_device: If True, appends the name of the device on which
82 the test was run to the test name. Used when replicating to identify
83 which device ran each copy of the test, and to ensure each copy of the
84 test is recorded separately.
85 """
86
87 def TagTestRunResults(test_run_results):
88 """Tags all results with the last 4 digits of the device id.
89
90 Used when replicating tests to distinguish the same tests run on different
91 devices. We use a set to store test results, so the hash (generated from
92 name and tag) must be unique to be considered different results.
93 """
94 new_test_run_results = base_test_result.TestRunResults()
95 for test_result in test_run_results.GetAll():
96 test_result.SetName('%s_%s' % (runner.device_serial[-4:],
97 test_result.GetName()))
98 new_test_run_results.AddResult(test_result)
99 return new_test_run_results
100
101 for test in collection:
102 watcher.Reset()
103 try:
104 if not runner.device.IsOnline():
105 # Device is unresponsive, stop handling tests on this device.
106 msg = 'Device %s is unresponsive.' % runner.device_serial
107 logging.warning(msg)
108 raise device_errors.DeviceUnreachableError(msg)
109 result, retry = runner.RunTest(test.test)
110 if tag_results_with_device:
111 result = TagTestRunResults(result)
112 test.tries += 1
113 if retry and test.tries <= num_retries:
114 # Retry non-passing results, only record passing results.
115 pass_results = base_test_result.TestRunResults()
116 pass_results.AddResults(result.GetPass())
117 out_results.append(pass_results)
118 logging.warning('Will retry test %s, try #%s.', retry, test.tries)
119 collection.add(_Test(test=retry, tries=test.tries))
120 else:
121 # All tests passed or retry limit reached. Either way, record results.
122 out_results.append(result)
123 except:
124 # An unhandleable exception, ensure tests get run by another device and
125 # reraise this exception on the main thread.
126 collection.add(test)
127 raise
128 finally:
129 # Retries count as separate tasks so always mark the popped test as done.
130 collection.test_completed()
131
132
133 def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
134 """Creates a test runner for each device and calls SetUp() in parallel.
135
136 Note: if a device is unresponsive the corresponding TestRunner will not be
137 added to out_runners.
138
139 Args:
140 runner_factory: Callable that takes a device and index and returns a
141 TestRunner object.
142 device: The device serial number to set up.
143 out_runners: List to add the successfully set up TestRunner object.
144 threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
145 """
146 try:
147 index = threadsafe_counter.GetAndIncrement()
148 logging.warning('Creating shard %s for device %s.', index, device)
149 runner = runner_factory(device, index)
150 if runner:
151 runner.SetUp()
152 out_runners.append(runner)
153 else:
154 logging.info('Device %s is not active. Will not create shard %s.',
155 str(device), index)
156 except (device_errors.CommandFailedError,
157 device_errors.CommandTimeoutError,
158 device_errors.DeviceUnreachableError):
159 logging.exception('Failed to create shard for %s', str(device))
160
161
162 def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
163 tag_results_with_device=False):
164 """Run all tests using the given TestRunners.
165
166 Args:
167 runners: A list of TestRunner objects.
168 test_collection_factory: A callable to generate a TestCollection object for
169 each test runner.
170 num_retries: Number of retries for a test.
171 timeout: Watchdog timeout in seconds.
172 tag_results_with_device: If True, appends the name of the device on which
173 the test was run to the test name. Used when replicating to identify
174 which device ran each copy of the test, and to ensure each copy of the
175 test is recorded separately.
176
177 Returns:
178 A tuple of (TestRunResults object, exit code)
179 """
180 logging.warning('Running tests with %s test %s.',
181 len(runners), 'runners' if len(runners) != 1 else 'runner')
182 results = []
183 exit_code = 0
184 run_results = base_test_result.TestRunResults()
185 watcher = watchdog_timer.WatchdogTimer(timeout)
186 test_collections = [test_collection_factory() for _ in runners]
187
188 threads = [
189 reraiser_thread.ReraiserThread(
190 _RunTestsFromQueue,
191 [r, tc, results, watcher, num_retries, tag_results_with_device],
192 name=r.device_serial[-4:])
193 for r, tc in zip(runners, test_collections)]
194
195 workers = reraiser_thread.ReraiserThreadGroup(threads)
196 workers.StartAll()
197
198 try:
199 workers.JoinAll(watcher)
200 except device_errors.CommandFailedError:
201 logging.exception('Command failed on device.')
202 except device_errors.CommandTimeoutError:
203 logging.exception('Command timed out on device.')
204 except device_errors.DeviceUnreachableError:
205 logging.exception('Device became unreachable.')
206
207 if not all((len(tc) == 0 for tc in test_collections)):
208 logging.error('Only ran %d tests (all devices are likely offline).',
209 len(results))
210 for tc in test_collections:
211 run_results.AddResults(base_test_result.BaseTestResult(
212 t, base_test_result.ResultType.UNKNOWN) for t in tc.test_names())
213
214 for r in results:
215 run_results.AddTestRunResults(r)
216 if not run_results.DidRunPass():
217 exit_code = constants.ERROR_EXIT_CODE
218 return (run_results, exit_code)
219
220
221 def _CreateRunners(runner_factory, devices, timeout=None):
222 """Creates a test runner for each device and calls SetUp() in parallel.
223
224 Note: if a device is unresponsive the corresponding TestRunner will not be
225 included in the returned list.
226
227 Args:
228 runner_factory: Callable that takes a device and index and returns a
229 TestRunner object.
230 devices: List of device serial numbers as strings.
231 timeout: Watchdog timeout in seconds, defaults to the default timeout.
232
233 Returns:
234 A list of TestRunner objects.
235 """
236 logging.warning('Creating %s test %s.', len(devices),
237 'runners' if len(devices) != 1 else 'runner')
238 runners = []
239 counter = _ThreadSafeCounter()
240 threads = reraiser_thread.ReraiserThreadGroup(
241 [reraiser_thread.ReraiserThread(_SetUp,
242 [runner_factory, d, runners, counter],
243 name=str(d)[-4:])
244 for d in devices])
245 threads.StartAll()
246 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
247 return runners
248
249
250 def _TearDownRunners(runners, timeout=None):
251 """Calls TearDown() for each test runner in parallel.
252
253 Args:
254 runners: A list of TestRunner objects.
255 timeout: Watchdog timeout in seconds, defaults to the default timeout.
256 """
257 threads = reraiser_thread.ReraiserThreadGroup(
258 [reraiser_thread.ReraiserThread(r.TearDown, name=r.device_serial[-4:])
259 for r in runners])
260 threads.StartAll()
261 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
262
263
264 def ApplyMaxPerRun(tests, max_per_run):
265 """Rearrange the tests so that no group contains more than max_per_run tests.
266
267 Args:
268 tests:
269 max_per_run:
270
271 Returns:
272 A list of tests with no more than max_per_run per run.
273 """
274 tests_expanded = []
275 for test_group in tests:
276 if type(test_group) != str:
277 # Do not split test objects which are not strings.
278 tests_expanded.append(test_group)
279 else:
280 test_split = test_group.split(':')
281 for i in range(0, len(test_split), max_per_run):
282 tests_expanded.append(':'.join(test_split[i:i+max_per_run]))
283 return tests_expanded
284
285
286 def RunTests(tests, runner_factory, devices, shard=True,
287 test_timeout=DEFAULT_TIMEOUT, setup_timeout=DEFAULT_TIMEOUT,
288 num_retries=2, max_per_run=256):
289 """Run all tests on attached devices, retrying tests that don't pass.
290
291 Args:
292 tests: List of tests to run.
293 runner_factory: Callable that takes a device and index and returns a
294 TestRunner object.
295 devices: List of attached devices.
296 shard: True if we should shard, False if we should replicate tests.
297 - Sharding tests will distribute tests across all test runners through a
298 shared test collection.
299 - Replicating tests will copy all tests to each test runner through a
300 unique test collection for each test runner.
301 test_timeout: Watchdog timeout in seconds for running tests.
302 setup_timeout: Watchdog timeout in seconds for creating and cleaning up
303 test runners.
304 num_retries: Number of retries for a test.
305 max_per_run: Maximum number of tests to run in any group.
306
307 Returns:
308 A tuple of (base_test_result.TestRunResults object, exit code).
309 """
310 if not tests:
311 logging.critical('No tests to run.')
312 return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
313
314 tests_expanded = ApplyMaxPerRun(tests, max_per_run)
315 if shard:
316 # Generate a shared TestCollection object for all test runners, so they
317 # draw from a common pool of tests.
318 shared_test_collection = test_collection.TestCollection(
319 [_Test(t) for t in tests_expanded])
320 test_collection_factory = lambda: shared_test_collection
321 tag_results_with_device = False
322 log_string = 'sharded across devices'
323 else:
324 # Generate a unique TestCollection object for each test runner, but use
325 # the same set of tests.
326 test_collection_factory = lambda: test_collection.TestCollection(
327 [_Test(t) for t in tests_expanded])
328 tag_results_with_device = True
329 log_string = 'replicated on each device'
330
331 logging.info('Will run %d tests (%s): %s',
332 len(tests_expanded), log_string, str(tests_expanded))
333 runners = _CreateRunners(runner_factory, devices, setup_timeout)
334 try:
335 return _RunAllTests(runners, test_collection_factory,
336 num_retries, test_timeout, tag_results_with_device)
337 finally:
338 try:
339 _TearDownRunners(runners, setup_timeout)
340 except device_errors.DeviceUnreachableError as e:
341 logging.warning('Device unresponsive during TearDown: [%s]', e)
342 except Exception: # pylint: disable=broad-except
343 logging.exception('Unexpected exception caught during TearDown')
OLDNEW
« no previous file with comments | « build/android/pylib/base/base_test_runner.py ('k') | build/android/pylib/base/test_dispatcher_unittest.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698