Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(602)

Side by Side Diff: build/android/pylib/base/test_dispatcher.py

Issue 18770008: [Android] Redesigns the sharder to allow replicated vs distributed tests (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Adds support for cleanup_test_files Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright (c) 2013 The Chromium Authors. All rights reserved. 1 # Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be 2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file. 3 # found in the LICENSE file.
4 4
5 """Implements test sharding logic.""" 5 """Dispatches tests, either sharding or replicating them.
6
7 To dispatch, performs the following steps:
8 * Create a test collection factory, using the given tests
9 - If sharding: test collection factory returns the same shared test collection
10 to all test runners
11 - If replciating: test collection factory returns a unique test collection to
12 each test runner, with the same set of tests in each.
13 * Get the list of devices to run on
14 * Create test runners
15 * Run each test runner in its own thread, pulling tests from the test collection
16 generated from the test collection factory until there are no tests left.
frankf 2013/07/17 21:08:41 This should be a high level overview of what this
gkanwar 2013/07/17 21:16:50 This is a high-level overview of what the module d
17 """
6 18
7 import logging 19 import logging
8 import threading 20 import threading
9 21
10 from pylib import android_commands 22 from pylib import android_commands
11 from pylib import constants 23 from pylib import constants
12 from pylib import forwarder 24 from pylib import forwarder
13 from pylib.utils import reraiser_thread 25 from pylib.utils import reraiser_thread
14 from pylib.utils import watchdog_timer 26 from pylib.utils import watchdog_timer
15 27
(...skipping 22 matching lines...) Expand all
38 return pre_increment 50 return pre_increment
39 51
40 52
41 class _Test(object): 53 class _Test(object):
42 """Holds a test with additional metadata.""" 54 """Holds a test with additional metadata."""
43 55
44 def __init__(self, test, tries=0): 56 def __init__(self, test, tries=0):
45 """Initializes the _Test object. 57 """Initializes the _Test object.
46 58
47 Args: 59 Args:
48 test: the test. 60 test: The test.
49 tries: number of tries so far. 61 tries: Number of tries so far.
50 """ 62 """
51 self.test = test 63 self.test = test
52 self.tries = tries 64 self.tries = tries
53 65
54 66
55 class _TestCollection(object): 67 class _TestCollection(object):
56 """A threadsafe collection of tests. 68 """A threadsafe collection of tests.
57 69
58 Args: 70 Args:
59 tests: list of tests to put in the collection. 71 tests: List of tests to put in the collection.
60 """ 72 """
61 73
62 def __init__(self, tests=[]): 74 def __init__(self, tests=[]):
63 self._lock = threading.Lock() 75 self._lock = threading.Lock()
64 self._tests = [] 76 self._tests = []
65 self._tests_in_progress = 0 77 self._tests_in_progress = 0
66 # Used to signal that an item is avaliable or all items have been handled. 78 # Used to signal that an item is avaliable or all items have been handled.
67 self._item_avaliable_or_all_done = threading.Event() 79 self._item_avaliable_or_all_done = threading.Event()
68 for t in tests: 80 for t in tests:
69 self.add(t) 81 self.add(t)
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
111 def __iter__(self): 123 def __iter__(self):
112 """Iterate through tests in the collection until all have been handled.""" 124 """Iterate through tests in the collection until all have been handled."""
113 while True: 125 while True:
114 r = self._pop() 126 r = self._pop()
115 if r is None: 127 if r is None:
116 break 128 break
117 yield r 129 yield r
118 130
119 131
120 def _RunTestsFromQueue(runner, test_collection, out_results, watcher, 132 def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
121 num_retries): 133 num_retries, tag_results_with_device=False):
122 """Runs tests from the test_collection until empty using the given runner. 134 """Runs tests from the test_collection until empty using the given runner.
123 135
124 Adds TestRunResults objects to the out_results list and may add tests to the 136 Adds TestRunResults objects to the out_results list and may add tests to the
125 out_retry list. 137 out_retry list.
126 138
127 Args: 139 Args:
128 runner: A TestRunner object used to run the tests. 140 runner: A TestRunner object used to run the tests.
129 test_collection: A _TestCollection from which to get _Test objects to run. 141 test_collection: A _TestCollection from which to get _Test objects to run.
130 out_results: A list to add TestRunResults to. 142 out_results: A list to add TestRunResults to.
131 watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout. 143 watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
132 num_retries: Number of retries for a test. 144 num_retries: Number of retries for a test.
145 tag_results_with_device: If True, appends the name of the device on which
146 the test was run to the test name. Used when replicating to identify
147 which device ran each copy of the test, and to ensure each copy of the
148 test is recorded separately.
133 """ 149 """
150
151 def TagTestRunResults(test_run_results):
152 """Tags all results with the last 4 digits of the device id.
153
154 Used when replicating tests to distinguish the same tests run on different
155 devices. We use a set to store test results, so the hash (generated from
156 name and tag) must be unique to be considered different results.
157 """
158 new_test_run_results = base_test_result.TestRunResults()
159 for test_result in test_run_results.GetAll():
160 test_result.SetTag(runner.device[-4:])
161 new_test_run_results.AddResult(test_result)
162 return new_test_run_results
163
134 for test in test_collection: 164 for test in test_collection:
135 watcher.Reset() 165 watcher.Reset()
136 try: 166 try:
137 if not android_commands.IsDeviceAttached(runner.device): 167 if not android_commands.IsDeviceAttached(runner.device):
138 # Device is unresponsive, stop handling tests on this device. 168 # Device is unresponsive, stop handling tests on this device.
139 msg = 'Device %s is unresponsive.' % runner.device 169 msg = 'Device %s is unresponsive.' % runner.device
140 logging.warning(msg) 170 logging.warning(msg)
141 raise android_commands.errors.DeviceUnresponsiveError(msg) 171 raise android_commands.errors.DeviceUnresponsiveError(msg)
142 result, retry = runner.RunTest(test.test) 172 result, retry = runner.RunTest(test.test)
173 if tag_results_with_device:
174 result = TagTestRunResults(result)
143 test.tries += 1 175 test.tries += 1
144 if retry and test.tries <= num_retries: 176 if retry and test.tries <= num_retries:
145 # Retry non-passing results, only record passing results. 177 # Retry non-passing results, only record passing results.
146 pass_results = base_test_result.TestRunResults() 178 pass_results = base_test_result.TestRunResults()
147 pass_results.AddResults(result.GetPass()) 179 pass_results.AddResults(result.GetPass())
148 out_results.append(pass_results) 180 out_results.append(pass_results)
149 logging.warning('Will retry test, try #%s.' % test.tries) 181 logging.warning('Will retry test, try #%s.' % test.tries)
150 test_collection.add(_Test(test=retry, tries=test.tries)) 182 test_collection.add(_Test(test=retry, tries=test.tries))
151 else: 183 else:
152 # All tests passed or retry limit reached. Either way, record results. 184 # All tests passed or retry limit reached. Either way, record results.
153 out_results.append(result) 185 out_results.append(result)
154 except: 186 except:
155 # An unhandleable exception, ensure tests get run by another device and 187 # An unhandleable exception, ensure tests get run by another device and
156 # reraise this exception on the main thread. 188 # reraise this exception on the main thread.
157 test_collection.add(test) 189 test_collection.add(test)
158 raise 190 raise
159 finally: 191 finally:
160 # Retries count as separate tasks so always mark the popped test as done. 192 # Retries count as separate tasks so always mark the popped test as done.
161 test_collection.test_completed() 193 test_collection.test_completed()
162 194
163 195
164 def _SetUp(runner_factory, device, out_runners, threadsafe_counter): 196 def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
165 """Creates a test runner for each device and calls SetUp() in parallel. 197 """Creates a test runner for each device and calls SetUp() in parallel.
166 198
167 Note: if a device is unresponsive the corresponding TestRunner will not be 199 Note: if a device is unresponsive the corresponding TestRunner will not be
168 added to out_runners. 200 added to out_runners.
169 201
170 Args: 202 Args:
171 runner_factory: callable that takes a device and index and returns a 203 runner_factory: Callable that takes a device and index and returns a
172 TestRunner object. 204 TestRunner object.
173 device: the device serial number to set up. 205 device: The device serial number to set up.
174 out_runners: list to add the successfully set up TestRunner object. 206 out_runners: List to add the successfully set up TestRunner object.
175 threadsafe_counter: a _ThreadSafeCounter object used to get shard indices. 207 threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
176 """ 208 """
177 try: 209 try:
178 index = threadsafe_counter.GetAndIncrement() 210 index = threadsafe_counter.GetAndIncrement()
179 logging.warning('Creating shard %s for device %s.', index, device) 211 logging.warning('Creating shard %s for device %s.', index, device)
180 runner = runner_factory(device, index) 212 runner = runner_factory(device, index)
181 runner.SetUp() 213 runner.SetUp()
182 out_runners.append(runner) 214 out_runners.append(runner)
183 except android_commands.errors.DeviceUnresponsiveError as e: 215 except android_commands.errors.DeviceUnresponsiveError as e:
184 logging.warning('Failed to create shard for %s: [%s]', device, e) 216 logging.warning('Failed to create shard for %s: [%s]', device, e)
185 217
186 218
187 def _RunAllTests(runners, tests, num_retries, timeout=None): 219 def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
220 tag_results_with_device=False):
188 """Run all tests using the given TestRunners. 221 """Run all tests using the given TestRunners.
189 222
190 Args: 223 Args:
191 runners: a list of TestRunner objects. 224 runners: A list of TestRunner objects.
192 tests: a list of Tests to run using the given TestRunners. 225 test_collection_factory: A callable to generate a _TestCollection object for
193 num_retries: number of retries for a test. 226 each test runner.
194 timeout: watchdog timeout in seconds, defaults to the default timeout. 227 num_retries: Number of retries for a test.
228 timeout: Watchdog timeout in seconds.
229 tag_results_with_device: If True, appends the name of the device on which
230 the test was run to the test name. Used when replicating to identify
231 which device ran each copy of the test, and to ensure each copy of the
232 test is recorded separately.
195 233
196 Returns: 234 Returns:
197 A tuple of (TestRunResults object, exit code) 235 A tuple of (TestRunResults object, exit code)
198 """ 236 """
199 logging.warning('Running %s tests with %s test runners.' % 237 logging.warning('Running tests with %s test runners.' % (len(runners)))
200 (len(tests), len(runners)))
201 tests_collection = _TestCollection([_Test(t) for t in tests])
202 results = [] 238 results = []
203 exit_code = 0 239 exit_code = 0
204 watcher = watchdog_timer.WatchdogTimer(timeout) 240 watcher = watchdog_timer.WatchdogTimer(timeout)
241
205 workers = reraiser_thread.ReraiserThreadGroup( 242 workers = reraiser_thread.ReraiserThreadGroup(
206 [reraiser_thread.ReraiserThread( 243 [reraiser_thread.ReraiserThread(
207 _RunTestsFromQueue, 244 _RunTestsFromQueue,
208 [r, tests_collection, results, watcher, num_retries], 245 [r, test_collection_factory(), results, watcher, num_retries,
246 tag_results_with_device],
209 name=r.device[-4:]) 247 name=r.device[-4:])
210 for r in runners]) 248 for r in runners])
211 run_results = base_test_result.TestRunResults() 249 run_results = base_test_result.TestRunResults()
212 workers.StartAll() 250 workers.StartAll()
213 251
214 # Catch DeviceUnresponsiveErrors and set a warning exit code 252 # Catch DeviceUnresponsiveErrors and set a warning exit code
215 try: 253 try:
216 workers.JoinAll(watcher) 254 workers.JoinAll(watcher)
217 except android_commands.errors.DeviceUnresponsiveError as e: 255 except android_commands.errors.DeviceUnresponsiveError as e:
218 logging.error(e) 256 logging.error(e)
219 exit_code = constants.WARNING_EXIT_CODE 257 exit_code = constants.WARNING_EXIT_CODE
220 258
221 for r in results: 259 for r in results:
222 run_results.AddTestRunResults(r) 260 run_results.AddTestRunResults(r)
223 if not run_results.DidRunPass(): 261 if not run_results.DidRunPass():
224 exit_code = constants.ERROR_EXIT_CODE 262 exit_code = constants.ERROR_EXIT_CODE
225 return (run_results, exit_code) 263 return (run_results, exit_code)
226 264
227 265
228 def _CreateRunners(runner_factory, devices, timeout=None): 266 def _CreateRunners(runner_factory, devices, timeout=None):
229 """Creates a test runner for each device and calls SetUp() in parallel. 267 """Creates a test runner for each device and calls SetUp() in parallel.
230 268
231 Note: if a device is unresponsive the corresponding TestRunner will not be 269 Note: if a device is unresponsive the corresponding TestRunner will not be
232 included in the returned list. 270 included in the returned list.
233 271
234 Args: 272 Args:
235 runner_factory: callable that takes a device and index and returns a 273 runner_factory: Callable that takes a device and index and returns a
236 TestRunner object. 274 TestRunner object.
237 devices: list of device serial numbers as strings. 275 devices: List of device serial numbers as strings.
238 timeout: watchdog timeout in seconds, defaults to the default timeout. 276 timeout: Watchdog timeout in seconds, defaults to the default timeout.
239 277
240 Returns: 278 Returns:
241 A list of TestRunner objects. 279 A list of TestRunner objects.
242 """ 280 """
243 logging.warning('Creating %s test runners.' % len(devices)) 281 logging.warning('Creating %s test runners.' % len(devices))
244 runners = [] 282 runners = []
245 counter = _ThreadSafeCounter() 283 counter = _ThreadSafeCounter()
246 threads = reraiser_thread.ReraiserThreadGroup( 284 threads = reraiser_thread.ReraiserThreadGroup(
247 [reraiser_thread.ReraiserThread(_SetUp, 285 [reraiser_thread.ReraiserThread(_SetUp,
248 [runner_factory, d, runners, counter], 286 [runner_factory, d, runners, counter],
249 name=d[-4:]) 287 name=d[-4:])
250 for d in devices]) 288 for d in devices])
251 threads.StartAll() 289 threads.StartAll()
252 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout)) 290 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
253 return runners 291 return runners
254 292
255 293
256 def _TearDownRunners(runners, timeout=None): 294 def _TearDownRunners(runners, timeout=None):
257 """Calls TearDown() for each test runner in parallel. 295 """Calls TearDown() for each test runner in parallel.
258 296
259 Args: 297 Args:
260 runners: a list of TestRunner objects. 298 runners: A list of TestRunner objects.
261 timeout: watchdog timeout in seconds, defaults to the default timeout. 299 timeout: Watchdog timeout in seconds, defaults to the default timeout.
262 """ 300 """
263 threads = reraiser_thread.ReraiserThreadGroup( 301 threads = reraiser_thread.ReraiserThreadGroup(
264 [reraiser_thread.ReraiserThread(r.TearDown, name=r.device[-4:]) 302 [reraiser_thread.ReraiserThread(r.TearDown, name=r.device[-4:])
265 for r in runners]) 303 for r in runners])
266 threads.StartAll() 304 threads.StartAll()
267 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout)) 305 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
268 306
269 307
270 def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug', 308
271 test_timeout=DEFAULT_TIMEOUT, 309 def _GetAttachedDevices(wait_for_debugger=False, test_device=None):
272 setup_timeout=DEFAULT_TIMEOUT, 310 """Get all attached devices.
273 num_retries=2): 311
312 If we are using a debugger, limit to only one device.
313
314 Args:
315 wait_for_debugger: True if this run will use a debugger.
316 test_device: Name of a specific device to use.
317
318 Returns:
319 A list of attached devices.
320 """
321 attached_devices = []
322
323 attached_devices = android_commands.GetAttachedDevices()
324 if test_device:
325 assert (test_device in attached_devices,
326 'Did not find device %s among attached device. Attached devices: %s'
327 % (test_device, ', '.join(attached_devices)))
328 attached_devices = [test_device]
329
330 if len(attached_devices) > 1 and wait_for_debugger:
331 logging.warning('Debugger can not be sharded, using first available device')
332 attached_devices = attached_devices[:1]
333
334 return attached_devices
335
336
337 def RunTests(tests, runner_factory, wait_for_debugger, test_device, shard,
338 build_type='Debug',
339 test_timeout=DEFAULT_TIMEOUT,
340 setup_timeout=DEFAULT_TIMEOUT,
341 num_retries=2):
274 """Run all tests on attached devices, retrying tests that don't pass. 342 """Run all tests on attached devices, retrying tests that don't pass.
275 343
276 Args: 344 Args:
277 runner_factory: callable that takes a device and index and returns a 345 tests: List of tests to run.
278 TestRunner object. 346 runner_factory: Callable that takes a device and index and returns a
279 devices: list of attached device serial numbers as strings. 347 TestRunner object.
280 tests: list of tests to run. 348 wait_for_debugger: True if this test is using a debugger.
281 build_type: either 'Debug' or 'Release'. 349 test_device: A specific device to run tests on, or None.
282 test_timeout: watchdog timeout in seconds for running tests, defaults to the 350 shard: True if we should shard, False if we should replicate tests.
283 default timeout. 351 - Sharding tests will distribute tests across all test runners through a
284 setup_timeout: watchdog timeout in seconds for creating and cleaning up 352 shared test collection.
285 test runners, defaults to the default timeout. 353 - Replicating tests will copy all tests to each test runner through a
286 num_retries: number of retries for a test. 354 unique test collection for each test runner.
355 build_type: Either 'Debug' or 'Release'.
356 test_timeout: Watchdog timeout in seconds for running tests.
357 setup_timeout: Watchdog timeout in seconds for creating and cleaning up
358 test runners.
359 num_retries: Number of retries for a test.
287 360
288 Returns: 361 Returns:
289 A tuple of (base_test_result.TestRunResults object, exit code). 362 A tuple of (base_test_result.TestRunResults object, exit code).
290 """ 363 """
291 if not tests: 364 if not tests:
292 logging.error('No tests to run.') 365 logging.error('No tests to run.')
293 return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE) 366 return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
294 367
368 if shard:
369 # Generate a shared _TestCollection object for all test runners, so they
370 # draw from a common pool of tests.
371 shared_test_collection = _TestCollection([_Test(t) for t in tests])
372 test_collection_factory = lambda: shared_test_collection
373 tag_results_with_device = False
374 else:
375 # Generate a unique _TestCollection object for each test runner, but use
376 # the same set of tests.
377 test_collection_factory = lambda: _TestCollection([_Test(t) for t in tests])
378 tag_results_with_device = True
379
380 devices = _GetAttachedDevices(wait_for_debugger, test_device)
381
295 logging.info('Will run %d tests: %s', len(tests), str(tests)) 382 logging.info('Will run %d tests: %s', len(tests), str(tests))
383
296 forwarder.Forwarder.KillHost(build_type) 384 forwarder.Forwarder.KillHost(build_type)
297 runners = _CreateRunners(runner_factory, devices, setup_timeout) 385 runners = _CreateRunners(runner_factory, devices, setup_timeout)
298 try: 386 try:
299 return _RunAllTests(runners, tests, num_retries, test_timeout) 387 return _RunAllTests(runners, test_collection_factory,
388 num_retries, test_timeout, tag_results_with_device)
300 finally: 389 finally:
301 try: 390 try:
302 _TearDownRunners(runners, setup_timeout) 391 _TearDownRunners(runners, setup_timeout)
303 except android_commands.errors.DeviceUnresponsiveError as e: 392 except android_commands.errors.DeviceUnresponsiveError as e:
304 logging.warning('Device unresponsive during TearDown: [%s]', e) 393 logging.warning('Device unresponsive during TearDown: [%s]', e)
305 finally: 394 finally:
306 forwarder.Forwarder.KillHost(build_type) 395 forwarder.Forwarder.KillHost(build_type)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698