Index: third_party/grpc/tools/gcp/stress_test/stress_test_utils.py |
diff --git a/third_party/grpc/tools/gcp/stress_test/stress_test_utils.py b/third_party/grpc/tools/gcp/stress_test/stress_test_utils.py |
new file mode 100755 |
index 0000000000000000000000000000000000000000..c4b437e3459daf0d5314f21ac26caf4d37095e6f |
--- /dev/null |
+++ b/third_party/grpc/tools/gcp/stress_test/stress_test_utils.py |
@@ -0,0 +1,197 @@ |
+#!/usr/bin/env python2.7 |
+# Copyright 2015-2016, Google Inc. |
+# All rights reserved. |
+# |
+# Redistribution and use in source and binary forms, with or without |
+# modification, are permitted provided that the following conditions are |
+# met: |
+# |
+# * Redistributions of source code must retain the above copyright |
+# notice, this list of conditions and the following disclaimer. |
+# * Redistributions in binary form must reproduce the above |
+# copyright notice, this list of conditions and the following disclaimer |
+# in the documentation and/or other materials provided with the |
+# distribution. |
+# * Neither the name of Google Inc. nor the names of its |
+# contributors may be used to endorse or promote products derived from |
+# this software without specific prior written permission. |
+# |
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
+ |
+import datetime |
+import json |
+import os |
+import re |
+import select |
+import subprocess |
+import sys |
+import time |
+ |
+# Import big_query_utils module |
+bq_utils_dir = os.path.abspath(os.path.join( |
+ os.path.dirname(__file__), '../utils')) |
+sys.path.append(bq_utils_dir) |
+import big_query_utils as bq_utils |
+ |
+ |
+class EventType: |
+ STARTING = 'STARTING' |
+ SUCCESS = 'SUCCESS' |
+ FAILURE = 'FAILURE' |
+ |
+ |
+class BigQueryHelper: |
+ """Helper class for the stress test wrappers to interact with BigQuery. |
+ """ |
+ |
+ def __init__(self, run_id, image_type, pod_name, project_id, dataset_id, |
+ summary_table_id, qps_table_id): |
+ self.run_id = run_id |
+ self.image_type = image_type |
+ self.pod_name = pod_name |
+ self.project_id = project_id |
+ self.dataset_id = dataset_id |
+ self.summary_table_id = summary_table_id |
+ self.qps_table_id = qps_table_id |
+ |
+ def initialize(self): |
+ self.bq = bq_utils.create_big_query() |
+ |
+ def setup_tables(self): |
+ return bq_utils.create_dataset(self.bq, self.project_id, self.dataset_id) \ |
+ and self.__create_summary_table() \ |
+ and self.__create_qps_table() |
+ |
+ def insert_summary_row(self, event_type, details): |
+ row_values_dict = { |
+ 'run_id': self.run_id, |
+ 'image_type': self.image_type, |
+ 'pod_name': self.pod_name, |
+ 'event_date': datetime.datetime.now().isoformat(), |
+ 'event_type': event_type, |
+ 'details': details |
+ } |
+ # row_unique_id is something that uniquely identifies the row (BigQuery uses |
+ # it for duplicate detection). |
+ row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, event_type) |
+ row = bq_utils.make_row(row_unique_id, row_values_dict) |
+ return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id, |
+ self.summary_table_id, [row]) |
+ |
+ def insert_qps_row(self, qps, recorded_at): |
+ row_values_dict = { |
+ 'run_id': self.run_id, |
+ 'pod_name': self.pod_name, |
+ 'recorded_at': recorded_at, |
+ 'qps': qps |
+ } |
+ |
+ # row_unique_id is something that uniquely identifies the row (BigQuery uses |
+ # it for duplicate detection). |
+ row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, recorded_at) |
+ row = bq_utils.make_row(row_unique_id, row_values_dict) |
+ return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id, |
+ self.qps_table_id, [row]) |
+ |
+ def check_if_any_tests_failed(self, num_query_retries=3): |
+ query = ('SELECT event_type FROM %s.%s WHERE run_id = \'%s\' AND ' |
+ 'event_type="%s"') % (self.dataset_id, self.summary_table_id, |
+ self.run_id, EventType.FAILURE) |
+ query_job = bq_utils.sync_query_job(self.bq, self.project_id, query) |
+ page = self.bq.jobs().getQueryResults(**query_job['jobReference']).execute( |
+ num_retries=num_query_retries) |
+ num_failures = int(page['totalRows']) |
+ print 'num rows: ', num_failures |
+ return num_failures > 0 |
+ |
+ def print_summary_records(self, num_query_retries=3): |
+ line = '-' * 120 |
+ print line |
+ print 'Summary records' |
+ print 'Run Id: ', self.run_id |
+ print 'Dataset Id: ', self.dataset_id |
+ print line |
+ query = ('SELECT pod_name, image_type, event_type, event_date, details' |
+ ' FROM %s.%s WHERE run_id = \'%s\' ORDER by event_date;') % ( |
+ self.dataset_id, self.summary_table_id, self.run_id) |
+ query_job = bq_utils.sync_query_job(self.bq, self.project_id, query) |
+ |
+ print '{:<25} {:<12} {:<12} {:<30} {}'.format( |
+ 'Pod name', 'Image type', 'Event type', 'Date', 'Details') |
+ print line |
+ page_token = None |
+ while True: |
+ page = self.bq.jobs().getQueryResults( |
+ pageToken=page_token, |
+ **query_job['jobReference']).execute(num_retries=num_query_retries) |
+ rows = page.get('rows', []) |
+ for row in rows: |
+ print '{:<25} {:<12} {:<12} {:<30} {}'.format( |
+ row['f'][0]['v'], row['f'][1]['v'], row['f'][2]['v'], |
+ row['f'][3]['v'], row['f'][4]['v']) |
+ page_token = page.get('pageToken') |
+ if not page_token: |
+ break |
+ |
+ def print_qps_records(self, num_query_retries=3): |
+ line = '-' * 80 |
+ print line |
+ print 'QPS Summary' |
+ print 'Run Id: ', self.run_id |
+ print 'Dataset Id: ', self.dataset_id |
+ print line |
+ query = ( |
+ 'SELECT pod_name, recorded_at, qps FROM %s.%s WHERE run_id = \'%s\' ' |
+ 'ORDER by recorded_at;') % (self.dataset_id, self.qps_table_id, |
+ self.run_id) |
+ query_job = bq_utils.sync_query_job(self.bq, self.project_id, query) |
+ print '{:<25} {:30} {}'.format('Pod name', 'Recorded at', 'Qps') |
+ print line |
+ page_token = None |
+ while True: |
+ page = self.bq.jobs().getQueryResults( |
+ pageToken=page_token, |
+ **query_job['jobReference']).execute(num_retries=num_query_retries) |
+ rows = page.get('rows', []) |
+ for row in rows: |
+ print '{:<25} {:30} {}'.format(row['f'][0]['v'], row['f'][1]['v'], |
+ row['f'][2]['v']) |
+ page_token = page.get('pageToken') |
+ if not page_token: |
+ break |
+ |
+ def __create_summary_table(self): |
+ summary_table_schema = [ |
+ ('run_id', 'STRING', 'Test run id'), |
+ ('image_type', 'STRING', 'Client or Server?'), |
+ ('pod_name', 'STRING', 'GKE pod hosting this image'), |
+ ('event_date', 'STRING', 'The date of this event'), |
+ ('event_type', 'STRING', 'STARTED/SUCCESS/FAILURE'), |
+ ('details', 'STRING', 'Any other relevant details') |
+ ] |
+ desc = ('The table that contains START/SUCCESS/FAILURE events for ' |
+ ' the stress test clients and servers') |
+ return bq_utils.create_table(self.bq, self.project_id, self.dataset_id, |
+ self.summary_table_id, summary_table_schema, |
+ desc) |
+ |
+ def __create_qps_table(self): |
+ qps_table_schema = [ |
+ ('run_id', 'STRING', 'Test run id'), |
+ ('pod_name', 'STRING', 'GKE pod hosting this image'), |
+ ('recorded_at', 'STRING', 'Metrics recorded at time'), |
+ ('qps', 'INTEGER', 'Queries per second') |
+ ] |
+ desc = 'The table that cointains the qps recorded at various intervals' |
+ return bq_utils.create_table(self.bq, self.project_id, self.dataset_id, |
+ self.qps_table_id, qps_table_schema, desc) |