OLD | NEW |
| (Empty) |
1 # Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 """Takes care of sharding the python-drive tests in multiple devices.""" | |
6 | |
7 import copy | |
8 import logging | |
9 import multiprocessing | |
10 | |
11 from python_test_caller import CallPythonTest | |
12 from run_java_tests import FatalTestException | |
13 import sharded_tests_queue | |
14 from test_result import TestResults | |
15 | |
16 | |
17 def SetTestsContainer(tests_container): | |
18 """Sets PythonTestSharder as a top-level field. | |
19 | |
20 PythonTestSharder uses multiprocessing.Pool, which creates a pool of | |
21 processes. This is used to initialize each worker in the pool, ensuring that | |
22 each worker has access to this shared pool of tests. | |
23 | |
24 The multiprocessing module requires that this be a top-level method. | |
25 | |
26 Args: | |
27 tests_container: the container for all the tests. | |
28 """ | |
29 PythonTestSharder.tests_container = tests_container | |
30 | |
31 | |
32 def _DefaultRunnable(test_runner): | |
33 """A default runnable for a PythonTestRunner. | |
34 | |
35 Args: | |
36 test_runner: A PythonTestRunner which will run tests. | |
37 | |
38 Returns: | |
39 The test results. | |
40 """ | |
41 return test_runner.RunTests() | |
42 | |
43 | |
44 class PythonTestRunner(object): | |
45 """Thin wrapper around a list of PythonTestBase instances. | |
46 | |
47 This is meant to be a long-lived object which can run multiple Python tests | |
48 within its lifetime. Tests will receive the device_id and shard_index. | |
49 | |
50 The shard index affords the ability to create unique port numbers (e.g. | |
51 DEFAULT_PORT + shard_index) if the test so wishes. | |
52 """ | |
53 | |
54 def __init__(self, options): | |
55 """Constructor. | |
56 | |
57 Args: | |
58 options: Options to use for setting up tests. | |
59 """ | |
60 self.options = options | |
61 | |
62 def RunTests(self): | |
63 """Runs tests from the shared pool of tests, aggregating results. | |
64 | |
65 Returns: | |
66 A list of test results for all of the tests which this runner executed. | |
67 """ | |
68 tests = PythonTestSharder.tests_container | |
69 | |
70 results = [] | |
71 for t in tests: | |
72 res = CallPythonTest(t, self.options) | |
73 results.append(res) | |
74 | |
75 return TestResults.FromTestResults(results) | |
76 | |
77 | |
78 class PythonTestSharder(object): | |
79 """Runs Python tests in parallel on multiple devices. | |
80 | |
81 This is lifted more or less wholesale from BaseTestRunner. | |
82 | |
83 Under the covers, it creates a pool of long-lived PythonTestRunners, which | |
84 execute tests from the pool of tests. | |
85 | |
86 Args: | |
87 attached_devices: a list of device IDs attached to the host. | |
88 available_tests: a list of tests to run which subclass PythonTestBase. | |
89 options: Options to use for setting up tests. | |
90 | |
91 Returns: | |
92 An aggregated list of test results. | |
93 """ | |
94 tests_container = None | |
95 | |
96 def __init__(self, attached_devices, available_tests, options): | |
97 self.options = options | |
98 self.attached_devices = attached_devices | |
99 self.retries = options.shard_retries | |
100 self.tests = available_tests | |
101 | |
102 def _SetupSharding(self, tests): | |
103 """Creates the shared pool of tests and makes it available to test runners. | |
104 | |
105 Args: | |
106 tests: the list of tests which will be consumed by workers. | |
107 """ | |
108 SetTestsContainer(sharded_tests_queue.ShardedTestsQueue( | |
109 len(self.attached_devices), tests)) | |
110 | |
111 def RunShardedTests(self): | |
112 """Runs tests in parallel using a pool of workers. | |
113 | |
114 Returns: | |
115 A list of test results aggregated from all test runs. | |
116 """ | |
117 logging.warning('*' * 80) | |
118 logging.warning('Sharding in ' + str(len(self.attached_devices)) + | |
119 ' devices.') | |
120 logging.warning('Note that the output is not synchronized.') | |
121 logging.warning('Look for the "Final result" banner in the end.') | |
122 logging.warning('*' * 80) | |
123 all_passed = [] | |
124 test_results = TestResults() | |
125 tests_to_run = self.tests | |
126 for retry in xrange(self.retries): | |
127 logging.warning('Try %d of %d', retry + 1, self.retries) | |
128 self._SetupSharding(self.tests) | |
129 test_runners = self._MakeTestRunners(self.attached_devices) | |
130 logging.warning('Starting...') | |
131 pool = multiprocessing.Pool(len(self.attached_devices), | |
132 SetTestsContainer, | |
133 [PythonTestSharder.tests_container]) | |
134 | |
135 # List of TestResults objects from each test execution. | |
136 try: | |
137 results_lists = pool.map(_DefaultRunnable, test_runners) | |
138 except Exception: | |
139 logging.exception('Unable to run tests. Something with the ' | |
140 'PythonTestRunners has gone wrong.') | |
141 raise FatalTestException('PythonTestRunners were unable to run tests.') | |
142 | |
143 test_results = TestResults.FromTestResults(results_lists) | |
144 # Accumulate passing results. | |
145 all_passed += test_results.ok | |
146 # If we have failed tests, map them to tests to retry. | |
147 failed_tests = test_results.GetAllBroken() | |
148 tests_to_run = self._GetTestsToRetry(self.tests, | |
149 failed_tests) | |
150 | |
151 # Bail out early if we have no more tests. This can happen if all tests | |
152 # pass before we're out of retries, for example. | |
153 if not tests_to_run: | |
154 break | |
155 | |
156 final_results = TestResults() | |
157 # all_passed has accumulated all passing test results. | |
158 # test_results will have the results from the most recent run, which could | |
159 # include a variety of failure modes (unknown, crashed, failed, etc). | |
160 final_results = test_results | |
161 final_results.ok = all_passed | |
162 | |
163 return final_results | |
164 | |
165 def _MakeTestRunners(self, attached_devices): | |
166 """Initialize and return a list of PythonTestRunners. | |
167 | |
168 Args: | |
169 attached_devices: list of device IDs attached to host. | |
170 | |
171 Returns: | |
172 A list of PythonTestRunners, one for each device. | |
173 """ | |
174 test_runners = [] | |
175 for index, device in enumerate(attached_devices): | |
176 logging.warning('*' * 80) | |
177 logging.warning('Creating shard %d for %s', index, device) | |
178 logging.warning('*' * 80) | |
179 # Bind the PythonTestRunner to a device & shard index. Give it the | |
180 # runnable which it will use to actually execute the tests. | |
181 test_options = copy.deepcopy(self.options) | |
182 test_options.ensure_value('device_id', device) | |
183 test_options.ensure_value('shard_index', index) | |
184 test_runner = PythonTestRunner(test_options) | |
185 test_runners.append(test_runner) | |
186 | |
187 return test_runners | |
188 | |
189 def _GetTestsToRetry(self, available_tests, failed_tests): | |
190 """Infers a list of tests to retry from failed tests and available tests. | |
191 | |
192 Args: | |
193 available_tests: a list of tests which subclass PythonTestBase. | |
194 failed_tests: a list of SingleTestResults representing failed tests. | |
195 | |
196 Returns: | |
197 A list of test objects which correspond to test names found in | |
198 failed_tests, or an empty list if there is no correspondence. | |
199 """ | |
200 failed_test_names = map(lambda t: t.test_name, failed_tests) | |
201 tests_to_retry = [t for t in available_tests | |
202 if t.qualified_name in failed_test_names] | |
203 return tests_to_retry | |
OLD | NEW |