Index: third_party/grpc/tools/run_tests/run_stress_tests.py |
diff --git a/third_party/grpc/tools/run_tests/run_stress_tests.py b/third_party/grpc/tools/run_tests/run_stress_tests.py |
new file mode 100755 |
index 0000000000000000000000000000000000000000..d1faf7d9641c012fda9e3b16488eba931977783e |
--- /dev/null |
+++ b/third_party/grpc/tools/run_tests/run_stress_tests.py |
@@ -0,0 +1,328 @@ |
+#!/usr/bin/env python2.7 |
+# Copyright 2015-2016, Google Inc. |
+# All rights reserved. |
+# |
+# Redistribution and use in source and binary forms, with or without |
+# modification, are permitted provided that the following conditions are |
+# met: |
+# |
+# * Redistributions of source code must retain the above copyright |
+# notice, this list of conditions and the following disclaimer. |
+# * Redistributions in binary form must reproduce the above |
+# copyright notice, this list of conditions and the following disclaimer |
+# in the documentation and/or other materials provided with the |
+# distribution. |
+# * Neither the name of Google Inc. nor the names of its |
+# contributors may be used to endorse or promote products derived from |
+# this software without specific prior written permission. |
+# |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+"""Run stress test in C++""" |
+ |
+import argparse |
+import atexit |
+import dockerjob |
+import itertools |
+import jobset |
+import json |
+import multiprocessing |
+import os |
+import re |
+import subprocess |
+import sys |
+import tempfile |
+import time |
+import uuid |
+ |
+# Docker doesn't clean up after itself, so we do it on exit. |
+atexit.register(lambda: subprocess.call(['stty', 'echo'])) |
+ |
+ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..')) |
+os.chdir(ROOT) |
+ |
+_DEFAULT_SERVER_PORT = 8080 |
+_DEFAULT_METRICS_PORT = 8081 |
+_DEFAULT_TEST_CASES = 'empty_unary:20,large_unary:20,client_streaming:20,server_streaming:20,empty_stream:20' |
+_DEFAULT_NUM_CHANNELS_PER_SERVER = 5 |
+_DEFAULT_NUM_STUBS_PER_CHANNEL = 10 |
+ |
+# 15 mins default |
+_DEFAULT_TEST_DURATION_SECS = 900 |
+ |
+class CXXLanguage: |
+ |
+ def __init__(self): |
+ self.client_cwd = None |
+ self.server_cwd = None |
+ self.safename = 'cxx' |
+ |
+ def client_cmd(self, args): |
+ return ['bins/opt/stress_test'] + args |
+ |
+ def server_cmd(self, args): |
+ return ['bins/opt/interop_server'] + args |
+ |
+ def global_env(self): |
+ return {} |
+ |
+ def __str__(self): |
+ return 'c++' |
+ |
+ |
+_LANGUAGES = {'c++': CXXLanguage(),} |
+ |
+# languages supported as cloud_to_cloud servers |
+_SERVERS = ['c++'] |
+ |
+DOCKER_WORKDIR_ROOT = '/var/local/git/grpc' |
+ |
+ |
+def docker_run_cmdline(cmdline, image, docker_args=[], cwd=None, environ=None): |
+ """Wraps given cmdline array to create 'docker run' cmdline from it.""" |
+ docker_cmdline = ['docker', 'run', '-i', '--rm=true'] |
+ |
+ # turn environ into -e docker args |
+ if environ: |
+ for k, v in environ.iteritems(): |
+ docker_cmdline += ['-e', '%s=%s' % (k, v)] |
+ |
+ # set working directory |
+ workdir = DOCKER_WORKDIR_ROOT |
+ if cwd: |
+ workdir = os.path.join(workdir, cwd) |
+ docker_cmdline += ['-w', workdir] |
+ |
+ docker_cmdline += docker_args + [image] + cmdline |
+ return docker_cmdline |
+ |
+ |
+def bash_login_cmdline(cmdline): |
+ """Creates bash -l -c cmdline from args list.""" |
+ # Use login shell: |
+ # * rvm and nvm require it |
+ # * makes error messages clearer if executables are missing |
+ return ['bash', '-l', '-c', ' '.join(cmdline)] |
+ |
+ |
+def _job_kill_handler(job): |
+ if job._spec.container_name: |
+ dockerjob.docker_kill(job._spec.container_name) |
+ # When the job times out and we decide to kill it, |
+ # we need to wait a before restarting the job |
+ # to prevent "container name already in use" error. |
+ # TODO(jtattermusch): figure out a cleaner way to to this. |
+ time.sleep(2) |
+ |
+ |
+def cloud_to_cloud_jobspec(language, |
+ test_cases, |
+ server_addresses, |
+ test_duration_secs, |
+ num_channels_per_server, |
+ num_stubs_per_channel, |
+ metrics_port, |
+ docker_image=None): |
+ """Creates jobspec for cloud-to-cloud interop test""" |
+ cmdline = bash_login_cmdline(language.client_cmd([ |
+ '--test_cases=%s' % test_cases, '--server_addresses=%s' % |
+ server_addresses, '--test_duration_secs=%s' % test_duration_secs, |
+ '--num_stubs_per_channel=%s' % num_stubs_per_channel, |
+ '--num_channels_per_server=%s' % num_channels_per_server, |
+ '--metrics_port=%s' % metrics_port |
+ ])) |
+ print cmdline |
+ cwd = language.client_cwd |
+ environ = language.global_env() |
+ if docker_image: |
+ container_name = dockerjob.random_name('interop_client_%s' % |
+ language.safename) |
+ cmdline = docker_run_cmdline( |
+ cmdline, |
+ image=docker_image, |
+ environ=environ, |
+ cwd=cwd, |
+ docker_args=['--net=host', '--name', container_name]) |
+ cwd = None |
+ |
+ test_job = jobset.JobSpec(cmdline=cmdline, |
+ cwd=cwd, |
+ environ=environ, |
+ shortname='cloud_to_cloud:%s:%s_server:stress_test' % ( |
+ language, server_name), |
+ timeout_seconds=test_duration_secs * 2, |
+ flake_retries=0, |
+ timeout_retries=0, |
+ kill_handler=_job_kill_handler) |
+ test_job.container_name = container_name |
+ return test_job |
+ |
+ |
+def server_jobspec(language, docker_image, test_duration_secs): |
+ """Create jobspec for running a server""" |
+ container_name = dockerjob.random_name('interop_server_%s' % |
+ language.safename) |
+ cmdline = bash_login_cmdline(language.server_cmd(['--port=%s' % |
+ _DEFAULT_SERVER_PORT])) |
+ environ = language.global_env() |
+ docker_cmdline = docker_run_cmdline( |
+ cmdline, |
+ image=docker_image, |
+ cwd=language.server_cwd, |
+ environ=environ, |
+ docker_args=['-p', str(_DEFAULT_SERVER_PORT), '--name', container_name]) |
+ |
+ server_job = jobset.JobSpec(cmdline=docker_cmdline, |
+ environ=environ, |
+ shortname='interop_server_%s' % language, |
+ timeout_seconds=test_duration_secs * 3) |
+ server_job.container_name = container_name |
+ return server_job |
+ |
+ |
+def build_interop_stress_image_jobspec(language, tag=None): |
+ """Creates jobspec for building stress test docker image for a language""" |
+ if not tag: |
+ tag = 'grpc_interop_stress_%s:%s' % (language.safename, uuid.uuid4()) |
+ env = {'INTEROP_IMAGE': tag, |
+ 'BASE_NAME': 'grpc_interop_stress_%s' % language.safename} |
+ build_job = jobset.JobSpec(cmdline=['tools/jenkins/build_interop_stress_image.sh'], |
+ environ=env, |
+ shortname='build_docker_%s' % (language), |
+ timeout_seconds=30 * 60) |
+ build_job.tag = tag |
+ return build_job |
+ |
+argp = argparse.ArgumentParser(description='Run stress tests.') |
+argp.add_argument('-l', |
+ '--language', |
+ choices=['all'] + sorted(_LANGUAGES), |
+ nargs='+', |
+ default=['all'], |
+ help='Clients to run.') |
+argp.add_argument('-j', '--jobs', default=multiprocessing.cpu_count(), type=int) |
+argp.add_argument( |
+ '-s', |
+ '--server', |
+ choices=['all'] + sorted(_SERVERS), |
+ action='append', |
+ help='Run cloud_to_cloud servers in a separate docker ' + 'image.', |
+ default=[]) |
+argp.add_argument( |
+ '--override_server', |
+ action='append', |
+ type=lambda kv: kv.split('='), |
+ help= |
+ 'Use servername=HOST:PORT to explicitly specify a server. E.g. ' |
+ 'csharp=localhost:50000', |
+ default=[]) |
+argp.add_argument('--test_duration_secs', |
+ help='The duration of the test in seconds', |
+ default=_DEFAULT_TEST_DURATION_SECS) |
+ |
+args = argp.parse_args() |
+ |
+servers = set( |
+ s |
+ for s in itertools.chain.from_iterable(_SERVERS if x == 'all' else [x] |
+ for x in args.server)) |
+ |
+languages = set(_LANGUAGES[l] |
+ for l in itertools.chain.from_iterable(_LANGUAGES.iterkeys( |
+ ) if x == 'all' else [x] for x in args.language)) |
+ |
+docker_images = {} |
+# languages for which to build docker images |
+languages_to_build = set( |
+ _LANGUAGES[k] |
+ for k in set([str(l) for l in languages] + [s for s in servers])) |
+build_jobs = [] |
+for l in languages_to_build: |
+ job = build_interop_stress_image_jobspec(l) |
+ docker_images[str(l)] = job.tag |
+ build_jobs.append(job) |
+ |
+if build_jobs: |
+ jobset.message('START', 'Building interop docker images.', do_newline=True) |
+ num_failures, _ = jobset.run(build_jobs, |
+ newline_on_success=True, |
+ maxjobs=args.jobs) |
+ if num_failures == 0: |
+ jobset.message('SUCCESS', |
+ 'All docker images built successfully.', |
+ do_newline=True) |
+ else: |
+ jobset.message('FAILED', |
+ 'Failed to build interop docker images.', |
+ do_newline=True) |
+ for image in docker_images.itervalues(): |
+ dockerjob.remove_image(image, skip_nonexistent=True) |
+ sys.exit(1) |
+ |
+# Start interop servers. |
+server_jobs = {} |
+server_addresses = {} |
+try: |
+ for s in servers: |
+ lang = str(s) |
+ spec = server_jobspec(_LANGUAGES[lang], docker_images.get(lang), args.test_duration_secs) |
+ job = dockerjob.DockerJob(spec) |
+ server_jobs[lang] = job |
+ server_addresses[lang] = ('localhost', |
+ job.mapped_port(_DEFAULT_SERVER_PORT)) |
+ |
+ jobs = [] |
+ |
+ for server in args.override_server: |
+ server_name = server[0] |
+ (server_host, server_port) = server[1].split(':') |
+ server_addresses[server_name] = (server_host, server_port) |
+ |
+ for server_name, server_address in server_addresses.iteritems(): |
+ (server_host, server_port) = server_address |
+ for language in languages: |
+ test_job = cloud_to_cloud_jobspec( |
+ language, |
+ _DEFAULT_TEST_CASES, |
+ ('%s:%s' % (server_host, server_port)), |
+ args.test_duration_secs, |
+ _DEFAULT_NUM_CHANNELS_PER_SERVER, |
+ _DEFAULT_NUM_STUBS_PER_CHANNEL, |
+ _DEFAULT_METRICS_PORT, |
+ docker_image=docker_images.get(str(language))) |
+ jobs.append(test_job) |
+ |
+ if not jobs: |
+ print 'No jobs to run.' |
+ for image in docker_images.itervalues(): |
+ dockerjob.remove_image(image, skip_nonexistent=True) |
+ sys.exit(1) |
+ |
+ num_failures, resultset = jobset.run(jobs, |
+ newline_on_success=True, |
+ maxjobs=args.jobs) |
+ if num_failures: |
+ jobset.message('FAILED', 'Some tests failed', do_newline=True) |
+ else: |
+ jobset.message('SUCCESS', 'All tests passed', do_newline=True) |
+ |
+finally: |
+ # Check if servers are still running. |
+ for server, job in server_jobs.iteritems(): |
+ if not job.is_running(): |
+ print 'Server "%s" has exited prematurely.' % server |
+ |
+ dockerjob.finish_jobs([j for j in server_jobs.itervalues()]) |
+ |
+ for image in docker_images.itervalues(): |
+ print 'Removing docker image %s' % image |
+ dockerjob.remove_image(image) |