| Index: third_party/grpc/tools/gcp/stress_test/stress_test_utils.py
|
| diff --git a/third_party/grpc/tools/gcp/stress_test/stress_test_utils.py b/third_party/grpc/tools/gcp/stress_test/stress_test_utils.py
|
| new file mode 100755
|
| index 0000000000000000000000000000000000000000..c4b437e3459daf0d5314f21ac26caf4d37095e6f
|
| --- /dev/null
|
| +++ b/third_party/grpc/tools/gcp/stress_test/stress_test_utils.py
|
| @@ -0,0 +1,197 @@
|
| +#!/usr/bin/env python2.7
|
| +# Copyright 2015-2016, Google Inc.
|
| +# All rights reserved.
|
| +#
|
| +# Redistribution and use in source and binary forms, with or without
|
| +# modification, are permitted provided that the following conditions are
|
| +# met:
|
| +#
|
| +# * Redistributions of source code must retain the above copyright
|
| +# notice, this list of conditions and the following disclaimer.
|
| +# * Redistributions in binary form must reproduce the above
|
| +# copyright notice, this list of conditions and the following disclaimer
|
| +# in the documentation and/or other materials provided with the
|
| +# distribution.
|
| +# * Neither the name of Google Inc. nor the names of its
|
| +# contributors may be used to endorse or promote products derived from
|
| +# this software without specific prior written permission.
|
| +#
|
| +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
| +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
| +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
| +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
| +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
| +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
| +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
| +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
| +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
| +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
| +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
| +
|
| +import datetime
|
| +import json
|
| +import os
|
| +import re
|
| +import select
|
| +import subprocess
|
| +import sys
|
| +import time
|
| +
|
| +# Import big_query_utils module
|
| +bq_utils_dir = os.path.abspath(os.path.join(
|
| + os.path.dirname(__file__), '../utils'))
|
| +sys.path.append(bq_utils_dir)
|
| +import big_query_utils as bq_utils
|
| +
|
| +
|
| +class EventType:
|
| + STARTING = 'STARTING'
|
| + SUCCESS = 'SUCCESS'
|
| + FAILURE = 'FAILURE'
|
| +
|
| +
|
| +class BigQueryHelper:
|
| + """Helper class for the stress test wrappers to interact with BigQuery.
|
| + """
|
| +
|
| + def __init__(self, run_id, image_type, pod_name, project_id, dataset_id,
|
| + summary_table_id, qps_table_id):
|
| + self.run_id = run_id
|
| + self.image_type = image_type
|
| + self.pod_name = pod_name
|
| + self.project_id = project_id
|
| + self.dataset_id = dataset_id
|
| + self.summary_table_id = summary_table_id
|
| + self.qps_table_id = qps_table_id
|
| +
|
| + def initialize(self):
|
| + self.bq = bq_utils.create_big_query()
|
| +
|
| + def setup_tables(self):
|
| + return bq_utils.create_dataset(self.bq, self.project_id, self.dataset_id) \
|
| + and self.__create_summary_table() \
|
| + and self.__create_qps_table()
|
| +
|
| + def insert_summary_row(self, event_type, details):
|
| + row_values_dict = {
|
| + 'run_id': self.run_id,
|
| + 'image_type': self.image_type,
|
| + 'pod_name': self.pod_name,
|
| + 'event_date': datetime.datetime.now().isoformat(),
|
| + 'event_type': event_type,
|
| + 'details': details
|
| + }
|
| + # row_unique_id is something that uniquely identifies the row (BigQuery uses
|
| + # it for duplicate detection).
|
| + row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, event_type)
|
| + row = bq_utils.make_row(row_unique_id, row_values_dict)
|
| + return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
|
| + self.summary_table_id, [row])
|
| +
|
| + def insert_qps_row(self, qps, recorded_at):
|
| + row_values_dict = {
|
| + 'run_id': self.run_id,
|
| + 'pod_name': self.pod_name,
|
| + 'recorded_at': recorded_at,
|
| + 'qps': qps
|
| + }
|
| +
|
| + # row_unique_id is something that uniquely identifies the row (BigQuery uses
|
| + # it for duplicate detection).
|
| + row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, recorded_at)
|
| + row = bq_utils.make_row(row_unique_id, row_values_dict)
|
| + return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
|
| + self.qps_table_id, [row])
|
| +
|
| + def check_if_any_tests_failed(self, num_query_retries=3):
|
| + query = ('SELECT event_type FROM %s.%s WHERE run_id = \'%s\' AND '
|
| + 'event_type="%s"') % (self.dataset_id, self.summary_table_id,
|
| + self.run_id, EventType.FAILURE)
|
| + query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
|
| + page = self.bq.jobs().getQueryResults(**query_job['jobReference']).execute(
|
| + num_retries=num_query_retries)
|
| + num_failures = int(page['totalRows'])
|
| + print 'num rows: ', num_failures
|
| + return num_failures > 0
|
| +
|
| + def print_summary_records(self, num_query_retries=3):
|
| + line = '-' * 120
|
| + print line
|
| + print 'Summary records'
|
| + print 'Run Id: ', self.run_id
|
| + print 'Dataset Id: ', self.dataset_id
|
| + print line
|
| + query = ('SELECT pod_name, image_type, event_type, event_date, details'
|
| + ' FROM %s.%s WHERE run_id = \'%s\' ORDER by event_date;') % (
|
| + self.dataset_id, self.summary_table_id, self.run_id)
|
| + query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
|
| +
|
| + print '{:<25} {:<12} {:<12} {:<30} {}'.format(
|
| + 'Pod name', 'Image type', 'Event type', 'Date', 'Details')
|
| + print line
|
| + page_token = None
|
| + while True:
|
| + page = self.bq.jobs().getQueryResults(
|
| + pageToken=page_token,
|
| + **query_job['jobReference']).execute(num_retries=num_query_retries)
|
| + rows = page.get('rows', [])
|
| + for row in rows:
|
| + print '{:<25} {:<12} {:<12} {:<30} {}'.format(
|
| + row['f'][0]['v'], row['f'][1]['v'], row['f'][2]['v'],
|
| + row['f'][3]['v'], row['f'][4]['v'])
|
| + page_token = page.get('pageToken')
|
| + if not page_token:
|
| + break
|
| +
|
| + def print_qps_records(self, num_query_retries=3):
|
| + line = '-' * 80
|
| + print line
|
| + print 'QPS Summary'
|
| + print 'Run Id: ', self.run_id
|
| + print 'Dataset Id: ', self.dataset_id
|
| + print line
|
| + query = (
|
| + 'SELECT pod_name, recorded_at, qps FROM %s.%s WHERE run_id = \'%s\' '
|
| + 'ORDER by recorded_at;') % (self.dataset_id, self.qps_table_id,
|
| + self.run_id)
|
| + query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
|
| + print '{:<25} {:30} {}'.format('Pod name', 'Recorded at', 'Qps')
|
| + print line
|
| + page_token = None
|
| + while True:
|
| + page = self.bq.jobs().getQueryResults(
|
| + pageToken=page_token,
|
| + **query_job['jobReference']).execute(num_retries=num_query_retries)
|
| + rows = page.get('rows', [])
|
| + for row in rows:
|
| + print '{:<25} {:30} {}'.format(row['f'][0]['v'], row['f'][1]['v'],
|
| + row['f'][2]['v'])
|
| + page_token = page.get('pageToken')
|
| + if not page_token:
|
| + break
|
| +
|
| + def __create_summary_table(self):
|
| + summary_table_schema = [
|
| + ('run_id', 'STRING', 'Test run id'),
|
| + ('image_type', 'STRING', 'Client or Server?'),
|
| + ('pod_name', 'STRING', 'GKE pod hosting this image'),
|
| + ('event_date', 'STRING', 'The date of this event'),
|
| + ('event_type', 'STRING', 'STARTED/SUCCESS/FAILURE'),
|
| + ('details', 'STRING', 'Any other relevant details')
|
| + ]
|
| + desc = ('The table that contains START/SUCCESS/FAILURE events for '
|
| + ' the stress test clients and servers')
|
| + return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
|
| + self.summary_table_id, summary_table_schema,
|
| + desc)
|
| +
|
| + def __create_qps_table(self):
|
| + qps_table_schema = [
|
| + ('run_id', 'STRING', 'Test run id'),
|
| + ('pod_name', 'STRING', 'GKE pod hosting this image'),
|
| + ('recorded_at', 'STRING', 'Metrics recorded at time'),
|
| + ('qps', 'INTEGER', 'Queries per second')
|
| + ]
|
| + desc = 'The table that cointains the qps recorded at various intervals'
|
| + return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
|
| + self.qps_table_id, qps_table_schema, desc)
|
|
|