Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(147)

Side by Side Diff: third_party/grpc/tools/run_tests/jobset.py

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 # Copyright 2015-2016, Google Inc.
2 # All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are
6 # met:
7 #
8 # * Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
10 # * Redistributions in binary form must reproduce the above
11 # copyright notice, this list of conditions and the following disclaimer
12 # in the documentation and/or other materials provided with the
13 # distribution.
14 # * Neither the name of Google Inc. nor the names of its
15 # contributors may be used to endorse or promote products derived from
16 # this software without specific prior written permission.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 """Run a group of subprocesses and then finish."""
31
import collections
import hashlib
import multiprocessing
import os
import platform
import re
import signal
import subprocess
import sys
import tempfile
import time
42
43
# cpu cost measurement
# When True, each job's command line is wrapped with `time --portability`
# and the parsed real/user/sys figures are reported next to PASSED messages.
measure_cpu_costs = False


# Cap on the summed cpu_cost of concurrently running jobs when the caller
# does not pass maxjobs (deliberate oversubscription: 16x the core count).
_DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
# Only the last 8 KiB of a job's captured output is kept in JobResult.message.
_MAX_RESULT_SIZE = 8192
50
def platform_string():
  """Classify the host OS for the test scripts.

  Returns:
    One of 'windows', 'mac', 'linux' or 'posix'.  MSYS environments report
    a system name like 'MSYS_NT-...' and are treated as Windows.
  """
  # Query platform.system() once instead of once per branch.
  system = platform.system()
  if system == 'Windows' or system.startswith('MSYS_NT'):
    return 'windows'
  elif system == 'Darwin':
    return 'mac'
  elif system == 'Linux':
    return 'linux'
  else:
    return 'posix'
62
63
# setup a signal handler so that signal.pause registers 'something'
# when a child finishes
# not using futures and threading to avoid a dependency on subprocess32
if platform_string() == 'windows':
  # Windows has neither SIGCHLD nor SIGALRM; Jobset.reap polls with
  # time.sleep(0.1) there instead of blocking on signal.pause().
  pass
else:
  # True while an alarm is pending; cleared by alarm_handler so Jobset.reap
  # knows to re-arm signal.alarm before the next signal.pause.
  have_alarm = False
  def alarm_handler(unused_signum, unused_frame):
    # SIGALRM fired: drop the flag so reap() schedules a fresh 10s alarm.
    global have_alarm
    have_alarm = False

  # Even a no-op SIGCHLD handler is enough to make signal.pause() return
  # when a child process exits.
  signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
  signal.signal(signal.SIGALRM, alarm_handler)
77
78
# Job state sentinels; compared by identity only (never by value).
_SUCCESS = object()
_FAILURE = object()
_RUNNING = object()
_KILLED = object()


# ANSI color table: name -> [foreground color code, style code].
_COLORS = {
    'red': [ 31, 0 ],
    'green': [ 32, 0 ],
    'yellow': [ 33, 0 ],
    'lightgray': [ 37, 0],
    'gray': [ 30, 1 ],
    'purple': [ 35, 0 ],
    }


# Escape sequences used to overwrite the current status line in place.
_BEGINNING_OF_LINE = '\x1b[0G'
_CLEAR_LINE = '\x1b[2K'


# Color used for each message() tag.
_TAG_COLOR = {
    'FAILED': 'red',
    'FLAKE': 'purple',
    'TIMEOUT_FLAKE': 'purple',
    'WARNING': 'yellow',
    'TIMEOUT': 'red',
    'PASSED': 'green',
    'START': 'gray',
    'WAITING': 'yellow',
    'SUCCESS': 'green',
    'IDLE': 'gray',
    }
111
112
def message(tag, msg, explanatory_text=None, do_newline=False):
  """Emit a colorized status update, overwriting the previous line in place.

  Arguments:
    tag: one of the _TAG_COLOR keys, e.g. 'PASSED' or 'FAILED'
    msg: short text shown after the tag (usually a job's shortname)
    explanatory_text: optional multi-line detail (e.g. captured job output);
      when present the message is always printed, never de-duplicated
    do_newline: finish the line instead of leaving the cursor on it, so the
      next message starts fresh rather than overwriting this one
  """
  # Suppress exact repeats so the in-place status line does not flicker.
  if message.old_tag == tag and message.old_msg == msg and not explanatory_text:
    return
  message.old_tag = tag
  message.old_msg = msg
  try:
    # Windows consoles and non-tty stdout get plain sequential output;
    # ANSI escape sequences would render as garbage there.
    if platform_string() == 'windows' or not sys.stdout.isatty():
      if explanatory_text:
        print(explanatory_text)
      print('%s: %s' % (tag, msg))
      return
    sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % (
        _BEGINNING_OF_LINE,
        _CLEAR_LINE,
        '\n%s' % explanatory_text if explanatory_text is not None else '',
        _COLORS[_TAG_COLOR[tag]][1],
        _COLORS[_TAG_COLOR[tag]][0],
        tag,
        msg,
        '\n' if do_newline or explanatory_text is not None else ''))
    sys.stdout.flush()
  except Exception:
    # Status output is best-effort; a broken pipe or closed stdout must not
    # take down the job runner.  (Was a bare `except:`, which also swallowed
    # KeyboardInterrupt and SystemExit.)
    pass

message.old_tag = ''
message.old_msg = ''
139
def which(filename):
  """Resolve *filename* against $PATH, mirroring the shell's `which`.

  A name that already contains a path separator is returned untouched.
  Otherwise each $PATH entry is probed in order and the first existing
  candidate wins.  Raises Exception when nothing matches.
  """
  if '/' in filename:
    return filename
  candidates = (os.path.join(directory, filename)
                for directory in os.environ['PATH'].split(os.pathsep))
  for candidate in candidates:
    if os.path.exists(candidate):
      return candidate
  raise Exception('%s not found' % filename)
147
148
class JobSpec(object):
  """Specifies what to run for a job."""

  def __init__(self, cmdline, shortname=None, environ=None, hash_targets=None,
               cwd=None, shell=False, timeout_seconds=5*60, flake_retries=0,
               timeout_retries=0, kill_handler=None, cpu_cost=1.0):
    """
    Arguments:
      cmdline: a list of arguments to pass as the command line
      environ: a dictionary of environment variables to set in the child
        process
      hash_targets: which files to include in the hash representing the job's
        version (or empty, indicating the job should not be hashed)
      kill_handler: a handler that will be called whenever job.kill() is
        invoked
      cpu_cost: number of cores per second this job needs
    """
    # Mutable containers are created per-call (None-sentinel idiom) so they
    # are never shared across JobSpec instances.
    if environ is None:
      environ = {}
    if hash_targets is None:
      hash_targets = []
    self.cmdline = cmdline
    self.environ = environ
    # Default the display name to the executable being run.
    self.shortname = cmdline[0] if shortname is None else shortname
    self.hash_targets = hash_targets or []
    self.cwd = cwd
    self.shell = shell
    self.timeout_seconds = timeout_seconds
    self.flake_retries = flake_retries
    self.timeout_retries = timeout_retries
    self.kill_handler = kill_handler
    self.cpu_cost = cpu_cost

  def identity(self):
    """Stable string key identifying this spec for hashing and caching."""
    return '%r %r %r' % (self.cmdline, self.environ, self.hash_targets)

  def __hash__(self):
    return hash(self.identity())

  def __eq__(self, other):
    # BUG FIX: this was previously `__cmp__` returning a bool.  Under
    # Python 2's cmp protocol 0 means "equal", so returning True (1) for
    # equal specs made them compare as *unequal* — and __cmp__ is ignored
    # entirely by Python 3.  Rich comparison gives correct ==/!= semantics.
    return self.identity() == other.identity()

  def __ne__(self, other):
    return not self == other

  def __repr__(self):
    return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname, self.cmdline)
191
192
class JobResult(object):
  """Mutable record of one job's outcome, accumulated across its retries."""

  def __init__(self):
    # 'UNKNOWN' until the job completes, then 'PASSED', 'FAILED' or 'TIMEOUT'.
    self.state = 'UNKNOWN'
    # Exit status of the final attempt (-1 until a failure records one).
    self.returncode = -1
    # Wall-clock seconds taken by the final attempt.
    self.elapsed_time = 0
    # Count of failed attempts, including flakes that were retried.
    self.num_failures = 0
    # Total retries performed (flake retries + timeout retries).
    self.retries = 0
    # Tail of the captured stdout/stderr from the most recent attempt.
    self.message = ''
201
202
class Job(object):
  """Manages one job: spawns its subprocess and drives the
  running/flake-retry/timeout/failure state machine."""

  def __init__(self, spec, bin_hash, newline_on_success, travis, add_env):
    # spec: the JobSpec describing what to run
    # bin_hash: hash of the job's binaries for the result cache, or None
    #   when the job is not cached
    # newline_on_success: if True, PASSED messages end their line
    # travis: if True, always emit full lines (no in-place status updates)
    # add_env: extra environment variables layered over spec.environ
    self._spec = spec
    self._bin_hash = bin_hash
    self._newline_on_success = newline_on_success
    self._travis = travis
    self._add_env = add_env.copy()  # copied so later caller mutation is safe
    self._retries = 0
    self._timeout_retries = 0
    self._suppress_failure_message = False
    message('START', spec.shortname, do_newline=self._travis)
    self.result = JobResult()
    # The process is launched immediately on construction.
    self.start()

  def GetSpec(self):
    """Return the JobSpec this job was created from."""
    return self._spec

  def start(self):
    """(Re)spawn the subprocess; also called for flake/timeout retries."""
    # Child output is captured into a temp file and read back by state().
    self._tempfile = tempfile.TemporaryFile()
    # Child environment: inherited os.environ, overlaid with the spec's
    # variables and then the jobset-wide additions.
    env = dict(os.environ)
    env.update(self._spec.environ)
    env.update(self._add_env)
    self._start = time.time()
    cmdline = self._spec.cmdline
    if measure_cpu_costs:
      # Wrap with `time --portability` so state() can parse real/user/sys.
      cmdline = ['time', '--portability'] + cmdline
    try_start = lambda: subprocess.Popen(args=cmdline,
                                         stderr=subprocess.STDOUT,
                                         stdout=self._tempfile,
                                         cwd=self._spec.cwd,
                                         shell=self._spec.shell,
                                         env=env)
    # Spawning can fail transiently under heavy parallelism; retry with
    # exponential backoff (0.3s, 0.6s, 1.2s, 2.4s).  The final attempt in
    # the for-else lets a persistent OSError propagate to the caller.
    delay = 0.3
    for i in range(0, 4):
      try:
        self._process = try_start()
        break
      except OSError:
        message('WARNING', 'Failed to start %s, retrying in %f seconds' % (self._spec.shortname, delay))
        time.sleep(delay)
        delay *= 2
    else:
      self._process = try_start()
    self._state = _RUNNING

  def state(self, update_cache):
    """Poll current state of the job. Prints messages at completion."""
    # Helper that rewinds the capture file, remembers its tail in the
    # JobResult and returns the full output.
    def stdout(self=self):
      self._tempfile.seek(0)
      stdout = self._tempfile.read()
      # Keep only the last _MAX_RESULT_SIZE bytes for reporting.
      self.result.message = stdout[-_MAX_RESULT_SIZE:]
      return stdout
    if self._state == _RUNNING and self._process.poll() is not None:
      # The process has exited: classify the result.
      elapsed = time.time() - self._start
      self.result.elapsed_time = elapsed
      if self._process.returncode != 0:
        if self._retries < self._spec.flake_retries:
          # Non-zero exit with retry budget left: treat as a flake and
          # restart the process.
          message('FLAKE', '%s [ret=%d, pid=%d]' % (
              self._spec.shortname, self._process.returncode, self._process.pid),
              stdout(), do_newline=True)
          self._retries += 1
          self.result.num_failures += 1
          self.result.retries = self._timeout_retries + self._retries
          self.start()
        else:
          # Retry budget exhausted: permanent failure.
          self._state = _FAILURE
          if not self._suppress_failure_message:
            message('FAILED', '%s [ret=%d, pid=%d]' % (
                self._spec.shortname, self._process.returncode, self._process.pid),
                stdout(), do_newline=True)
          self.result.state = 'FAILED'
          self.result.num_failures += 1
          self.result.returncode = self._process.returncode
      else:
        self._state = _SUCCESS
        measurement = ''
        if measure_cpu_costs:
          # Parse the `time --portability` trailer appended to the output.
          m = re.search(r'real ([0-9.]+)\nuser ([0-9.]+)\nsys ([0-9.]+)', stdout())
          real = float(m.group(1))
          user = float(m.group(2))
          # NOTE(review): this local deliberately-or-not shadows the `sys`
          # module for the rest of this method.
          sys = float(m.group(3))
          if real > 0.5:
            # Only report for runs long enough for the ratio to be meaningful.
            cores = (user + sys) / real
            measurement = '; cpu_cost=%.01f; estimated=%.01f' % (cores, self._spec.cpu_cost)
        message('PASSED', '%s [time=%.1fsec; retries=%d:%d%s]' % (
            self._spec.shortname, elapsed, self._retries, self._timeout_retries, measurement),
            do_newline=self._newline_on_success or self._travis)
        self.result.state = 'PASSED'
        if self._bin_hash:
          # Record the pass so an identical binary can be skipped next run.
          update_cache.finished(self._spec.identity(), self._bin_hash)
    elif (self._state == _RUNNING and
          self._spec.timeout_seconds is not None and
          time.time() - self._start > self._spec.timeout_seconds):
      # Still running past the deadline: either retry or declare a timeout.
      if self._timeout_retries < self._spec.timeout_retries:
        message('TIMEOUT_FLAKE', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True)
        self._timeout_retries += 1
        self.result.num_failures += 1
        self.result.retries = self._timeout_retries + self._retries
        if self._spec.kill_handler:
          self._spec.kill_handler(self)
        self._process.terminate()
        self.start()
      else:
        message('TIMEOUT', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True)
        self.kill()
        self.result.state = 'TIMEOUT'
        self.result.num_failures += 1
    return self._state

  def kill(self):
    """Terminate the subprocess (if running) and mark the job killed."""
    if self._state == _RUNNING:
      self._state = _KILLED
      # Give the spec's kill handler a chance to clean up first.
      if self._spec.kill_handler:
        self._spec.kill_handler(self)
      self._process.terminate()

  def suppress_failure_message(self):
    """Silence the FAILED message (the caller will report it instead)."""
    self._suppress_failure_message = True
323
324
class Jobset(object):
  """Manages one run of jobs."""

  def __init__(self, check_cancelled, maxjobs, newline_on_success, travis,
               stop_on_failure, add_env, cache):
    # check_cancelled: zero-arg callable polled for external cancellation
    # maxjobs: cap on the summed cpu_cost of concurrently running jobs
    # stop_on_failure: if True, the first failure cancels the whole run
    # cache: object with should_run()/finished() (e.g. NoCache)
    self._running = set()
    self._check_cancelled = check_cancelled
    self._cancelled = False
    self._failures = 0
    self._completed = 0
    self._maxjobs = maxjobs
    self._newline_on_success = newline_on_success
    self._travis = travis
    self._cache = cache
    self._stop_on_failure = stop_on_failure
    self._hashes = {}       # spec.identity() -> binary hash, memoized per run
    self._add_env = add_env
    self.resultset = {}     # shortname -> list of JobResult (one per attempt)
    self._remaining = None  # queued-job count for WAITING messages, if known

  def set_remaining(self, remaining):
    """Record how many jobs are still queued (for progress messages)."""
    self._remaining = remaining

  def get_num_failures(self):
    """Number of failed/killed jobs observed so far."""
    return self._failures

  def cpu_cost(self):
    """Summed cpu_cost of all currently running jobs."""
    c = 0
    for job in self._running:
      c += job._spec.cpu_cost
    return c

  def start(self, spec):
    """Start a job. Return True on success, False on failure."""
    # Wait for cpu budget.  An idle jobset always admits the job, even when
    # its cost alone exceeds maxjobs — otherwise it could never run.
    while True:
      if self.cancelled(): return False
      current_cpu_cost = self.cpu_cost()
      if current_cpu_cost == 0: break
      if current_cpu_cost + spec.cpu_cost <= self._maxjobs: break
      self.reap()
    if self.cancelled(): return False
    if spec.hash_targets:
      # Hash the job's binaries (memoized per identity) so the cache can
      # decide whether this exact version already passed.
      if spec.identity() in self._hashes:
        bin_hash = self._hashes[spec.identity()]
      else:
        bin_hash = hashlib.sha1()
        for fn in spec.hash_targets:
          with open(which(fn)) as f:
            bin_hash.update(f.read())
        bin_hash = bin_hash.hexdigest()
        self._hashes[spec.identity()] = bin_hash
      should_run = self._cache.should_run(spec.identity(), bin_hash)
    else:
      bin_hash = None
      should_run = True
    if should_run:
      job = Job(spec,
                bin_hash,
                self._newline_on_success,
                self._travis,
                self._add_env)
      self._running.add(job)
      # FIX: `dict.has_key` is Python-2-only and long deprecated; use `in`.
      if job.GetSpec().shortname not in self.resultset:
        self.resultset[job.GetSpec().shortname] = []
    return True

  def reap(self):
    """Collect the dead jobs."""
    while self._running:
      dead = set()
      for job in self._running:
        st = job.state(self._cache)
        if st == _RUNNING: continue
        if st == _FAILURE or st == _KILLED:
          self._failures += 1
          if self._stop_on_failure:
            self._cancelled = True
            # BUG FIX: this inner loop used to reuse the variable `job`,
            # clobbering the failed job so that an arbitrary job (the last
            # one killed) was added to `dead` instead of the one that failed.
            for victim in self._running:
              victim.kill()
        dead.add(job)
        break
      for job in dead:
        self._completed += 1
        self.resultset[job.GetSpec().shortname].append(job.result)
        self._running.remove(job)
      if dead: return
      # Nothing finished this pass: print a WAITING status and block until
      # either a child exits (SIGCHLD wakes signal.pause) or the 10s alarm
      # fires, then poll again.
      if (not self._travis):
        rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
        message('WAITING', '%s%d jobs running, %d complete, %d failed' % (
            rstr, len(self._running), self._completed, self._failures))
      if platform_string() == 'windows':
        time.sleep(0.1)  # no POSIX signals on Windows; plain polling
      else:
        global have_alarm
        if not have_alarm:
          have_alarm = True
          signal.alarm(10)
        signal.pause()

  def cancelled(self):
    """Poll for cancellation."""
    if self._cancelled: return True
    if not self._check_cancelled(): return False
    # External cancellation requested: kill everything and latch the flag.
    for job in self._running:
      job.kill()
    self._cancelled = True
    return True

  def finish(self):
    """Drain all running jobs; True iff nothing failed and not cancelled."""
    while self._running:
      if self.cancelled(): pass  # poll cancellation
      self.reap()
    return not self.cancelled() and self._failures == 0
438
439
440 def _never_cancelled():
441 return False
442
443
444 # cache class that caches nothing
# cache class that caches nothing
class NoCache(object):
  """Result cache that remembers nothing: every job always runs."""

  def should_run(self, cmdline, bin_hash):
    """Always True — no previous result is ever considered current."""
    return True

  def finished(self, cmdline, bin_hash):
    """Discard the completed job's hash; nothing is recorded."""
    pass
451
452
def tag_remaining(xs):
  """Pair each item of *xs* with a count of how many items follow it.

  Items are buffered in a sliding window of 1000.  Anything evicted from the
  window early is yielded with count None (unknown — *xs* may be an iterator
  whose length is not yet determined).  Once *xs* is exhausted, the buffered
  tail is yielded with exact remaining counts, counting down to 0.
  """
  # A deque gives O(1) eviction from the left; list.pop(0) was O(n).
  staging = collections.deque()
  for x in xs:
    staging.append(x)
    if len(staging) > 1000:
      yield (staging.popleft(), None)
  n = len(staging)
  for i, x in enumerate(staging):
    yield (x, n - i - 1)
462
463
def run(cmdlines,
        check_cancelled=_never_cancelled,
        maxjobs=None,
        newline_on_success=False,
        travis=False,
        infinite_runs=False,
        stop_on_failure=False,
        cache=None,
        add_env=None):
  """Run the given command lines as a jobset and wait for completion.

  Arguments:
    cmdlines: iterable of JobSpec describing the jobs to run
    check_cancelled: zero-arg callable polled for external cancellation
    maxjobs: cpu-cost cap on concurrency (defaults to _DEFAULT_MAX_JOBS)
    infinite_runs: accepted for interface compatibility; unused here
    cache: result cache with should_run()/finished() (defaults to NoCache)
    add_env: extra environment variables for every job

  Returns:
    (number_of_failures, resultset) where resultset maps each job shortname
    to its list of JobResult objects.
  """
  # FIX: `add_env` previously defaulted to a mutable {}, which is shared
  # across calls; use the None-sentinel idiom instead.
  if add_env is None:
    add_env = {}
  js = Jobset(check_cancelled,
              maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
              newline_on_success, travis, stop_on_failure, add_env,
              cache if cache is not None else NoCache())
  for cmdline, remaining in tag_remaining(cmdlines):
    if not js.start(cmdline):
      break
    if remaining is not None:
      js.set_remaining(remaining)
  js.finish()
  return js.get_num_failures(), js.resultset
OLDNEW
« no previous file with comments | « third_party/grpc/tools/run_tests/interop_html_report.template ('k') | third_party/grpc/tools/run_tests/package_targets.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698