Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(179)

Side by Side Diff: build/android/pylib/base/shard.py

Issue 18770008: [Android] Redesigns the sharder to allow replicated vs distributed tests (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Small fixes to formatting Created 7 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4
5 """Implements test sharding logic."""
6
7 import logging
8 import threading
9
10 from pylib import android_commands
11 from pylib import constants
12 from pylib import forwarder
13 from pylib.utils import reraiser_thread
14 from pylib.utils import watchdog_timer
15
16 import base_test_result
17
18
19 DEFAULT_TIMEOUT = 7 * 60 # seven minutes
20
21
22 class _ThreadSafeCounter(object):
23 """A threadsafe counter."""
24
25 def __init__(self):
26 self._lock = threading.Lock()
27 self._value = 0
28
29 def GetAndIncrement(self):
30 """Get the current value and increment it atomically.
31
32 Returns:
33 The value before incrementing.
34 """
35 with self._lock:
36 pre_increment = self._value
37 self._value += 1
38 return pre_increment
39
40
41 class _Test(object):
42 """Holds a test with additional metadata."""
43
44 def __init__(self, test, tries=0):
45 """Initializes the _Test object.
46
47 Args:
48 test: the test.
49 tries: number of tries so far.
50 """
51 self.test = test
52 self.tries = tries
53
54
55 class _TestCollection(object):
56 """A threadsafe collection of tests.
57
58 Args:
59 tests: list of tests to put in the collection.
60 """
61
62 def __init__(self, tests=[]):
63 self._lock = threading.Lock()
64 self._tests = []
65 self._tests_in_progress = 0
66 # Used to signal that an item is avaliable or all items have been handled.
67 self._item_avaliable_or_all_done = threading.Event()
68 for t in tests:
69 self.add(t)
70
71 def _pop(self):
72 """Pop a test from the collection.
73
74 Waits until a test is avaliable or all tests have been handled.
75
76 Returns:
77 A test or None if all tests have been handled.
78 """
79 while True:
80 # Wait for a test to be avaliable or all tests to have been handled.
81 self._item_avaliable_or_all_done.wait()
82 with self._lock:
83 # Check which of the two conditions triggered the signal.
84 if self._tests_in_progress == 0:
85 return None
86 try:
87 return self._tests.pop(0)
88 except IndexError:
89 # Another thread beat us to the avaliable test, wait again.
90 self._item_avaliable_or_all_done.clear()
91
92 def add(self, test):
93 """Add an test to the collection.
94
95 Args:
96 test: A test to add.
97 """
98 with self._lock:
99 self._tests.append(test)
100 self._item_avaliable_or_all_done.set()
101 self._tests_in_progress += 1
102
103 def test_completed(self):
104 """Indicate that a test has been fully handled."""
105 with self._lock:
106 self._tests_in_progress -= 1
107 if self._tests_in_progress == 0:
108 # All tests have been handled, signal all waiting threads.
109 self._item_avaliable_or_all_done.set()
110
111 def __iter__(self):
112 """Iterate through tests in the collection until all have been handled."""
113 while True:
114 r = self._pop()
115 if r is None:
116 break
117 yield r
118
119
120 def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
121 num_retries):
122 """Runs tests from the test_collection until empty using the given runner.
123
124 Adds TestRunResults objects to the out_results list and may add tests to the
125 out_retry list.
126
127 Args:
128 runner: A TestRunner object used to run the tests.
129 test_collection: A _TestCollection from which to get _Test objects to run.
130 out_results: A list to add TestRunResults to.
131 watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
132 num_retries: Number of retries for a test.
133 """
134 for test in test_collection:
135 watcher.Reset()
136 try:
137 if not android_commands.IsDeviceAttached(runner.device):
138 # Device is unresponsive, stop handling tests on this device.
139 msg = 'Device %s is unresponsive.' % runner.device
140 logging.warning(msg)
141 raise android_commands.errors.DeviceUnresponsiveError(msg)
142 result, retry = runner.RunTest(test.test)
143 test.tries += 1
144 if retry and test.tries <= num_retries:
145 # Retry non-passing results, only record passing results.
146 pass_results = base_test_result.TestRunResults()
147 pass_results.AddResults(result.GetPass())
148 out_results.append(pass_results)
149 logging.warning('Will retry test, try #%s.' % test.tries)
150 test_collection.add(_Test(test=retry, tries=test.tries))
151 else:
152 # All tests passed or retry limit reached. Either way, record results.
153 out_results.append(result)
154 except:
155 # An unhandleable exception, ensure tests get run by another device and
156 # reraise this exception on the main thread.
157 test_collection.add(test)
158 raise
159 finally:
160 # Retries count as separate tasks so always mark the popped test as done.
161 test_collection.test_completed()
162
163
164 def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
165 """Creates a test runner for each device and calls SetUp() in parallel.
166
167 Note: if a device is unresponsive the corresponding TestRunner will not be
168 added to out_runners.
169
170 Args:
171 runner_factory: callable that takes a device and index and returns a
172 TestRunner object.
173 device: the device serial number to set up.
174 out_runners: list to add the successfully set up TestRunner object.
175 threadsafe_counter: a _ThreadSafeCounter object used to get shard indices.
176 """
177 try:
178 index = threadsafe_counter.GetAndIncrement()
179 logging.warning('Creating shard %s for device %s.', index, device)
180 runner = runner_factory(device, index)
181 runner.SetUp()
182 out_runners.append(runner)
183 except android_commands.errors.DeviceUnresponsiveError as e:
184 logging.warning('Failed to create shard for %s: [%s]', device, e)
185
186
187 def _RunAllTests(runners, tests, num_retries, timeout=None):
188 """Run all tests using the given TestRunners.
189
190 Args:
191 runners: a list of TestRunner objects.
192 tests: a list of Tests to run using the given TestRunners.
193 num_retries: number of retries for a test.
194 timeout: watchdog timeout in seconds, defaults to the default timeout.
195
196 Returns:
197 A tuple of (TestRunResults object, exit code)
198 """
199 logging.warning('Running %s tests with %s test runners.' %
200 (len(tests), len(runners)))
201 tests_collection = _TestCollection([_Test(t) for t in tests])
202 results = []
203 exit_code = 0
204 watcher = watchdog_timer.WatchdogTimer(timeout)
205 workers = reraiser_thread.ReraiserThreadGroup(
206 [reraiser_thread.ReraiserThread(
207 _RunTestsFromQueue,
208 [r, tests_collection, results, watcher, num_retries],
209 name=r.device[-4:])
210 for r in runners])
211 run_results = base_test_result.TestRunResults()
212 workers.StartAll()
213
214 # Catch DeviceUnresponsiveErrors and set a warning exit code
215 try:
216 workers.JoinAll(watcher)
217 except android_commands.errors.DeviceUnresponsiveError as e:
218 logging.error(e)
219 exit_code = constants.WARNING_EXIT_CODE
220
221 for r in results:
222 run_results.AddTestRunResults(r)
223 if not run_results.DidRunPass():
224 exit_code = constants.ERROR_EXIT_CODE
225 return (run_results, exit_code)
226
227
228 def _CreateRunners(runner_factory, devices, timeout=None):
229 """Creates a test runner for each device and calls SetUp() in parallel.
230
231 Note: if a device is unresponsive the corresponding TestRunner will not be
232 included in the returned list.
233
234 Args:
235 runner_factory: callable that takes a device and index and returns a
236 TestRunner object.
237 devices: list of device serial numbers as strings.
238 timeout: watchdog timeout in seconds, defaults to the default timeout.
239
240 Returns:
241 A list of TestRunner objects.
242 """
243 logging.warning('Creating %s test runners.' % len(devices))
244 runners = []
245 counter = _ThreadSafeCounter()
246 threads = reraiser_thread.ReraiserThreadGroup(
247 [reraiser_thread.ReraiserThread(_SetUp,
248 [runner_factory, d, runners, counter],
249 name=d[-4:])
250 for d in devices])
251 threads.StartAll()
252 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
253 return runners
254
255
256 def _TearDownRunners(runners, timeout=None):
257 """Calls TearDown() for each test runner in parallel.
258
259 Args:
260 runners: a list of TestRunner objects.
261 timeout: watchdog timeout in seconds, defaults to the default timeout.
262 """
263 threads = reraiser_thread.ReraiserThreadGroup(
264 [reraiser_thread.ReraiserThread(r.TearDown, name=r.device[-4:])
265 for r in runners])
266 threads.StartAll()
267 threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
268
269
270 def ShardAndRunTests(runner_factory, devices, tests, build_type='Debug',
271 test_timeout=DEFAULT_TIMEOUT,
272 setup_timeout=DEFAULT_TIMEOUT,
273 num_retries=2):
274 """Run all tests on attached devices, retrying tests that don't pass.
275
276 Args:
277 runner_factory: callable that takes a device and index and returns a
278 TestRunner object.
279 devices: list of attached device serial numbers as strings.
280 tests: list of tests to run.
281 build_type: either 'Debug' or 'Release'.
282 test_timeout: watchdog timeout in seconds for running tests, defaults to the
283 default timeout.
284 setup_timeout: watchdog timeout in seconds for creating and cleaning up
285 test runners, defaults to the default timeout.
286 num_retries: number of retries for a test.
287
288 Returns:
289 A tuple of (base_test_result.TestRunResults object, exit code).
290 """
291 if not tests:
292 logging.error('No tests to run.')
293 return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)
294
295 logging.info('Will run %d tests: %s', len(tests), str(tests))
296 forwarder.Forwarder.KillHost(build_type)
297 runners = _CreateRunners(runner_factory, devices, setup_timeout)
298 try:
299 return _RunAllTests(runners, tests, num_retries, test_timeout)
300 finally:
301 try:
302 _TearDownRunners(runners, setup_timeout)
303 except android_commands.errors.DeviceUnresponsiveError as e:
304 logging.warning('Device unresponsive during TearDown: [%s]', e)
305 finally:
306 forwarder.Forwarder.KillHost(build_type)
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698