OLD | NEW |
| (Empty) |
1 #!/usr/bin/env python | |
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved. | |
3 # Use of this source code is governed by a BSD-style license that can be | |
4 # found in the LICENSE file. | |
5 | |
6 """Shards a given test suite and runs the shards in parallel. | |
7 | |
8 ShardingSupervisor is called to process the command line options and creates | |
9 the specified number of worker threads. These threads then run each shard of | |
10 the test in a separate process and report on the results. When all the shards | |
11 have been completed, the supervisor reprints any lines indicating a test | |
12 failure for convenience. If only one shard is to be run, a single subprocess | |
13 is started for that shard and the output is identical to gtest's output. | |
14 | |
15 Usage: python sharding_supervisor.py [options] path/to/test [gtest_args] | |
16 """ | |
17 | |
18 | |
19 import optparse | |
20 import os | |
21 import pty | |
22 import Queue | |
23 import subprocess | |
24 import sys | |
25 import threading | |
26 | |
27 | |
28 DEFAULT_NUM_CORES = 4 | |
29 DEFAULT_SHARDS_PER_CORE = 5 # num_shards = cores * SHARDS_PER_CORE | |
30 DEFAULT_RUNS_PER_CORE = 1 # num_workers = cores * RUNS_PER_CORE | |
31 | |
32 | |
33 def DetectNumCores(): | |
34 """Detects the number of cores on the machine. | |
35 | |
36 Returns: | |
37 The number of cores on the machine or DEFAULT_NUM_CORES if it could not | |
38 be found. | |
39 """ | |
40 try: | |
41 # Linux, Unix, MacOS | |
42 if hasattr(os, "sysconf"): | |
43 if "SC_NPROCESSORS_ONLN" in os.sysconf_names: | |
44 # Linux, Unix | |
45 return int(os.sysconf("SC_NPROCESSORS_ONLN")) | |
46 else: | |
47 # OSX | |
48 return int(os.popen2("sysctl -n hw.ncpu")[1].read()) | |
49 # Windows | |
50 return int(os.environ["NUMBER_OF_PROCESSORS"]) | |
51 except ValueError: | |
52 return DEFAULT_NUM_CORES | |
53 | |
54 | |
55 def RunShard(test, num_shards, index, gtest_args, stdout, stderr): | |
56 """Runs a single test shard in a subprocess. | |
57 | |
58 Returns: | |
59 The Popen object representing the subprocess handle. | |
60 """ | |
61 args = [test] | |
62 args.extend(gtest_args) | |
63 env = os.environ.copy() | |
64 env["GTEST_TOTAL_SHARDS"] = str(num_shards) | |
65 env["GTEST_SHARD_INDEX"] = str(index) | |
66 return subprocess.Popen( | |
67 args, stdout=stdout, stderr=stderr, env=env) | |
68 | |
69 | |
70 class ShardRunner(threading.Thread): | |
71 """Worker thread that manages a single shard at a time. | |
72 | |
73 Attributes: | |
74 supervisor: The ShardingSupervisor that this worker reports to. | |
75 counter: Called to get the next shard index to run. | |
76 """ | |
77 | |
78 def __init__(self, supervisor, counter): | |
79 """Inits ShardRunner with a supervisor and counter.""" | |
80 threading.Thread.__init__(self) | |
81 self.supervisor = supervisor | |
82 self.counter = counter | |
83 | |
84 def run(self): | |
85 """Runs shards and outputs the results. | |
86 | |
87 Gets the next shard index from the supervisor, runs it in a subprocess, | |
88 and collects the output. Each line is prefixed with 'N>', where N is the | |
89 current shard index. | |
90 """ | |
91 while True: | |
92 try: | |
93 index = self.counter.get_nowait() | |
94 except Queue.Empty: | |
95 break | |
96 shard = RunShard( | |
97 self.supervisor.test, self.supervisor.num_shards, index, | |
98 self.supervisor.gtest_args, subprocess.PIPE, subprocess.STDOUT) | |
99 while True: | |
100 line = shard.stdout.readline() | |
101 if not line: | |
102 if shard.poll() is not None: | |
103 break | |
104 continue | |
105 line = "%i>%s" % (index, line) | |
106 if (line.find("FAIL", 0, 20) >= 0 and line.find(".") >= 0 and | |
107 line.find("ms)")) < 0: | |
108 self.supervisor.LogLineFailure(line) | |
109 sys.stdout.write(line) | |
110 if shard.returncode != 0: | |
111 self.supervisor.LogShardFailure(index) | |
112 | |
113 | |
114 class ShardingSupervisor(object): | |
115 """Supervisor object that handles the worker threads. | |
116 | |
117 Attributes: | |
118 test: Name of the test to shard. | |
119 num_shards: Total number of shards to split the test into. | |
120 num_runs: Total number of worker threads to create for running shards. | |
121 color: Indicates which coloring mode to use in the output. | |
122 gtest_args: The options to pass to gtest. | |
123 failure_log: List of statements from shard output indicating a failure. | |
124 failed_shards: List of shards that contained failing tests. | |
125 """ | |
126 | |
127 def __init__( | |
128 self, test, num_shards, num_runs, color, gtest_args): | |
129 """Inits ShardingSupervisor with given options and gtest arguments.""" | |
130 self.test = test | |
131 self.num_shards = num_shards | |
132 self.num_runs = num_runs | |
133 self.color = color | |
134 self.gtest_args = gtest_args | |
135 self.failure_log = [] | |
136 self.failed_shards = [] | |
137 | |
138 def ShardTest(self): | |
139 """Runs the test and manages the worker threads. | |
140 | |
141 Runs the test and outputs a summary at the end. All the tests in the | |
142 suite are run by creating (cores * runs_per_core) threads and | |
143 (cores * shards_per_core) shards. When all the worker threads have | |
144 finished, the lines saved in the failure_log are printed again. | |
145 | |
146 Returns: | |
147 The number of shards that had failing tests. | |
148 """ | |
149 workers = [] | |
150 counter = Queue.Queue() | |
151 for i in range(self.num_shards): | |
152 counter.put(i) | |
153 for i in range(self.num_runs): | |
154 worker = ShardRunner(self, counter) | |
155 worker.start() | |
156 workers.append(worker) | |
157 for worker in workers: | |
158 worker.join() | |
159 return self.PrintSummary() | |
160 | |
161 def LogLineFailure(self, line): | |
162 """Saves a line in the failure log to be printed at the end. | |
163 | |
164 Args: | |
165 line: The line to save in the failure_log. | |
166 """ | |
167 self.failure_log.append(line) | |
168 | |
169 def LogShardFailure(self, index): | |
170 """Records that a test in the given shard has failed. | |
171 | |
172 Args: | |
173 index: The index of the failing shard. | |
174 """ | |
175 self.failed_shards.append(index) | |
176 | |
177 def PrintSummary(self): | |
178 """Prints a summary of the test results. | |
179 | |
180 If any shards had failing tests, the list is sorted and printed. Then all | |
181 the lines that indicate a test failure are reproduced. | |
182 | |
183 Returns: | |
184 The number of shards that had failing tests. | |
185 """ | |
186 sys.stderr.write("\n") | |
187 num_failed = len(self.failed_shards) | |
188 if num_failed > 0: | |
189 self.failed_shards.sort() | |
190 if self.color: | |
191 sys.stderr.write("\x1b[1;5;31m") | |
192 sys.stderr.write("FAILED SHARDS: %s\n" % str(self.failed_shards)) | |
193 else: | |
194 if self.color: | |
195 sys.stderr.write("\x1b[1;5;32m") | |
196 sys.stderr.write("ALL SHARDS PASSED!\n") | |
197 if self.failure_log: | |
198 if self.color: | |
199 sys.stderr.write("\x1b[1;5;31m") | |
200 sys.stderr.write("FAILED TESTS:\n") | |
201 if self.color: | |
202 sys.stderr.write("\x1b[0;37m") | |
203 for line in self.failure_log: | |
204 sys.stderr.write(line) | |
205 if self.color: | |
206 sys.stderr.write("\x1b[0;37m") | |
207 return num_failed | |
208 | |
209 | |
210 def main(): | |
211 parser = optparse.OptionParser() | |
212 parser.add_option( | |
213 "-n", "--shards_per_core", type="int", default=DEFAULT_SHARDS_PER_CORE, | |
214 help="number of shards to generate per CPU") | |
215 parser.add_option( | |
216 "-r", "--runs_per_core", type="int", default=DEFAULT_RUNS_PER_CORE, | |
217 help="number of shards to run in parallel per CPU") | |
218 parser.add_option( | |
219 "-c", "--color", action="store_true", default=sys.stdout.isatty(), | |
220 help="force color output, also used by gtest if --gtest_color is not" | |
221 " specified") | |
222 parser.add_option( | |
223 "--no-color", action="store_false", dest="color", | |
224 help="disable color output") | |
225 parser.add_option( | |
226 "-s", "--runshard", type="int", help="single shard index to run") | |
227 parser.disable_interspersed_args() | |
228 (options, args) = parser.parse_args() | |
229 | |
230 if not args: | |
231 parser.error("You must specify a path to test!") | |
232 if not os.path.exists(args[0]): | |
233 parser.error("%s does not exist!" % args[0]) | |
234 | |
235 num_cores = DetectNumCores() | |
236 | |
237 if options.shards_per_core < 1: | |
238 parser.error("You must have at least 1 shard per core!") | |
239 num_shards = num_cores * options.shards_per_core | |
240 | |
241 if options.runs_per_core < 1: | |
242 parser.error("You must have at least 1 run per core!") | |
243 num_runs = num_cores * options.runs_per_core | |
244 | |
245 gtest_args = ["--gtest_color=%s" % { | |
246 True: "yes", False: "no"}[options.color]] + args[1:] | |
247 | |
248 if options.runshard != None: | |
249 # run a single shard and exit | |
250 if (options.runshard < 0 or options.runshard >= num_shards): | |
251 parser.error("Invalid shard number given parameters!") | |
252 shard = RunShard( | |
253 args[0], num_shards, options.runshard, gtest_args, None, None) | |
254 shard.communicate() | |
255 return shard.poll() | |
256 | |
257 # shard and run the whole test | |
258 ss = ShardingSupervisor( | |
259 args[0], num_shards, num_runs, options.color, gtest_args) | |
260 return ss.ShardTest() | |
261 | |
262 | |
263 if __name__ == "__main__": | |
264 sys.exit(main()) | |
265 | |
OLD | NEW |