OLD | NEW |
(Empty) | |
| 1 #!/usr/bin/env python2.7 |
| 2 # Copyright 2015-2016, Google Inc. |
| 3 # All rights reserved. |
| 4 # |
| 5 # Redistribution and use in source and binary forms, with or without |
| 6 # modification, are permitted provided that the following conditions are |
| 7 # met: |
| 8 # |
| 9 # * Redistributions of source code must retain the above copyright |
| 10 # notice, this list of conditions and the following disclaimer. |
| 11 # * Redistributions in binary form must reproduce the above |
| 12 # copyright notice, this list of conditions and the following disclaimer |
| 13 # in the documentation and/or other materials provided with the |
| 14 # distribution. |
| 15 # * Neither the name of Google Inc. nor the names of its |
| 16 # contributors may be used to endorse or promote products derived from |
| 17 # this software without specific prior written permission. |
| 18 # |
| 19 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 20 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 21 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 22 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 23 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 24 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 25 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 26 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 27 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 28 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 29 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 30 |
| 31 import datetime |
| 32 import json |
| 33 import os |
| 34 import re |
| 35 import select |
| 36 import subprocess |
| 37 import sys |
| 38 import time |
| 39 |
| 40 # Import big_query_utils module |
| 41 bq_utils_dir = os.path.abspath(os.path.join( |
| 42 os.path.dirname(__file__), '../utils')) |
| 43 sys.path.append(bq_utils_dir) |
| 44 import big_query_utils as bq_utils |
| 45 |
| 46 |
class EventType:
  """Names of the lifecycle events recorded in the summary table.

  The attribute values double as the literal strings stored in BigQuery.
  """
  STARTING, SUCCESS, FAILURE = 'STARTING', 'SUCCESS', 'FAILURE'
| 51 |
| 52 |
| 53 class BigQueryHelper: |
| 54 """Helper class for the stress test wrappers to interact with BigQuery. |
| 55 """ |
| 56 |
| 57 def __init__(self, run_id, image_type, pod_name, project_id, dataset_id, |
| 58 summary_table_id, qps_table_id): |
| 59 self.run_id = run_id |
| 60 self.image_type = image_type |
| 61 self.pod_name = pod_name |
| 62 self.project_id = project_id |
| 63 self.dataset_id = dataset_id |
| 64 self.summary_table_id = summary_table_id |
| 65 self.qps_table_id = qps_table_id |
| 66 |
| 67 def initialize(self): |
| 68 self.bq = bq_utils.create_big_query() |
| 69 |
| 70 def setup_tables(self): |
| 71 return bq_utils.create_dataset(self.bq, self.project_id, self.dataset_id) \ |
| 72 and self.__create_summary_table() \ |
| 73 and self.__create_qps_table() |
| 74 |
| 75 def insert_summary_row(self, event_type, details): |
| 76 row_values_dict = { |
| 77 'run_id': self.run_id, |
| 78 'image_type': self.image_type, |
| 79 'pod_name': self.pod_name, |
| 80 'event_date': datetime.datetime.now().isoformat(), |
| 81 'event_type': event_type, |
| 82 'details': details |
| 83 } |
| 84 # row_unique_id is something that uniquely identifies the row (BigQuery uses |
| 85 # it for duplicate detection). |
| 86 row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, event_type) |
| 87 row = bq_utils.make_row(row_unique_id, row_values_dict) |
| 88 return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id, |
| 89 self.summary_table_id, [row]) |
| 90 |
| 91 def insert_qps_row(self, qps, recorded_at): |
| 92 row_values_dict = { |
| 93 'run_id': self.run_id, |
| 94 'pod_name': self.pod_name, |
| 95 'recorded_at': recorded_at, |
| 96 'qps': qps |
| 97 } |
| 98 |
| 99 # row_unique_id is something that uniquely identifies the row (BigQuery uses |
| 100 # it for duplicate detection). |
| 101 row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, recorded_at) |
| 102 row = bq_utils.make_row(row_unique_id, row_values_dict) |
| 103 return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id, |
| 104 self.qps_table_id, [row]) |
| 105 |
| 106 def check_if_any_tests_failed(self, num_query_retries=3): |
| 107 query = ('SELECT event_type FROM %s.%s WHERE run_id = \'%s\' AND ' |
| 108 'event_type="%s"') % (self.dataset_id, self.summary_table_id, |
| 109 self.run_id, EventType.FAILURE) |
| 110 query_job = bq_utils.sync_query_job(self.bq, self.project_id, query) |
| 111 page = self.bq.jobs().getQueryResults(**query_job['jobReference']).execute( |
| 112 num_retries=num_query_retries) |
| 113 num_failures = int(page['totalRows']) |
| 114 print 'num rows: ', num_failures |
| 115 return num_failures > 0 |
| 116 |
| 117 def print_summary_records(self, num_query_retries=3): |
| 118 line = '-' * 120 |
| 119 print line |
| 120 print 'Summary records' |
| 121 print 'Run Id: ', self.run_id |
| 122 print 'Dataset Id: ', self.dataset_id |
| 123 print line |
| 124 query = ('SELECT pod_name, image_type, event_type, event_date, details' |
| 125 ' FROM %s.%s WHERE run_id = \'%s\' ORDER by event_date;') % ( |
| 126 self.dataset_id, self.summary_table_id, self.run_id) |
| 127 query_job = bq_utils.sync_query_job(self.bq, self.project_id, query) |
| 128 |
| 129 print '{:<25} {:<12} {:<12} {:<30} {}'.format( |
| 130 'Pod name', 'Image type', 'Event type', 'Date', 'Details') |
| 131 print line |
| 132 page_token = None |
| 133 while True: |
| 134 page = self.bq.jobs().getQueryResults( |
| 135 pageToken=page_token, |
| 136 **query_job['jobReference']).execute(num_retries=num_query_retries) |
| 137 rows = page.get('rows', []) |
| 138 for row in rows: |
| 139 print '{:<25} {:<12} {:<12} {:<30} {}'.format( |
| 140 row['f'][0]['v'], row['f'][1]['v'], row['f'][2]['v'], |
| 141 row['f'][3]['v'], row['f'][4]['v']) |
| 142 page_token = page.get('pageToken') |
| 143 if not page_token: |
| 144 break |
| 145 |
| 146 def print_qps_records(self, num_query_retries=3): |
| 147 line = '-' * 80 |
| 148 print line |
| 149 print 'QPS Summary' |
| 150 print 'Run Id: ', self.run_id |
| 151 print 'Dataset Id: ', self.dataset_id |
| 152 print line |
| 153 query = ( |
| 154 'SELECT pod_name, recorded_at, qps FROM %s.%s WHERE run_id = \'%s\' ' |
| 155 'ORDER by recorded_at;') % (self.dataset_id, self.qps_table_id, |
| 156 self.run_id) |
| 157 query_job = bq_utils.sync_query_job(self.bq, self.project_id, query) |
| 158 print '{:<25} {:30} {}'.format('Pod name', 'Recorded at', 'Qps') |
| 159 print line |
| 160 page_token = None |
| 161 while True: |
| 162 page = self.bq.jobs().getQueryResults( |
| 163 pageToken=page_token, |
| 164 **query_job['jobReference']).execute(num_retries=num_query_retries) |
| 165 rows = page.get('rows', []) |
| 166 for row in rows: |
| 167 print '{:<25} {:30} {}'.format(row['f'][0]['v'], row['f'][1]['v'], |
| 168 row['f'][2]['v']) |
| 169 page_token = page.get('pageToken') |
| 170 if not page_token: |
| 171 break |
| 172 |
| 173 def __create_summary_table(self): |
| 174 summary_table_schema = [ |
| 175 ('run_id', 'STRING', 'Test run id'), |
| 176 ('image_type', 'STRING', 'Client or Server?'), |
| 177 ('pod_name', 'STRING', 'GKE pod hosting this image'), |
| 178 ('event_date', 'STRING', 'The date of this event'), |
| 179 ('event_type', 'STRING', 'STARTED/SUCCESS/FAILURE'), |
| 180 ('details', 'STRING', 'Any other relevant details') |
| 181 ] |
| 182 desc = ('The table that contains START/SUCCESS/FAILURE events for ' |
| 183 ' the stress test clients and servers') |
| 184 return bq_utils.create_table(self.bq, self.project_id, self.dataset_id, |
| 185 self.summary_table_id, summary_table_schema, |
| 186 desc) |
| 187 |
| 188 def __create_qps_table(self): |
| 189 qps_table_schema = [ |
| 190 ('run_id', 'STRING', 'Test run id'), |
| 191 ('pod_name', 'STRING', 'GKE pod hosting this image'), |
| 192 ('recorded_at', 'STRING', 'Metrics recorded at time'), |
| 193 ('qps', 'INTEGER', 'Queries per second') |
| 194 ] |
| 195 desc = 'The table that cointains the qps recorded at various intervals' |
| 196 return bq_utils.create_table(self.bq, self.project_id, self.dataset_id, |
| 197 self.qps_table_id, qps_table_schema, desc) |
OLD | NEW |