Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(69)

Side by Side Diff: chrome/tools/sharding_supervisor/sharding_supervisor.py

Issue 7312026: Moved sharding_supervisor (Closed) Base URL: http://git.chromium.org/git/chromium.git@trunk
Patch Set: Created 9 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « no previous file | tools/sharding_supervisor/sharding_supervisor.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 #!/usr/bin/env python
2 # Copyright (c) 2011 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 """Shards a given test suite and runs the shards in parallel.
7
8 ShardingSupervisor is called to process the command line options and creates
9 the specified number of worker threads. These threads then run each shard of
10 the test in a separate process and report on the results. When all the shards
11 have been completed, the supervisor reprints any lines indicating a test
12 failure for convenience. If only one shard is to be run, a single subprocess
13 is started for that shard and the output is identical to gtest's output.
14
15 Usage: python sharding_supervisor.py [options] path/to/test [gtest_args]
16 """
17
18
19 import optparse
20 import os
21 import pty
22 import Queue
23 import subprocess
24 import sys
25 import threading
26
27
28 DEFAULT_NUM_CORES = 4
29 DEFAULT_SHARDS_PER_CORE = 5 # num_shards = cores * SHARDS_PER_CORE
30 DEFAULT_RUNS_PER_CORE = 1 # num_workers = cores * RUNS_PER_CORE
31
32
33 def DetectNumCores():
34 """Detects the number of cores on the machine.
35
36 Returns:
37 The number of cores on the machine or DEFAULT_NUM_CORES if it could not
38 be found.
39 """
40 try:
41 # Linux, Unix, MacOS
42 if hasattr(os, "sysconf"):
43 if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
44 # Linux, Unix
45 return int(os.sysconf("SC_NPROCESSORS_ONLN"))
46 else:
47 # OSX
48 return int(os.popen2("sysctl -n hw.ncpu")[1].read())
49 # Windows
50 return int(os.environ["NUMBER_OF_PROCESSORS"])
51 except ValueError:
52 return DEFAULT_NUM_CORES
53
54
55 def RunShard(test, num_shards, index, gtest_args, stdout, stderr):
56 """Runs a single test shard in a subprocess.
57
58 Returns:
59 The Popen object representing the subprocess handle.
60 """
61 args = [test]
62 args.extend(gtest_args)
63 env = os.environ.copy()
64 env["GTEST_TOTAL_SHARDS"] = str(num_shards)
65 env["GTEST_SHARD_INDEX"] = str(index)
66 return subprocess.Popen(
67 args, stdout=stdout, stderr=stderr, env=env)
68
69
70 class ShardRunner(threading.Thread):
71 """Worker thread that manages a single shard at a time.
72
73 Attributes:
74 supervisor: The ShardingSupervisor that this worker reports to.
75 counter: Called to get the next shard index to run.
76 """
77
78 def __init__(self, supervisor, counter):
79 """Inits ShardRunner with a supervisor and counter."""
80 threading.Thread.__init__(self)
81 self.supervisor = supervisor
82 self.counter = counter
83
84 def run(self):
85 """Runs shards and outputs the results.
86
87 Gets the next shard index from the supervisor, runs it in a subprocess,
88 and collects the output. Each line is prefixed with 'N>', where N is the
89 current shard index.
90 """
91 while True:
92 try:
93 index = self.counter.get_nowait()
94 except Queue.Empty:
95 break
96 shard = RunShard(
97 self.supervisor.test, self.supervisor.num_shards, index,
98 self.supervisor.gtest_args, subprocess.PIPE, subprocess.STDOUT)
99 while True:
100 line = shard.stdout.readline()
101 if not line:
102 if shard.poll() is not None:
103 break
104 continue
105 line = "%i>%s" % (index, line)
106 if (line.find("FAIL", 0, 20) >= 0 and line.find(".") >= 0 and
107 line.find("ms)")) < 0:
108 self.supervisor.LogLineFailure(line)
109 sys.stdout.write(line)
110 if shard.returncode != 0:
111 self.supervisor.LogShardFailure(index)
112
113
114 class ShardingSupervisor(object):
115 """Supervisor object that handles the worker threads.
116
117 Attributes:
118 test: Name of the test to shard.
119 num_shards: Total number of shards to split the test into.
120 num_runs: Total number of worker threads to create for running shards.
121 color: Indicates which coloring mode to use in the output.
122 gtest_args: The options to pass to gtest.
123 failure_log: List of statements from shard output indicating a failure.
124 failed_shards: List of shards that contained failing tests.
125 """
126
127 def __init__(
128 self, test, num_shards, num_runs, color, gtest_args):
129 """Inits ShardingSupervisor with given options and gtest arguments."""
130 self.test = test
131 self.num_shards = num_shards
132 self.num_runs = num_runs
133 self.color = color
134 self.gtest_args = gtest_args
135 self.failure_log = []
136 self.failed_shards = []
137
138 def ShardTest(self):
139 """Runs the test and manages the worker threads.
140
141 Runs the test and outputs a summary at the end. All the tests in the
142 suite are run by creating (cores * runs_per_core) threads and
143 (cores * shards_per_core) shards. When all the worker threads have
144 finished, the lines saved in the failure_log are printed again.
145
146 Returns:
147 The number of shards that had failing tests.
148 """
149 workers = []
150 counter = Queue.Queue()
151 for i in range(self.num_shards):
152 counter.put(i)
153 for i in range(self.num_runs):
154 worker = ShardRunner(self, counter)
155 worker.start()
156 workers.append(worker)
157 for worker in workers:
158 worker.join()
159 return self.PrintSummary()
160
161 def LogLineFailure(self, line):
162 """Saves a line in the failure log to be printed at the end.
163
164 Args:
165 line: The line to save in the failure_log.
166 """
167 self.failure_log.append(line)
168
169 def LogShardFailure(self, index):
170 """Records that a test in the given shard has failed.
171
172 Args:
173 index: The index of the failing shard.
174 """
175 self.failed_shards.append(index)
176
177 def PrintSummary(self):
178 """Prints a summary of the test results.
179
180 If any shards had failing tests, the list is sorted and printed. Then all
181 the lines that indicate a test failure are reproduced.
182
183 Returns:
184 The number of shards that had failing tests.
185 """
186 sys.stderr.write("\n")
187 num_failed = len(self.failed_shards)
188 if num_failed > 0:
189 self.failed_shards.sort()
190 if self.color:
191 sys.stderr.write("\x1b[1;5;31m")
192 sys.stderr.write("FAILED SHARDS: %s\n" % str(self.failed_shards))
193 else:
194 if self.color:
195 sys.stderr.write("\x1b[1;5;32m")
196 sys.stderr.write("ALL SHARDS PASSED!\n")
197 if self.failure_log:
198 if self.color:
199 sys.stderr.write("\x1b[1;5;31m")
200 sys.stderr.write("FAILED TESTS:\n")
201 if self.color:
202 sys.stderr.write("\x1b[0;37m")
203 for line in self.failure_log:
204 sys.stderr.write(line)
205 if self.color:
206 sys.stderr.write("\x1b[0;37m")
207 return num_failed
208
209
210 def main():
211 parser = optparse.OptionParser()
212 parser.add_option(
213 "-n", "--shards_per_core", type="int", default=DEFAULT_SHARDS_PER_CORE,
214 help="number of shards to generate per CPU")
215 parser.add_option(
216 "-r", "--runs_per_core", type="int", default=DEFAULT_RUNS_PER_CORE,
217 help="number of shards to run in parallel per CPU")
218 parser.add_option(
219 "-c", "--color", action="store_true", default=sys.stdout.isatty(),
220 help="force color output, also used by gtest if --gtest_color is not"
221 " specified")
222 parser.add_option(
223 "--no-color", action="store_false", dest="color",
224 help="disable color output")
225 parser.add_option(
226 "-s", "--runshard", type="int", help="single shard index to run")
227 parser.disable_interspersed_args()
228 (options, args) = parser.parse_args()
229
230 if not args:
231 parser.error("You must specify a path to test!")
232 if not os.path.exists(args[0]):
233 parser.error("%s does not exist!" % args[0])
234
235 num_cores = DetectNumCores()
236
237 if options.shards_per_core < 1:
238 parser.error("You must have at least 1 shard per core!")
239 num_shards = num_cores * options.shards_per_core
240
241 if options.runs_per_core < 1:
242 parser.error("You must have at least 1 run per core!")
243 num_runs = num_cores * options.runs_per_core
244
245 gtest_args = ["--gtest_color=%s" % {
246 True: "yes", False: "no"}[options.color]] + args[1:]
247
248 if options.runshard != None:
249 # run a single shard and exit
250 if (options.runshard < 0 or options.runshard >= num_shards):
251 parser.error("Invalid shard number given parameters!")
252 shard = RunShard(
253 args[0], num_shards, options.runshard, gtest_args, None, None)
254 shard.communicate()
255 return shard.poll()
256
257 # shard and run the whole test
258 ss = ShardingSupervisor(
259 args[0], num_shards, num_runs, options.color, gtest_args)
260 return ss.ShardTest()
261
262
263 if __name__ == "__main__":
264 sys.exit(main())
265
OLDNEW
« no previous file with comments | « no previous file | tools/sharding_supervisor/sharding_supervisor.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698