OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python2.7 |
| 2 # Copyright 2015-2016, Google Inc. |
| 3 # All rights reserved. |
| 4 # |
| 5 # Redistribution and use in source and binary forms, with or without |
| 6 # modification, are permitted provided that the following conditions are |
| 7 # met: |
| 8 # |
| 9 # * Redistributions of source code must retain the above copyright |
| 10 # notice, this list of conditions and the following disclaimer. |
| 11 # * Redistributions in binary form must reproduce the above |
| 12 # copyright notice, this list of conditions and the following disclaimer |
| 13 # in the documentation and/or other materials provided with the |
| 14 # distribution. |
| 15 # * Neither the name of Google Inc. nor the names of its |
| 16 # contributors may be used to endorse or promote products derived from |
| 17 # this software without specific prior written permission. |
| 18 # |
| 19 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 20 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 21 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 22 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 23 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 24 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 25 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 26 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 27 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 28 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 29 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 30 |
| 31 import datetime |
| 32 import os |
| 33 import re |
| 34 import select |
| 35 import subprocess |
| 36 import sys |
| 37 import time |
| 38 |
| 39 from stress_test_utils import EventType |
| 40 from stress_test_utils import BigQueryHelper |
| 41 |
| 42 |
| 43 # TODO (sree): Write a python grpc client to directly query the metrics instead |
| 44 # of calling metrics_client |
| 45 def _get_qps(metrics_cmd): |
| 46 qps = 0 |
| 47 try: |
| 48 # Note: gpr_log() writes even non-error messages to stderr stream. So it is |
| 49 # important that we set stderr=subprocess.STDOUT |
| 50 p = subprocess.Popen(args=metrics_cmd, |
| 51 stdout=subprocess.PIPE, |
| 52 stderr=subprocess.STDOUT) |
| 53 retcode = p.wait() |
| 54 (out_str, err_str) = p.communicate() |
| 55 if retcode != 0: |
| 56 print 'Error in reading metrics information' |
| 57 print 'Output: ', out_str |
| 58 else: |
| 59 # The overall qps is printed at the end of the line |
| 60 m = re.search('\d+$', out_str) |
| 61 qps = int(m.group()) if m else 0 |
| 62 except Exception as ex: |
| 63 print 'Exception while reading metrics information: ' + str(ex) |
| 64 return qps |
| 65 |
| 66 |
| 67 def run_client(): |
| 68 """This is a wrapper around the stress test client and performs the following: |
| 69 1) Create the following two tables in Big Query: |
| 70 (i) Summary table: To record events like the test started, completed |
| 71 successfully or failed |
| 72 (ii) Qps table: To periodically record the QPS sent by this client |
| 73 2) Start the stress test client and add a row in the Big Query summary |
| 74 table |
| 75 3) Once every few seconds (as specificed by the poll_interval_secs) poll |
| 76 the status of the stress test client process and perform the |
| 77 following: |
| 78 3.1) If the process is still running, get the current qps by invoking |
| 79 the metrics client program and add a row in the Big Query |
| 80 Qps table. Sleep for a duration specified by poll_interval_secs |
| 81 3.2) If the process exited successfully, add a row in the Big Query |
| 82 Summary table and exit |
| 83 3.3) If the process failed, add a row in Big Query summary table and |
| 84 wait forever. |
| 85 NOTE: This script typically runs inside a GKE pod which means |
| 86 that the pod gets destroyed when the script exits. However, in |
| 87 case the stress test client fails, we would not want the pod to |
| 88 be destroyed (since we might want to connect to the pod for |
| 89 examining logs). This is the reason why the script waits forever |
| 90 in case of failures |
| 91 """ |
| 92 env = dict(os.environ) |
| 93 image_type = env['STRESS_TEST_IMAGE_TYPE'] |
| 94 image_name = env['STRESS_TEST_IMAGE'] |
| 95 args_str = env['STRESS_TEST_ARGS_STR'] |
| 96 metrics_client_image = env['METRICS_CLIENT_IMAGE'] |
| 97 metrics_client_args_str = env['METRICS_CLIENT_ARGS_STR'] |
| 98 run_id = env['RUN_ID'] |
| 99 pod_name = env['POD_NAME'] |
| 100 logfile_name = env.get('LOGFILE_NAME') |
| 101 poll_interval_secs = float(env['POLL_INTERVAL_SECS']) |
| 102 project_id = env['GCP_PROJECT_ID'] |
| 103 dataset_id = env['DATASET_ID'] |
| 104 summary_table_id = env['SUMMARY_TABLE_ID'] |
| 105 qps_table_id = env['QPS_TABLE_ID'] |
| 106 |
| 107 bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id, |
| 108 dataset_id, summary_table_id, qps_table_id) |
| 109 bq_helper.initialize() |
| 110 |
| 111 # Create BigQuery Dataset and Tables: Summary Table and Metrics Table |
| 112 if not bq_helper.setup_tables(): |
| 113 print 'Error in creating BigQuery tables' |
| 114 return |
| 115 |
| 116 start_time = datetime.datetime.now() |
| 117 |
| 118 logfile = None |
| 119 details = 'Logging to stdout' |
| 120 if logfile_name is not None: |
| 121 print 'Opening logfile: %s ...' % logfile_name |
| 122 details = 'Logfile: %s' % logfile_name |
| 123 logfile = open(logfile_name, 'w') |
| 124 |
| 125 # Update status that the test is starting (in the status table) |
| 126 bq_helper.insert_summary_row(EventType.STARTING, details) |
| 127 |
| 128 metrics_cmd = [metrics_client_image |
| 129 ] + [x for x in metrics_client_args_str.split()] |
| 130 stress_cmd = [image_name] + [x for x in args_str.split()] |
| 131 |
| 132 print 'Launching process %s ...' % stress_cmd |
| 133 stress_p = subprocess.Popen(args=stress_cmd, |
| 134 stdout=logfile, |
| 135 stderr=subprocess.STDOUT) |
| 136 |
| 137 qps_history = [1, 1, 1] # Maintain the last 3 qps readings |
| 138 qps_history_idx = 0 # Index into the qps_history list |
| 139 |
| 140 is_error = False |
| 141 while True: |
| 142 # Check if stress_client is still running. If so, collect metrics and upload |
| 143 # to BigQuery status table |
| 144 if stress_p.poll() is not None: |
| 145 end_time = datetime.datetime.now().isoformat() |
| 146 event_type = EventType.SUCCESS |
| 147 details = 'End time: %s' % end_time |
| 148 if stress_p.returncode != 0: |
| 149 event_type = EventType.FAILURE |
| 150 details = 'Return code = %d. End time: %s' % (stress_p.returncode, |
| 151 end_time) |
| 152 is_error = True |
| 153 bq_helper.insert_summary_row(event_type, details) |
| 154 print details |
| 155 break |
| 156 |
| 157 # Stress client still running. Get metrics |
| 158 qps = _get_qps(metrics_cmd) |
| 159 qps_recorded_at = datetime.datetime.now().isoformat() |
| 160 print 'qps: %d at %s' % (qps, qps_recorded_at) |
| 161 |
| 162 # If QPS has been zero for the last 3 iterations, flag it as error and exit |
| 163 qps_history[qps_history_idx] = qps |
| 164 qps_history_idx = (qps_history_idx + 1) % len(qps_history) |
| 165 if sum(qps_history) == 0: |
| 166 details = 'QPS has been zero for the last %d seconds - as of : %s' % ( |
| 167 poll_interval_secs * 3, qps_recorded_at) |
| 168 is_error = True |
| 169 bq_helper.insert_summary_row(EventType.FAILURE, details) |
| 170 print details |
| 171 break |
| 172 |
| 173 # Upload qps metrics to BiqQuery |
| 174 bq_helper.insert_qps_row(qps, qps_recorded_at) |
| 175 |
| 176 time.sleep(poll_interval_secs) |
| 177 |
| 178 if is_error: |
| 179 print 'Waiting indefinitely..' |
| 180 select.select([], [], []) |
| 181 |
| 182 print 'Completed' |
| 183 return |
| 184 |
| 185 |
| 186 if __name__ == '__main__': |
| 187 run_client() |
OLD | NEW |