Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Unified Diff: third_party/grpc/tools/run_tests/jobset.py

Issue 1932353002: Initial checkin of gRPC to third_party/ Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/grpc/tools/run_tests/jobset.py
diff --git a/third_party/grpc/tools/run_tests/jobset.py b/third_party/grpc/tools/run_tests/jobset.py
new file mode 100644
index 0000000000000000000000000000000000000000..a3b246dc084b295d17742d1e2b81d6caa4070a46
--- /dev/null
+++ b/third_party/grpc/tools/run_tests/jobset.py
@@ -0,0 +1,483 @@
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Run a group of subprocesses and then finish."""
+
+import hashlib
+import multiprocessing
+import os
+import platform
+import re
+import signal
+import subprocess
+import sys
+import tempfile
+import time
+
+
+# cpu cost measurement
+measure_cpu_costs = False
+
+
+_DEFAULT_MAX_JOBS = 16 * multiprocessing.cpu_count()
+_MAX_RESULT_SIZE = 8192
+
+def platform_string():
+ if platform.system() == 'Windows':
+ return 'windows'
+ elif platform.system()[:7] == 'MSYS_NT':
+ return 'windows'
+ elif platform.system() == 'Darwin':
+ return 'mac'
+ elif platform.system() == 'Linux':
+ return 'linux'
+ else:
+ return 'posix'
+
+
+# setup a signal handler so that signal.pause registers 'something'
+# when a child finishes
+# not using futures and threading to avoid a dependency on subprocess32
+if platform_string() == 'windows':
+ pass
+else:
+ have_alarm = False
+ def alarm_handler(unused_signum, unused_frame):
+ global have_alarm
+ have_alarm = False
+
+ signal.signal(signal.SIGCHLD, lambda unused_signum, unused_frame: None)
+ signal.signal(signal.SIGALRM, alarm_handler)
+
+
+_SUCCESS = object()
+_FAILURE = object()
+_RUNNING = object()
+_KILLED = object()
+
+
+_COLORS = {
+ 'red': [ 31, 0 ],
+ 'green': [ 32, 0 ],
+ 'yellow': [ 33, 0 ],
+ 'lightgray': [ 37, 0],
+ 'gray': [ 30, 1 ],
+ 'purple': [ 35, 0 ],
+ }
+
+
+_BEGINNING_OF_LINE = '\x1b[0G'
+_CLEAR_LINE = '\x1b[2K'
+
+
+_TAG_COLOR = {
+ 'FAILED': 'red',
+ 'FLAKE': 'purple',
+ 'TIMEOUT_FLAKE': 'purple',
+ 'WARNING': 'yellow',
+ 'TIMEOUT': 'red',
+ 'PASSED': 'green',
+ 'START': 'gray',
+ 'WAITING': 'yellow',
+ 'SUCCESS': 'green',
+ 'IDLE': 'gray',
+ }
+
+
+def message(tag, msg, explanatory_text=None, do_newline=False):
+ if message.old_tag == tag and message.old_msg == msg and not explanatory_text:
+ return
+ message.old_tag = tag
+ message.old_msg = msg
+ try:
+ if platform_string() == 'windows' or not sys.stdout.isatty():
+ if explanatory_text:
+ print explanatory_text
+ print '%s: %s' % (tag, msg)
+ return
+ sys.stdout.write('%s%s%s\x1b[%d;%dm%s\x1b[0m: %s%s' % (
+ _BEGINNING_OF_LINE,
+ _CLEAR_LINE,
+ '\n%s' % explanatory_text if explanatory_text is not None else '',
+ _COLORS[_TAG_COLOR[tag]][1],
+ _COLORS[_TAG_COLOR[tag]][0],
+ tag,
+ msg,
+ '\n' if do_newline or explanatory_text is not None else ''))
+ sys.stdout.flush()
+ except:
+ pass
+
+message.old_tag = ''
+message.old_msg = ''
+
+def which(filename):
+ if '/' in filename:
+ return filename
+ for path in os.environ['PATH'].split(os.pathsep):
+ if os.path.exists(os.path.join(path, filename)):
+ return os.path.join(path, filename)
+ raise Exception('%s not found' % filename)
+
+
+class JobSpec(object):
+ """Specifies what to run for a job."""
+
+ def __init__(self, cmdline, shortname=None, environ=None, hash_targets=None,
+ cwd=None, shell=False, timeout_seconds=5*60, flake_retries=0,
+ timeout_retries=0, kill_handler=None, cpu_cost=1.0):
+ """
+ Arguments:
+ cmdline: a list of arguments to pass as the command line
+ environ: a dictionary of environment variables to set in the child process
+ hash_targets: which files to include in the hash representing the jobs version
+ (or empty, indicating the job should not be hashed)
+ kill_handler: a handler that will be called whenever job.kill() is invoked
+ cpu_cost: number of cores per second this job needs
+ """
+ if environ is None:
+ environ = {}
+ if hash_targets is None:
+ hash_targets = []
+ self.cmdline = cmdline
+ self.environ = environ
+ self.shortname = cmdline[0] if shortname is None else shortname
+ self.hash_targets = hash_targets or []
+ self.cwd = cwd
+ self.shell = shell
+ self.timeout_seconds = timeout_seconds
+ self.flake_retries = flake_retries
+ self.timeout_retries = timeout_retries
+ self.kill_handler = kill_handler
+ self.cpu_cost = cpu_cost
+
+ def identity(self):
+ return '%r %r %r' % (self.cmdline, self.environ, self.hash_targets)
+
+ def __hash__(self):
+ return hash(self.identity())
+
+ def __cmp__(self, other):
+ return self.identity() == other.identity()
+
+ def __repr__(self):
+ return 'JobSpec(shortname=%s, cmdline=%s)' % (self.shortname, self.cmdline)
+
+
+class JobResult(object):
+ def __init__(self):
+ self.state = 'UNKNOWN'
+ self.returncode = -1
+ self.elapsed_time = 0
+ self.num_failures = 0
+ self.retries = 0
+ self.message = ''
+
+
+class Job(object):
+ """Manages one job."""
+
+ def __init__(self, spec, bin_hash, newline_on_success, travis, add_env):
+ self._spec = spec
+ self._bin_hash = bin_hash
+ self._newline_on_success = newline_on_success
+ self._travis = travis
+ self._add_env = add_env.copy()
+ self._retries = 0
+ self._timeout_retries = 0
+ self._suppress_failure_message = False
+ message('START', spec.shortname, do_newline=self._travis)
+ self.result = JobResult()
+ self.start()
+
+ def GetSpec(self):
+ return self._spec
+
+ def start(self):
+ self._tempfile = tempfile.TemporaryFile()
+ env = dict(os.environ)
+ env.update(self._spec.environ)
+ env.update(self._add_env)
+ self._start = time.time()
+ cmdline = self._spec.cmdline
+ if measure_cpu_costs:
+ cmdline = ['time', '--portability'] + cmdline
+ try_start = lambda: subprocess.Popen(args=cmdline,
+ stderr=subprocess.STDOUT,
+ stdout=self._tempfile,
+ cwd=self._spec.cwd,
+ shell=self._spec.shell,
+ env=env)
+ delay = 0.3
+ for i in range(0, 4):
+ try:
+ self._process = try_start()
+ break
+ except OSError:
+ message('WARNING', 'Failed to start %s, retrying in %f seconds' % (self._spec.shortname, delay))
+ time.sleep(delay)
+ delay *= 2
+ else:
+ self._process = try_start()
+ self._state = _RUNNING
+
+ def state(self, update_cache):
+ """Poll current state of the job. Prints messages at completion."""
+ def stdout(self=self):
+ self._tempfile.seek(0)
+ stdout = self._tempfile.read()
+ self.result.message = stdout[-_MAX_RESULT_SIZE:]
+ return stdout
+ if self._state == _RUNNING and self._process.poll() is not None:
+ elapsed = time.time() - self._start
+ self.result.elapsed_time = elapsed
+ if self._process.returncode != 0:
+ if self._retries < self._spec.flake_retries:
+ message('FLAKE', '%s [ret=%d, pid=%d]' % (
+ self._spec.shortname, self._process.returncode, self._process.pid),
+ stdout(), do_newline=True)
+ self._retries += 1
+ self.result.num_failures += 1
+ self.result.retries = self._timeout_retries + self._retries
+ self.start()
+ else:
+ self._state = _FAILURE
+ if not self._suppress_failure_message:
+ message('FAILED', '%s [ret=%d, pid=%d]' % (
+ self._spec.shortname, self._process.returncode, self._process.pid),
+ stdout(), do_newline=True)
+ self.result.state = 'FAILED'
+ self.result.num_failures += 1
+ self.result.returncode = self._process.returncode
+ else:
+ self._state = _SUCCESS
+ measurement = ''
+ if measure_cpu_costs:
+ m = re.search(r'real ([0-9.]+)\nuser ([0-9.]+)\nsys ([0-9.]+)', stdout())
+ real = float(m.group(1))
+ user = float(m.group(2))
+ sys = float(m.group(3))
+ if real > 0.5:
+ cores = (user + sys) / real
+ measurement = '; cpu_cost=%.01f; estimated=%.01f' % (cores, self._spec.cpu_cost)
+ message('PASSED', '%s [time=%.1fsec; retries=%d:%d%s]' % (
+ self._spec.shortname, elapsed, self._retries, self._timeout_retries, measurement),
+ do_newline=self._newline_on_success or self._travis)
+ self.result.state = 'PASSED'
+ if self._bin_hash:
+ update_cache.finished(self._spec.identity(), self._bin_hash)
+ elif (self._state == _RUNNING and
+ self._spec.timeout_seconds is not None and
+ time.time() - self._start > self._spec.timeout_seconds):
+ if self._timeout_retries < self._spec.timeout_retries:
+ message('TIMEOUT_FLAKE', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True)
+ self._timeout_retries += 1
+ self.result.num_failures += 1
+ self.result.retries = self._timeout_retries + self._retries
+ if self._spec.kill_handler:
+ self._spec.kill_handler(self)
+ self._process.terminate()
+ self.start()
+ else:
+ message('TIMEOUT', '%s [pid=%d]' % (self._spec.shortname, self._process.pid), stdout(), do_newline=True)
+ self.kill()
+ self.result.state = 'TIMEOUT'
+ self.result.num_failures += 1
+ return self._state
+
+ def kill(self):
+ if self._state == _RUNNING:
+ self._state = _KILLED
+ if self._spec.kill_handler:
+ self._spec.kill_handler(self)
+ self._process.terminate()
+
+ def suppress_failure_message(self):
+ self._suppress_failure_message = True
+
+
+class Jobset(object):
+ """Manages one run of jobs."""
+
+ def __init__(self, check_cancelled, maxjobs, newline_on_success, travis,
+ stop_on_failure, add_env, cache):
+ self._running = set()
+ self._check_cancelled = check_cancelled
+ self._cancelled = False
+ self._failures = 0
+ self._completed = 0
+ self._maxjobs = maxjobs
+ self._newline_on_success = newline_on_success
+ self._travis = travis
+ self._cache = cache
+ self._stop_on_failure = stop_on_failure
+ self._hashes = {}
+ self._add_env = add_env
+ self.resultset = {}
+ self._remaining = None
+
+ def set_remaining(self, remaining):
+ self._remaining = remaining
+
+ def get_num_failures(self):
+ return self._failures
+
+ def cpu_cost(self):
+ c = 0
+ for job in self._running:
+ c += job._spec.cpu_cost
+ return c
+
+ def start(self, spec):
+ """Start a job. Return True on success, False on failure."""
+ while True:
+ if self.cancelled(): return False
+ current_cpu_cost = self.cpu_cost()
+ if current_cpu_cost == 0: break
+ if current_cpu_cost + spec.cpu_cost <= self._maxjobs: break
+ self.reap()
+ if self.cancelled(): return False
+ if spec.hash_targets:
+ if spec.identity() in self._hashes:
+ bin_hash = self._hashes[spec.identity()]
+ else:
+ bin_hash = hashlib.sha1()
+ for fn in spec.hash_targets:
+ with open(which(fn)) as f:
+ bin_hash.update(f.read())
+ bin_hash = bin_hash.hexdigest()
+ self._hashes[spec.identity()] = bin_hash
+ should_run = self._cache.should_run(spec.identity(), bin_hash)
+ else:
+ bin_hash = None
+ should_run = True
+ if should_run:
+ job = Job(spec,
+ bin_hash,
+ self._newline_on_success,
+ self._travis,
+ self._add_env)
+ self._running.add(job)
+ if not self.resultset.has_key(job.GetSpec().shortname):
+ self.resultset[job.GetSpec().shortname] = []
+ return True
+
+ def reap(self):
+ """Collect the dead jobs."""
+ while self._running:
+ dead = set()
+ for job in self._running:
+ st = job.state(self._cache)
+ if st == _RUNNING: continue
+ if st == _FAILURE or st == _KILLED:
+ self._failures += 1
+ if self._stop_on_failure:
+ self._cancelled = True
+ for job in self._running:
+ job.kill()
+ dead.add(job)
+ break
+ for job in dead:
+ self._completed += 1
+ self.resultset[job.GetSpec().shortname].append(job.result)
+ self._running.remove(job)
+ if dead: return
+ if (not self._travis):
+ rstr = '' if self._remaining is None else '%d queued, ' % self._remaining
+ message('WAITING', '%s%d jobs running, %d complete, %d failed' % (
+ rstr, len(self._running), self._completed, self._failures))
+ if platform_string() == 'windows':
+ time.sleep(0.1)
+ else:
+ global have_alarm
+ if not have_alarm:
+ have_alarm = True
+ signal.alarm(10)
+ signal.pause()
+
+ def cancelled(self):
+ """Poll for cancellation."""
+ if self._cancelled: return True
+ if not self._check_cancelled(): return False
+ for job in self._running:
+ job.kill()
+ self._cancelled = True
+ return True
+
+ def finish(self):
+ while self._running:
+ if self.cancelled(): pass # poll cancellation
+ self.reap()
+ return not self.cancelled() and self._failures == 0
+
+
+def _never_cancelled():
+ return False
+
+
+# cache class that caches nothing
+class NoCache(object):
+ def should_run(self, cmdline, bin_hash):
+ return True
+
+ def finished(self, cmdline, bin_hash):
+ pass
+
+
+def tag_remaining(xs):
+ staging = []
+ for x in xs:
+ staging.append(x)
+ if len(staging) > 1000:
+ yield (staging.pop(0), None)
+ n = len(staging)
+ for i, x in enumerate(staging):
+ yield (x, n - i - 1)
+
+
+def run(cmdlines,
+ check_cancelled=_never_cancelled,
+ maxjobs=None,
+ newline_on_success=False,
+ travis=False,
+ infinite_runs=False,
+ stop_on_failure=False,
+ cache=None,
+ add_env={}):
+ js = Jobset(check_cancelled,
+ maxjobs if maxjobs is not None else _DEFAULT_MAX_JOBS,
+ newline_on_success, travis, stop_on_failure, add_env,
+ cache if cache is not None else NoCache())
+ for cmdline, remaining in tag_remaining(cmdlines):
+ if not js.start(cmdline):
+ break
+ if remaining is not None:
+ js.set_remaining(remaining)
+ js.finish()
+ return js.get_num_failures(), js.resultset
« no previous file with comments | « third_party/grpc/tools/run_tests/interop_html_report.template ('k') | third_party/grpc/tools/run_tests/package_targets.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698