path: root/tools
author    Craig Tiller <ctiller@google.com>    2016-03-01 23:35:04 -0800
committer Craig Tiller <ctiller@google.com>    2016-03-01 23:35:04 -0800
commit    5f17aaea1bcf1e92d144a8c7a91417059a0cc379 (patch)
tree      64d47cd8507cbc326fd9a8b7b10f56895eb61564 /tools
parent    aa1c3392091c6cb9930067e005e5d19814a0cb76 (diff)
parent    2a04f63d59f32dcf3fd5270f242a2d70653ce098 (diff)
Merge github.com:grpc/grpc into esan
Diffstat (limited to 'tools')
-rw-r--r--  tools/dockerfile/grpc_interop_stress_cxx/Dockerfile               |   5
-rwxr-xr-x  tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh  |   2
-rwxr-xr-x  tools/gcp/stress_test/run_client.py                               | 187
-rwxr-xr-x  tools/gcp/stress_test/run_server.py                               | 120
-rwxr-xr-x  tools/gcp/stress_test/stress_test_utils.py                        | 197
-rwxr-xr-x  tools/gcp/utils/big_query_utils.py                                | 140
-rwxr-xr-x  tools/gcp/utils/kubernetes_api.py (renamed from tools/gke/kubernetes_api.py) | 83
-rwxr-xr-x  tools/jenkins/build_interop_stress_image.sh                       |   3
-rw-r--r--  tools/run_tests/distribtest_targets.py                            |  11
-rwxr-xr-x  tools/run_tests/jobset.py                                         |   3
-rwxr-xr-x  tools/run_tests/stress_test/run_stress_tests_on_gke.py            | 556
11 files changed, 1290 insertions, 17 deletions
diff --git a/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile b/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile
index 58a8c32e34..4123cc1a26 100644
--- a/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile
+++ b/tools/dockerfile/grpc_interop_stress_cxx/Dockerfile
@@ -59,6 +59,8 @@ RUN apt-get update && apt-get install -y \
wget \
zip && apt-get clean
+RUN easy_install -U pip
+
# Prepare ccache
RUN ln -s /usr/bin/ccache /usr/local/bin/gcc
RUN ln -s /usr/bin/ccache /usr/local/bin/g++
@@ -71,5 +73,8 @@ RUN ln -s /usr/bin/ccache /usr/local/bin/clang++
# C++ dependencies
RUN apt-get update && apt-get -y install libgflags-dev libgtest-dev libc++-dev clang
+# Google Cloud platform API libraries (for BigQuery)
+RUN pip install --upgrade google-api-python-client
+
# Define the default command.
CMD ["bash"]
diff --git a/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh b/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh
index 6ed3ccb3fa..392bdfccda 100755
--- a/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh
+++ b/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh
@@ -42,4 +42,4 @@ cd /var/local/git/grpc
make install-certs
# build C++ interop stress client, interop client and server
-make stress_test interop_client interop_server
+make stress_test metrics_client interop_client interop_server
diff --git a/tools/gcp/stress_test/run_client.py b/tools/gcp/stress_test/run_client.py
new file mode 100755
index 0000000000..0fa1bf1cb9
--- /dev/null
+++ b/tools/gcp/stress_test/run_client.py
@@ -0,0 +1,187 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import datetime
+import os
+import re
+import select
+import subprocess
+import sys
+import time
+
+from stress_test_utils import EventType
+from stress_test_utils import BigQueryHelper
+
+
+# TODO (sree): Write a Python gRPC client to directly query the metrics instead
+# of calling metrics_client
+def _get_qps(metrics_cmd):
+ qps = 0
+ try:
+ # Note: gpr_log() writes even non-error messages to stderr stream. So it is
+ # important that we set stderr=subprocess.STDOUT
+ p = subprocess.Popen(args=metrics_cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ # communicate() waits for the process and avoids the stdout-pipe deadlock
+ # that calling wait() first can cause with stdout=subprocess.PIPE
+ (out_str, err_str) = p.communicate()
+ retcode = p.returncode
+ if retcode != 0:
+ print 'Error in reading metrics information'
+ print 'Output: ', out_str
+ else:
+ # The overall qps is printed at the end of the line
+ m = re.search(r'\d+$', out_str)
+ qps = int(m.group()) if m else 0
+ except Exception as ex:
+ print 'Exception while reading metrics information: ' + str(ex)
+ return qps
+
+
+def run_client():
+ """This is a wrapper around the stress test client and performs the following:
+ 1) Create the following two tables in Big Query:
+ (i) Summary table: To record events like the test started, completed
+ successfully or failed
+ (ii) Qps table: To periodically record the QPS sent by this client
+ 2) Start the stress test client and add a row in the Big Query summary
+ table
+ 3) Once every few seconds (as specified by poll_interval_secs) poll
+ the status of the stress test client process and perform the
+ following:
+ 3.1) If the process is still running, get the current qps by invoking
+ the metrics client program and add a row in the Big Query
+ Qps table. Sleep for a duration specified by poll_interval_secs
+ 3.2) If the process exited successfully, add a row in the Big Query
+ Summary table and exit
+ 3.3) If the process failed, add a row in Big Query summary table and
+ wait forever.
+ NOTE: This script typically runs inside a GKE pod which means
+ that the pod gets destroyed when the script exits. However, in
+ case the stress test client fails, we would not want the pod to
+ be destroyed (since we might want to connect to the pod for
+ examining logs). This is the reason why the script waits forever
+ in case of failures
+ """
+ env = dict(os.environ)
+ image_type = env['STRESS_TEST_IMAGE_TYPE']
+ image_name = env['STRESS_TEST_IMAGE']
+ args_str = env['STRESS_TEST_ARGS_STR']
+ metrics_client_image = env['METRICS_CLIENT_IMAGE']
+ metrics_client_args_str = env['METRICS_CLIENT_ARGS_STR']
+ run_id = env['RUN_ID']
+ pod_name = env['POD_NAME']
+ logfile_name = env.get('LOGFILE_NAME')
+ poll_interval_secs = float(env['POLL_INTERVAL_SECS'])
+ project_id = env['GCP_PROJECT_ID']
+ dataset_id = env['DATASET_ID']
+ summary_table_id = env['SUMMARY_TABLE_ID']
+ qps_table_id = env['QPS_TABLE_ID']
+
+ bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id,
+ dataset_id, summary_table_id, qps_table_id)
+ bq_helper.initialize()
+
+ # Create BigQuery Dataset and Tables: Summary Table and Metrics Table
+ if not bq_helper.setup_tables():
+ print 'Error in creating BigQuery tables'
+ return
+
+ start_time = datetime.datetime.now()
+
+ logfile = None
+ details = 'Logging to stdout'
+ if logfile_name is not None:
+ print 'Opening logfile: %s ...' % logfile_name
+ details = 'Logfile: %s' % logfile_name
+ logfile = open(logfile_name, 'w')
+
+ # Update status that the test is starting (in the status table)
+ bq_helper.insert_summary_row(EventType.STARTING, details)
+
+ metrics_cmd = [metrics_client_image] + metrics_client_args_str.split()
+ stress_cmd = [image_name] + args_str.split()
+
+ print 'Launching process %s ...' % stress_cmd
+ stress_p = subprocess.Popen(args=stress_cmd,
+ stdout=logfile,
+ stderr=subprocess.STDOUT)
+
+ qps_history = [1, 1, 1] # Maintain the last 3 qps readings
+ qps_history_idx = 0 # Index into the qps_history list
+
+ is_error = False
+ while True:
+ # Check if the stress client is still running. If it exited, record the
+ # outcome and stop polling; otherwise collect metrics and upload to BigQuery
+ if stress_p.poll() is not None:
+ end_time = datetime.datetime.now().isoformat()
+ event_type = EventType.SUCCESS
+ details = 'End time: %s' % end_time
+ if stress_p.returncode != 0:
+ event_type = EventType.FAILURE
+ details = 'Return code = %d. End time: %s' % (stress_p.returncode,
+ end_time)
+ is_error = True
+ bq_helper.insert_summary_row(event_type, details)
+ print details
+ break
+
+ # Stress client still running. Get metrics
+ qps = _get_qps(metrics_cmd)
+ qps_recorded_at = datetime.datetime.now().isoformat()
+ print 'qps: %d at %s' % (qps, qps_recorded_at)
+
+ # If QPS has been zero for the last 3 iterations, flag it as error and exit
+ qps_history[qps_history_idx] = qps
+ qps_history_idx = (qps_history_idx + 1) % len(qps_history)
+ if sum(qps_history) == 0:
+ details = 'QPS has been zero for the last %d seconds - as of: %s' % (
+ poll_interval_secs * 3, qps_recorded_at)
+ is_error = True
+ bq_helper.insert_summary_row(EventType.FAILURE, details)
+ print details
+ break
+
+ # Upload qps metrics to BigQuery
+ bq_helper.insert_qps_row(qps, qps_recorded_at)
+
+ time.sleep(poll_interval_secs)
+
+ if is_error:
+ print 'Waiting indefinitely..'
+ select.select([], [], [])
+
+ print 'Completed'
+ return
+
+
+if __name__ == '__main__':
+ run_client()
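run_client.py takes no command-line arguments; it is configured entirely through environment variables, normally injected into the GKE pod by run_stress_tests_on_gke.py (below). A sketch of a local invocation, with placeholder values:

#!/usr/bin/env python2.7
# Sketch (placeholder values): set the environment run_client.py reads and
# launch it as a subprocess.
import os
import subprocess

env = dict(os.environ)
env.update({
    'STRESS_TEST_IMAGE_TYPE': 'CLIENT',
    'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/stress_test',
    'STRESS_TEST_ARGS_STR': '--server_addresses=localhost:8080',
    'METRICS_CLIENT_IMAGE': '/var/local/git/grpc/bins/opt/metrics_client',
    'METRICS_CLIENT_ARGS_STR':
        '--metrics_server_address=localhost:8081 --total_only=true',
    'RUN_ID': 'local_test_run',        # placeholder run id
    'POD_NAME': 'local-client-1',      # placeholder pod name
    'POLL_INTERVAL_SECS': '60',
    'GCP_PROJECT_ID': 'my-project',    # placeholder project id
    'DATASET_ID': 'stress_test_local',
    'SUMMARY_TABLE_ID': 'summary',
    'QPS_TABLE_ID': 'qps',
})
subprocess.call(['tools/gcp/stress_test/run_client.py'], env=env)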
diff --git a/tools/gcp/stress_test/run_server.py b/tools/gcp/stress_test/run_server.py
new file mode 100755
index 0000000000..64322f6100
--- /dev/null
+++ b/tools/gcp/stress_test/run_server.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import datetime
+import os
+import select
+import subprocess
+import sys
+import time
+
+from stress_test_utils import BigQueryHelper
+from stress_test_utils import EventType
+
+
+def run_server():
+ """This is a wrapper around the interop server and performs the following:
+ 1) Create a 'Summary table' in Big Query to record events like the server
+ started, completed successfully or failed. NOTE: This also creates
+ another table called the QPS table which is currently NOT needed on the
+ server (it is needed on the stress test clients)
+ 2) Start the server process and add a row in Big Query summary table
+ 3) Wait for the server process to terminate. The server process does not
+ terminate unless there is an error.
+ If the server process terminated with a failure, add a row in Big Query
+ and wait forever.
+ NOTE: This script typically runs inside a GKE pod which means that the
+ pod gets destroyed when the script exits. However, in case the server
+ process fails, we would not want the pod to be destroyed (since we
+ might want to connect to the pod for examining logs). This is the
+ reason why the script waits forever in case of failures.
+ """
+
+ # Read the parameters from environment variables
+ env = dict(os.environ)
+
+ run_id = env['RUN_ID'] # The unique run id for this test
+ image_type = env['STRESS_TEST_IMAGE_TYPE']
+ image_name = env['STRESS_TEST_IMAGE']
+ args_str = env['STRESS_TEST_ARGS_STR']
+ pod_name = env['POD_NAME']
+ project_id = env['GCP_PROJECT_ID']
+ dataset_id = env['DATASET_ID']
+ summary_table_id = env['SUMMARY_TABLE_ID']
+ qps_table_id = env['QPS_TABLE_ID']
+
+ logfile_name = env.get('LOGFILE_NAME')
+
+ print('pod_name: %s, project_id: %s, run_id: %s, dataset_id: %s, '
+ 'summary_table_id: %s, qps_table_id: %s' % (
+ pod_name, project_id, run_id, dataset_id, summary_table_id,
+ qps_table_id))
+
+ bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id,
+ dataset_id, summary_table_id, qps_table_id)
+ bq_helper.initialize()
+
+ # Create BigQuery Dataset and Tables: Summary Table and Metrics Table
+ if not bq_helper.setup_tables():
+ print 'Error in creating BigQuery tables'
+ return
+
+ start_time = datetime.datetime.now()
+
+ logfile = None
+ details = 'Logging to stdout'
+ if logfile_name is not None:
+ print 'Opening log file: ', logfile_name
+ logfile = open(logfile_name, 'w')
+ details = 'Logfile: %s' % logfile_name
+
+ # Update status that the test is starting (in the status table)
+ bq_helper.insert_summary_row(EventType.STARTING, details)
+
+ stress_cmd = [image_name] + args_str.split()
+
+ print 'Launching process %s ...' % stress_cmd
+ stress_p = subprocess.Popen(args=stress_cmd,
+ stdout=logfile,
+ stderr=subprocess.STDOUT)
+
+ returncode = stress_p.wait()
+ if returncode != 0:
+ end_time = datetime.datetime.now().isoformat()
+ event_type = EventType.FAILURE
+ details = 'Returncode: %d; End time: %s' % (returncode, end_time)
+ bq_helper.insert_summary_row(event_type, details)
+ print 'Waiting indefinitely..'
+ select.select([], [], [])
+ return returncode
+
+
+if __name__ == '__main__':
+ run_server()
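run_server.py is configured the same way, through a smaller set of environment variables. A sketch with placeholder values:

import os
import subprocess

env = dict(os.environ)
env.update({
    'STRESS_TEST_IMAGE_TYPE': 'SERVER',
    'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/interop_server',
    'STRESS_TEST_ARGS_STR': '--port=8080',
    'RUN_ID': 'local_test_run',        # placeholder run id
    'POD_NAME': 'local-server',        # placeholder pod name
    'GCP_PROJECT_ID': 'my-project',    # placeholder project id
    'DATASET_ID': 'stress_test_local',
    'SUMMARY_TABLE_ID': 'summary',
    'QPS_TABLE_ID': 'qps',
})
subprocess.call(['tools/gcp/stress_test/run_server.py'], env=env)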
diff --git a/tools/gcp/stress_test/stress_test_utils.py b/tools/gcp/stress_test/stress_test_utils.py
new file mode 100755
index 0000000000..c4b437e345
--- /dev/null
+++ b/tools/gcp/stress_test/stress_test_utils.py
@@ -0,0 +1,197 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import datetime
+import json
+import os
+import re
+import select
+import subprocess
+import sys
+import time
+
+# Import big_query_utils module
+bq_utils_dir = os.path.abspath(os.path.join(
+ os.path.dirname(__file__), '../utils'))
+sys.path.append(bq_utils_dir)
+import big_query_utils as bq_utils
+
+
+class EventType:
+ STARTING = 'STARTING'
+ SUCCESS = 'SUCCESS'
+ FAILURE = 'FAILURE'
+
+
+class BigQueryHelper:
+ """Helper class for the stress test wrappers to interact with BigQuery.
+ """
+
+ def __init__(self, run_id, image_type, pod_name, project_id, dataset_id,
+ summary_table_id, qps_table_id):
+ self.run_id = run_id
+ self.image_type = image_type
+ self.pod_name = pod_name
+ self.project_id = project_id
+ self.dataset_id = dataset_id
+ self.summary_table_id = summary_table_id
+ self.qps_table_id = qps_table_id
+
+ def initialize(self):
+ self.bq = bq_utils.create_big_query()
+
+ def setup_tables(self):
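+ # Note: short-circuits - if dataset creation fails, table creation is skipped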
+ return bq_utils.create_dataset(self.bq, self.project_id, self.dataset_id) \
+ and self.__create_summary_table() \
+ and self.__create_qps_table()
+
+ def insert_summary_row(self, event_type, details):
+ row_values_dict = {
+ 'run_id': self.run_id,
+ 'image_type': self.image_type,
+ 'pod_name': self.pod_name,
+ 'event_date': datetime.datetime.now().isoformat(),
+ 'event_type': event_type,
+ 'details': details
+ }
+ # row_unique_id is something that uniquely identifies the row (BigQuery uses
+ # it for duplicate detection).
+ row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, event_type)
+ row = bq_utils.make_row(row_unique_id, row_values_dict)
+ return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
+ self.summary_table_id, [row])
+
+ def insert_qps_row(self, qps, recorded_at):
+ row_values_dict = {
+ 'run_id': self.run_id,
+ 'pod_name': self.pod_name,
+ 'recorded_at': recorded_at,
+ 'qps': qps
+ }
+
+ # row_unique_id is something that uniquely identifies the row (BigQuery uses
+ # it for duplicate detection).
+ row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, recorded_at)
+ row = bq_utils.make_row(row_unique_id, row_values_dict)
+ return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
+ self.qps_table_id, [row])
+
+ def check_if_any_tests_failed(self, num_query_retries=3):
+ query = ('SELECT event_type FROM %s.%s WHERE run_id = \'%s\' AND '
+ 'event_type="%s"') % (self.dataset_id, self.summary_table_id,
+ self.run_id, EventType.FAILURE)
+ query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
+ page = self.bq.jobs().getQueryResults(**query_job['jobReference']).execute(
+ num_retries=num_query_retries)
+ num_failures = int(page['totalRows'])
+ print 'num failures: ', num_failures
+ return num_failures > 0
+
+ def print_summary_records(self, num_query_retries=3):
+ line = '-' * 120
+ print line
+ print 'Summary records'
+ print 'Run Id: ', self.run_id
+ print 'Dataset Id: ', self.dataset_id
+ print line
+ query = ('SELECT pod_name, image_type, event_type, event_date, details'
+ ' FROM %s.%s WHERE run_id = \'%s\' ORDER by event_date;') % (
+ self.dataset_id, self.summary_table_id, self.run_id)
+ query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
+
+ print '{:<25} {:<12} {:<12} {:<30} {}'.format(
+ 'Pod name', 'Image type', 'Event type', 'Date', 'Details')
+ print line
+ page_token = None
+ while True:
+ page = self.bq.jobs().getQueryResults(
+ pageToken=page_token,
+ **query_job['jobReference']).execute(num_retries=num_query_retries)
+ rows = page.get('rows', [])
+ for row in rows:
+ print '{:<25} {:<12} {:<12} {:<30} {}'.format(
+ row['f'][0]['v'], row['f'][1]['v'], row['f'][2]['v'],
+ row['f'][3]['v'], row['f'][4]['v'])
+ page_token = page.get('pageToken')
+ if not page_token:
+ break
+
+ def print_qps_records(self, num_query_retries=3):
+ line = '-' * 80
+ print line
+ print 'QPS Summary'
+ print 'Run Id: ', self.run_id
+ print 'Dataset Id: ', self.dataset_id
+ print line
+ query = (
+ 'SELECT pod_name, recorded_at, qps FROM %s.%s WHERE run_id = \'%s\' '
+ 'ORDER by recorded_at;') % (self.dataset_id, self.qps_table_id,
+ self.run_id)
+ query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
+ print '{:<25} {:30} {}'.format('Pod name', 'Recorded at', 'Qps')
+ print line
+ page_token = None
+ while True:
+ page = self.bq.jobs().getQueryResults(
+ pageToken=page_token,
+ **query_job['jobReference']).execute(num_retries=num_query_retries)
+ rows = page.get('rows', [])
+ for row in rows:
+ print '{:<25} {:30} {}'.format(row['f'][0]['v'], row['f'][1]['v'],
+ row['f'][2]['v'])
+ page_token = page.get('pageToken')
+ if not page_token:
+ break
+
+ def __create_summary_table(self):
+ summary_table_schema = [
+ ('run_id', 'STRING', 'Test run id'),
+ ('image_type', 'STRING', 'Client or Server?'),
+ ('pod_name', 'STRING', 'GKE pod hosting this image'),
+ ('event_date', 'STRING', 'The date of this event'),
+ ('event_type', 'STRING', 'STARTING/SUCCESS/FAILURE'),
+ ('details', 'STRING', 'Any other relevant details')
+ ]
+ desc = ('The table that contains STARTING/SUCCESS/FAILURE events for '
+ 'the stress test clients and servers')
+ return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
+ self.summary_table_id, summary_table_schema,
+ desc)
+
+ def __create_qps_table(self):
+ qps_table_schema = [
+ ('run_id', 'STRING', 'Test run id'),
+ ('pod_name', 'STRING', 'GKE pod hosting this image'),
+ ('recorded_at', 'STRING', 'Metrics recorded at time'),
+ ('qps', 'INTEGER', 'Queries per second')
+ ]
+ desc = 'The table that contains the QPS recorded at various intervals'
+ return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
+ self.qps_table_id, qps_table_schema, desc)
diff --git a/tools/gcp/utils/big_query_utils.py b/tools/gcp/utils/big_query_utils.py
new file mode 100755
index 0000000000..7bb1e14354
--- /dev/null
+++ b/tools/gcp/utils/big_query_utils.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import argparse
+import json
+import uuid
+import httplib2
+
+from apiclient import discovery
+from apiclient.errors import HttpError
+from oauth2client.client import GoogleCredentials
+
+NUM_RETRIES = 3
+
+
+def create_big_query():
+ """Authenticates with cloud platform and gets a BiqQuery service object
+ """
+ creds = GoogleCredentials.get_application_default()
+ return discovery.build('bigquery', 'v2', credentials=creds)
+
+
+def create_dataset(big_query, project_id, dataset_id):
+ is_success = True
+ body = {
+ 'datasetReference': {
+ 'projectId': project_id,
+ 'datasetId': dataset_id
+ }
+ }
+
+ try:
+ dataset_req = big_query.datasets().insert(projectId=project_id, body=body)
+ dataset_req.execute(num_retries=NUM_RETRIES)
+ except HttpError as http_error:
+ if http_error.resp.status == 409:
+ print 'Warning: The dataset %s already exists' % dataset_id
+ else:
+ # Note: For more debugging info, print "http_error.content"
+ print 'Error in creating dataset: %s. Err: %s' % (dataset_id, http_error)
+ is_success = False
+ return is_success
+
+
+def create_table(big_query, project_id, dataset_id, table_id, table_schema,
+ description):
+ is_success = True
+
+ body = {
+ 'description': description,
+ 'schema': {
+ 'fields': [{
+ 'name': field_name,
+ 'type': field_type,
+ 'description': field_description
+ } for (field_name, field_type, field_description) in table_schema]
+ },
+ 'tableReference': {
+ 'datasetId': dataset_id,
+ 'projectId': project_id,
+ 'tableId': table_id
+ }
+ }
+
+ try:
+ table_req = big_query.tables().insert(projectId=project_id,
+ datasetId=dataset_id,
+ body=body)
+ res = table_req.execute(num_retries=NUM_RETRIES)
+ print 'Successfully created %s "%s"' % (res['kind'], res['id'])
+ except HttpError as http_error:
+ if http_error.resp.status == 409:
+ print 'Warning: Table %s already exists' % table_id
+ else:
+ print 'Error in creating table: %s. Err: %s' % (table_id, http_error)
+ is_success = False
+ return is_success
+
+
+def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
+ is_success = True
+ body = {'rows': rows_list}
+ try:
+ insert_req = big_query.tabledata().insertAll(projectId=project_id,
+ datasetId=dataset_id,
+ tableId=table_id,
+ body=body)
+ print body
+ res = insert_req.execute(num_retries=NUM_RETRIES)
+ print res
+ except HttpError as http_error:
+ print 'Error in inserting rows in the table %s' % table_id
+ is_success = False
+ return is_success
+
+
+def sync_query_job(big_query, project_id, query, timeout=5000):
+ query_data = {'query': query, 'timeoutMs': timeout}
+ query_job = None
+ try:
+ query_job = big_query.jobs().query(
+ projectId=project_id,
+ body=query_data).execute(num_retries=NUM_RETRIES)
+ except HttpError as http_error:
+ print 'Query execute job failed with error: %s' % http_error
+ print http_error.content
+ return query_job
+
+def make_row(unique_row_id, row_values_dict):
+ """row_values_dict is a dictionary of column name and column value.
+ """
+ return {'insertId': unique_row_id, 'json': row_values_dict}
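A sketch of how these helpers compose (not part of the patch; 'my-project' and the schema below are placeholders):

import big_query_utils as bq_utils

bq = bq_utils.create_big_query()
bq_utils.create_dataset(bq, 'my-project', 'example_dataset')
# Schema is a list of (column name, column type, description) tuples
schema = [('name', 'STRING', 'A name'), ('count', 'INTEGER', 'A count')]
bq_utils.create_table(bq, 'my-project', 'example_dataset', 'example_table',
                      schema, 'An example table')
row = bq_utils.make_row('row-1', {'name': 'foo', 'count': 42})
bq_utils.insert_rows(bq, 'my-project', 'example_dataset', 'example_table',
                     [row])
job = bq_utils.sync_query_job(
    bq, 'my-project', 'SELECT name, count FROM example_dataset.example_table')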
diff --git a/tools/gke/kubernetes_api.py b/tools/gcp/utils/kubernetes_api.py
index 7dd3015365..e8ddd2f1b3 100755
--- a/tools/gke/kubernetes_api.py
+++ b/tools/gcp/utils/kubernetes_api.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python2.7
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -33,8 +33,9 @@ import json
_REQUEST_TIMEOUT_SECS = 10
+
def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
- arg_list):
+ arg_list, env_dict):
"""Creates a string containing the Pod defintion as required by the Kubernetes API"""
body = {
'kind': 'Pod',
@@ -48,20 +49,23 @@ def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
{
'name': pod_name,
'image': image_name,
- 'ports': []
+ 'ports': [{'containerPort': port,
+ 'protocol': 'TCP'}
+ for port in container_port_list],
+ 'imagePullPolicy': 'Always'
}
]
}
}
- # Populate the 'ports' list
- for port in container_port_list:
- port_entry = {'containerPort': port, 'protocol': 'TCP'}
- body['spec']['containers'][0]['ports'].append(port_entry)
+
+ env_list = [{'name': k, 'value': v} for (k, v) in env_dict.iteritems()]
+ if len(env_list) > 0:
+ body['spec']['containers'][0]['env'] = env_list
# Add the 'Command' and 'Args' attributes if they are passed.
# Note:
# - 'Command' overrides the ENTRYPOINT in the Docker Image
- # - 'Args' override the COMMAND in Docker image (yes, it is confusing!)
+ # - 'Args' override the CMD in Docker image (yes, it is confusing!)
if len(cmd_list) > 0:
body['spec']['containers'][0]['command'] = cmd_list
if len(arg_list) > 0:
@@ -70,7 +74,7 @@ def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
def _make_service_config(service_name, pod_name, service_port_list,
- container_port_list, is_headless):
+ container_port_list, is_headless):
"""Creates a string containing the Service definition as required by the Kubernetes API.
NOTE:
@@ -124,6 +128,7 @@ def _print_connection_error(msg):
print('ERROR: Connection failed. Did you remember to run the Kubernetes proxy '
'on localhost (i.e. kubectl proxy --port=<proxy_port>)? Error: %s' % msg)
+
def _do_post(post_url, api_name, request_body):
"""Helper to do HTTP POST.
@@ -135,7 +140,9 @@ def _do_post(post_url, api_name, request_body):
"""
is_success = True
try:
- r = requests.post(post_url, data=request_body, timeout=_REQUEST_TIMEOUT_SECS)
+ r = requests.post(post_url,
+ data=request_body,
+ timeout=_REQUEST_TIMEOUT_SECS)
if r.status_code == requests.codes.conflict:
print('WARN: Looks like the resource already exists. Api: %s, url: %s' %
(api_name, post_url))
@@ -143,7 +150,8 @@ def _do_post(post_url, api_name, request_body):
print('ERROR: %s API returned error. HTTP response: (%d) %s' %
(api_name, r.status_code, r.text))
is_success = False
- except(requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
+ except (requests.exceptions.Timeout,
+ requests.exceptions.ConnectionError) as e:
is_success = False
_print_connection_error(str(e))
return is_success
@@ -165,7 +173,8 @@ def _do_delete(del_url, api_name):
print('ERROR: %s API returned error. HTTP response: %s' %
(api_name, r.text))
is_success = False
- except(requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
+ except (requests.exceptions.Timeout,
+ requests.exceptions.ConnectionError) as e:
is_success = False
_print_connection_error(str(e))
return is_success
@@ -179,12 +188,12 @@ def create_service(kube_host, kube_port, namespace, service_name, pod_name,
post_url = 'http://%s:%d/api/v1/namespaces/%s/services' % (
kube_host, kube_port, namespace)
request_body = _make_service_config(service_name, pod_name, service_port_list,
- container_port_list, is_headless)
+ container_port_list, is_headless)
return _do_post(post_url, 'Create Service', request_body)
def create_pod(kube_host, kube_port, namespace, pod_name, image_name,
- container_port_list, cmd_list, arg_list):
+ container_port_list, cmd_list, arg_list, env_dict):
"""Creates a Kubernetes Pod.
Note that it is generally NOT considered a good practice to directly create
@@ -200,7 +209,7 @@ def create_pod(kube_host, kube_port, namespace, pod_name, image_name,
post_url = 'http://%s:%d/api/v1/namespaces/%s/pods' % (kube_host, kube_port,
namespace)
request_body = _make_pod_config(pod_name, image_name, container_port_list,
- cmd_list, arg_list)
+ cmd_list, arg_list, env_dict)
return _do_post(post_url, 'Create Pod', request_body)
@@ -214,3 +223,47 @@ def delete_pod(kube_host, kube_port, namespace, pod_name):
del_url = 'http://%s:%d/api/v1/namespaces/%s/pods/%s' % (kube_host, kube_port,
namespace, pod_name)
return _do_delete(del_url, 'Delete Pod')
+
+
+def create_pod_and_service(kube_host, kube_port, namespace, pod_name,
+ image_name, container_port_list, cmd_list, arg_list,
+ env_dict, is_headless_service):
+ """A helper function that creates a pod and a service (if pod creation was successful)."""
+ is_success = create_pod(kube_host, kube_port, namespace, pod_name, image_name,
+ container_port_list, cmd_list, arg_list, env_dict)
+ if not is_success:
+ print 'Error in creating Pod'
+ return False
+
+ is_success = create_service(
+ kube_host,
+ kube_port,
+ namespace,
+ pod_name, # Use pod_name for service
+ pod_name,
+ container_port_list, # Service port list same as container port list
+ container_port_list,
+ is_headless_service)
+ if not is_success:
+ print 'Error in creating Service'
+ return False
+
+ print 'Successfully created the pod/service %s' % pod_name
+ return True
+
+
+def delete_pod_and_service(kube_host, kube_port, namespace, pod_name):
+ """ A helper function that calls delete_pod and delete_service """
+ is_success = delete_pod(kube_host, kube_port, namespace, pod_name)
+ if not is_success:
+ print 'Error in deleting pod %s' % pod_name
+ return False
+
+ # Note: service name assumed to be the same as pod name
+ is_success = delete_service(kube_host, kube_port, namespace, pod_name)
+ if not is_success:
+ print 'Error in deleting service %s' % pod_name
+ return False
+
+ print 'Successfully deleted the Pod/Service: %s' % pod_name
+ return True
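A sketch of driving the new create_pod_and_service()/delete_pod_and_service() helpers against a local kubectl proxy (image name, ports and env below are placeholders):

import kubernetes_api

# Assumes 'kubectl proxy --port=8001' is already running on localhost
ok = kubernetes_api.create_pod_and_service(
    'localhost', 8001, 'default',
    'example-pod',                       # pod (and service) name, placeholder
    'gcr.io/my-project/example-image',   # placeholder image
    [8080],                              # container ports to expose
    ['/bin/example_server'],             # overrides the image ENTRYPOINT
    ['--port=8080'],                     # overrides the image CMD
    {'RUN_ID': 'local_test_run'},        # env_dict, new in this change
    False)                               # not a headless service
if ok:
  kubernetes_api.delete_pod_and_service('localhost', 8001, 'default',
                                        'example-pod')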
diff --git a/tools/jenkins/build_interop_stress_image.sh b/tools/jenkins/build_interop_stress_image.sh
index 92f2dab5e3..501dc5b7ca 100755
--- a/tools/jenkins/build_interop_stress_image.sh
+++ b/tools/jenkins/build_interop_stress_image.sh
@@ -35,6 +35,8 @@ set -x
# Params:
# INTEROP_IMAGE - name of tag of the final interop image
+# INTEROP_IMAGE_REPOSITORY_TAG - Optional. If set, the created image will be
+# tagged using the command: 'docker tag $INTEROP_IMAGE $INTEROP_IMAGE_REPOSITORY_TAG'
# BASE_NAME - base name used to locate the base Dockerfile and build script
# TTY_FLAG - optional -t flag to make docker allocate tty
# BUILD_INTEROP_DOCKER_EXTRA_ARGS - optional args to be passed to the
@@ -77,6 +79,7 @@ CONTAINER_NAME="build_${BASE_NAME}_$(uuidgen)"
$BASE_IMAGE \
bash -l /var/local/jenkins/grpc/tools/dockerfile/$BASE_NAME/build_interop_stress.sh \
&& docker commit $CONTAINER_NAME $INTEROP_IMAGE \
+ && ( if [ -n "$INTEROP_IMAGE_REPOSITORY_TAG" ]; then docker tag -f $INTEROP_IMAGE $INTEROP_IMAGE_REPOSITORY_TAG ; fi ) \
&& echo "Successfully built image $INTEROP_IMAGE")
EXITCODE=$?
diff --git a/tools/run_tests/distribtest_targets.py b/tools/run_tests/distribtest_targets.py
index 933103f0a0..34cc1cd710 100644
--- a/tools/run_tests/distribtest_targets.py
+++ b/tools/run_tests/distribtest_targets.py
@@ -96,6 +96,15 @@ class CSharpDistribTest(object):
return create_jobspec(self.name,
['test/distrib/csharp/run_distrib_test.sh'],
environ={'EXTERNAL_GIT_ROOT': '../../..'})
+ elif self.platform == 'windows':
+ if self.arch == 'x64':
+ environ={'MSBUILD_EXTRA_ARGS': '/p:Platform=x64',
+ 'DISTRIBTEST_OUTPATH': 'DistribTest\\bin\\x64\\Debug'}
+ else:
+ environ={'DISTRIBTEST_OUTPATH': 'DistribTest\\bin\\Debug'}
+ return create_jobspec(self.name,
+ ['test\\distrib\\csharp\\run_distrib_test.bat'],
+ environ=environ)
else:
raise Exception("Not supported yet.")
@@ -240,6 +249,8 @@ def targets():
CSharpDistribTest('linux', 'x64', 'ubuntu1510'),
CSharpDistribTest('linux', 'x64', 'ubuntu1604'),
CSharpDistribTest('macos', 'x86'),
+ CSharpDistribTest('windows', 'x86'),
+ CSharpDistribTest('windows', 'x64'),
PythonDistribTest('linux', 'x64', 'wheezy'),
PythonDistribTest('linux', 'x64', 'jessie'),
PythonDistribTest('linux', 'x86', 'jessie'),
diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py
index adf178bb3c..a3b246dc08 100755
--- a/tools/run_tests/jobset.py
+++ b/tools/run_tests/jobset.py
@@ -384,7 +384,8 @@ class Jobset(object):
self._travis,
self._add_env)
self._running.add(job)
- self.resultset[job.GetSpec().shortname] = []
+ if job.GetSpec().shortname not in self.resultset:
+ self.resultset[job.GetSpec().shortname] = []
return True
def reap(self):
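The jobset.py change above preserves the results of earlier attempts of a retried job instead of clobbering them. A standalone sketch of the same guard (names are illustrative):

resultset = {}

def record_attempt(shortname, result):
  # Only create the list the first time the job is seen, so a retried job
  # appends to its existing history
  if shortname not in resultset:
    resultset[shortname] = []
  resultset[shortname].append(result)

record_attempt('test_foo', 'FAILED')  # first attempt
record_attempt('test_foo', 'PASSED')  # retry keeps the failure history
print resultset['test_foo']           # ['FAILED', 'PASSED']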
diff --git a/tools/run_tests/stress_test/run_stress_tests_on_gke.py b/tools/run_tests/stress_test/run_stress_tests_on_gke.py
new file mode 100755
index 0000000000..634eb1aca5
--- /dev/null
+++ b/tools/run_tests/stress_test/run_stress_tests_on_gke.py
@@ -0,0 +1,556 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+import argparse
+import datetime
+import os
+import subprocess
+import sys
+import time
+
+stress_test_utils_dir = os.path.abspath(os.path.join(
+ os.path.dirname(__file__), '../../gcp/stress_test'))
+sys.path.append(stress_test_utils_dir)
+from stress_test_utils import BigQueryHelper
+
+kubernetes_api_dir = os.path.abspath(os.path.join(
+ os.path.dirname(__file__), '../../gcp/utils'))
+sys.path.append(kubernetes_api_dir)
+
+import kubernetes_api
+
+_GRPC_ROOT = os.path.abspath(os.path.join(
+ os.path.dirname(sys.argv[0]), '../../..'))
+os.chdir(_GRPC_ROOT)
+
+# Number of seconds to wait for the GKE image to start and warm up
+_GKE_IMAGE_WARMUP_WAIT_SECS = 60
+
+_SERVER_POD_NAME = 'stress-server'
+_CLIENT_POD_NAME_PREFIX = 'stress-client'
+_DATASET_ID_PREFIX = 'stress_test'
+_SUMMARY_TABLE_ID = 'summary'
+_QPS_TABLE_ID = 'qps'
+
+_DEFAULT_DOCKER_IMAGE_NAME = 'grpc_stress_test'
+
+# The default port on which the kubernetes proxy server is started on localhost
+# (i.e. kubectl proxy --port=<port>)
+_DEFAULT_KUBERNETES_PROXY_PORT = 8001
+
+# How frequently should the stress client wrapper script (running inside a GKE
+# container) poll the health of the stress client (also running inside the GKE
+# container) and upload metrics to BigQuery
+_DEFAULT_STRESS_CLIENT_POLL_INTERVAL_SECS = 60
+
+# The default setting for stress test server and client
+_DEFAULT_STRESS_SERVER_PORT = 8080
+_DEFAULT_METRICS_PORT = 8081
+_DEFAULT_TEST_CASES_STR = 'empty_unary:1,large_unary:1,client_streaming:1,server_streaming:1,empty_stream:1'
+_DEFAULT_NUM_CHANNELS_PER_SERVER = 5
+_DEFAULT_NUM_STUBS_PER_CHANNEL = 10
+_DEFAULT_METRICS_COLLECTION_INTERVAL_SECS = 30
+
+# Number of stress client instances to launch
+_DEFAULT_NUM_CLIENTS = 3
+
+# How frequently should this test monitor the health of Stress clients and
+# Servers running in GKE
+_DEFAULT_TEST_POLL_INTERVAL_SECS = 60
+
+# Default run time for this test (2 hours)
+_DEFAULT_TEST_DURATION_SECS = 7200
+
+# The number of seconds it would take a GKE pod to warm up (i.e. get to 'Running'
+# state from the time of creation). Ideally this is something the test should
+# automatically determine by using Kubernetes API to poll the pods status.
+_DEFAULT_GKE_WARMUP_SECS = 60
+
+
+class KubernetesProxy:
+ """ Class to start a proxy on localhost to the Kubernetes API server """
+
+ def __init__(self, api_port):
+ self.port = api_port
+ self.p = None
+ self.started = False
+
+ def start(self):
+ cmd = ['kubectl', 'proxy', '--port=%d' % self.port]
+ self.p = subprocess.Popen(args=cmd)
+ self.started = True
+ time.sleep(2)
+ print '..Started'
+
+ def get_port(self):
+ return self.port
+
+ def is_started(self):
+ return self.started
+
+ def __del__(self):
+ if self.p is not None:
+ print 'Shutting down Kubernetes proxy..'
+ self.p.kill()
+
+
+class TestSettings:
+
+ def __init__(self, build_docker_image, test_poll_interval_secs,
+ test_duration_secs, kubernetes_proxy_port):
+ self.build_docker_image = build_docker_image
+ self.test_poll_interval_secs = test_poll_interval_secs
+ self.test_duration_secs = test_duration_secs
+ self.kubernetes_proxy_port = kubernetes_proxy_port
+
+
+class GkeSettings:
+
+ def __init__(self, project_id, docker_image_name):
+ self.project_id = project_id
+ self.docker_image_name = docker_image_name
+ self.tag_name = 'gcr.io/%s/%s' % (project_id, docker_image_name)
+
+
+class BigQuerySettings:
+
+ def __init__(self, run_id, dataset_id, summary_table_id, qps_table_id):
+ self.run_id = run_id
+ self.dataset_id = dataset_id
+ self.summary_table_id = summary_table_id
+ self.qps_table_id = qps_table_id
+
+
+class StressServerSettings:
+
+ def __init__(self, server_pod_name, server_port):
+ self.server_pod_name = server_pod_name
+ self.server_port = server_port
+
+
+class StressClientSettings:
+
+ def __init__(self, num_clients, client_pod_name_prefix, server_pod_name,
+ server_port, metrics_port, metrics_collection_interval_secs,
+ stress_client_poll_interval_secs, num_channels_per_server,
+ num_stubs_per_channel, test_cases_str):
+ self.num_clients = num_clients
+ self.client_pod_name_prefix = client_pod_name_prefix
+ self.server_pod_name = server_pod_name
+ self.server_port = server_port
+ self.metrics_port = metrics_port
+ self.metrics_collection_interval_secs = metrics_collection_interval_secs
+ self.stress_client_poll_interval_secs = stress_client_poll_interval_secs
+ self.num_channels_per_server = num_channels_per_server
+ self.num_stubs_per_channel = num_stubs_per_channel
+ self.test_cases_str = test_cases_str
+
+ # == Derived properties ==
+ # Note: Client can accept a list of server addresses (a comma separated list
+ # of 'server_name:server_port'). In this case, we only have one server
+ # address to pass
+ self.server_addresses = '%s.default.svc.cluster.local:%d' % (
+ server_pod_name, server_port)
+ self.client_pod_names_list = ['%s-%d' % (client_pod_name_prefix, i)
+ for i in range(1, num_clients + 1)]
+
+
+def _build_docker_image(image_name, tag_name):
+ """ Build the docker image and add tag it to the GKE repository """
+ print 'Building docker image: %s' % image_name
+ os.environ['INTEROP_IMAGE'] = image_name
+ os.environ['INTEROP_IMAGE_REPOSITORY_TAG'] = tag_name
+ # Note that 'BASE_NAME' HAS to be 'grpc_interop_stress_cxx' since the script
+ # build_interop_stress_image.sh invokes the following script:
+ # tools/dockerfile/$BASE_NAME/build_interop_stress.sh
+ os.environ['BASE_NAME'] = 'grpc_interop_stress_cxx'
+ cmd = ['tools/jenkins/build_interop_stress_image.sh']
+ retcode = subprocess.call(args=cmd)
+ if retcode != 0:
+ print 'Error in building docker image'
+ return False
+ return True
+
+
+def _push_docker_image_to_gke_registry(docker_tag_name):
+ """Executes 'gcloud docker push <docker_tag_name>' to push the image to GKE registry"""
+ cmd = ['gcloud', 'docker', 'push', docker_tag_name]
+ print 'Pushing %s to GKE registry..' % docker_tag_name
+ retcode = subprocess.call(args=cmd)
+ if retcode != 0:
+ print 'Error in pushing docker image %s to the GKE registry' % docker_tag_name
+ return False
+ return True
+
+
+def _launch_server(gke_settings, stress_server_settings, bq_settings,
+ kubernetes_proxy):
+ """ Launches a stress test server instance in GKE cluster """
+ if not kubernetes_proxy.is_started:
+ print 'Kubernetes proxy must be started before calling this function'
+ return False
+
+ # This is the wrapper script that is run in the container. This script runs
+ # the actual stress test server
+ server_cmd_list = ['/var/local/git/grpc/tools/gcp/stress_test/run_server.py']
+
+ # run_server.py does not take any args from the command line. The args are
+ # instead passed via environment variables (see server_env below)
+ server_arg_list = []
+
+ # The parameters to the script run_server.py are injected into the container
+ # via environment variables
+ server_env = {
+ 'STRESS_TEST_IMAGE_TYPE': 'SERVER',
+ 'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/interop_server',
+ 'STRESS_TEST_ARGS_STR': '--port=%s' % stress_server_settings.server_port,
+ 'RUN_ID': bq_settings.run_id,
+ 'POD_NAME': stress_server_settings.server_pod_name,
+ 'GCP_PROJECT_ID': gke_settings.project_id,
+ 'DATASET_ID': bq_settings.dataset_id,
+ 'SUMMARY_TABLE_ID': bq_settings.summary_table_id,
+ 'QPS_TABLE_ID': bq_settings.qps_table_id
+ }
+
+ # Launch Server
+ is_success = kubernetes_api.create_pod_and_service(
+ 'localhost',
+ kubernetes_proxy.get_port(),
+ 'default', # Use 'default' namespace
+ stress_server_settings.server_pod_name,
+ gke_settings.tag_name,
+ [stress_server_settings.server_port], # Port that should be exposed
+ server_cmd_list,
+ server_arg_list,
+ server_env,
+ True # Headless = True for the server, since we want GKE to create DNS records
+ )
+
+ return is_success
+
+
+def _launch_client(gke_settings, stress_server_settings, stress_client_settings,
+ bq_settings, kubernetes_proxy):
+ """ Launches a configurable number of stress test clients on GKE cluster """
+ if not kubernetes_proxy.is_started:
+ print 'Kubernetes proxy must be started before calling this function'
+ return False
+
+ stress_client_arg_list = [
+ '--server_addresses=%s' % stress_client_settings.server_addresses,
+ '--test_cases=%s' % stress_client_settings.test_cases_str,
+ '--num_stubs_per_channel=%d' %
+ stress_client_settings.num_stubs_per_channel
+ ]
+
+ # This is the wrapper script that is run in the container. This script runs
+ # the actual stress client
+ client_cmd_list = ['/var/local/git/grpc/tools/gcp/stress_test/run_client.py']
+
+ # run_client.py takes no args. All args are passed as env variables (see
+ # client_env)
+ client_arg_list = []
+
+ metrics_server_address = 'localhost:%d' % stress_client_settings.metrics_port
+ metrics_client_arg_list = [
+ '--metrics_server_address=%s' % metrics_server_address,
+ '--total_only=true'
+ ]
+
+ # The parameters to the script run_client.py are injected into the container
+ # via environment variables
+ client_env = {
+ 'STRESS_TEST_IMAGE_TYPE': 'CLIENT',
+ 'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/stress_test',
+ 'STRESS_TEST_ARGS_STR': ' '.join(stress_client_arg_list),
+ 'METRICS_CLIENT_IMAGE': '/var/local/git/grpc/bins/opt/metrics_client',
+ 'METRICS_CLIENT_ARGS_STR': ' '.join(metrics_client_arg_list),
+ 'RUN_ID': bq_settings.run_id,
+ 'POLL_INTERVAL_SECS':
+ str(stress_client_settings.stress_client_poll_interval_secs),
+ 'GCP_PROJECT_ID': gke_settings.project_id,
+ 'DATASET_ID': bq_settings.dataset_id,
+ 'SUMMARY_TABLE_ID': bq_settings.summary_table_id,
+ 'QPS_TABLE_ID': bq_settings.qps_table_id
+ }
+
+ for pod_name in stress_client_settings.client_pod_names_list:
+ client_env['POD_NAME'] = pod_name
+ is_success = kubernetes_api.create_pod_and_service(
+ 'localhost', # Since proxy is running on localhost
+ kubernetes_proxy.get_port(),
+ 'default', # default namespace
+ pod_name,
+ gke_settings.tag_name,
+ [stress_client_settings.metrics_port
+ ], # Client pods expose metrics port
+ client_cmd_list,
+ client_arg_list,
+ client_env,
+ False # Client is not a headless service
+ )
+ if not is_success:
+ print 'Error in launching client %s' % pod_name
+ return False
+
+ return True
+
+
+def _launch_server_and_client(gke_settings, stress_server_settings,
+ stress_client_settings, bq_settings,
+ kubernetes_proxy_port):
+ # Start kubernetes proxy
+ print 'Starting Kubernetes proxy..'
+ kubernetes_proxy = KubernetesProxy(kubernetes_proxy_port)
+ kubernetes_proxy.start()
+
+ print 'Launching server..'
+ is_success = _launch_server(gke_settings, stress_server_settings, bq_settings,
+ kubernetes_proxy)
+ if not is_success:
+ print 'Error in launching server'
+ return False
+
+ # Server takes a while to start.
+ # TODO(sree) Use Kubernetes API to query the status of the server instead of
+ # sleeping
+ print 'Waiting for %s seconds for the server to start...' % _GKE_IMAGE_WARMUP_WAIT_SECS
+ time.sleep(_GKE_IMAGE_WARMUP_WAIT_SECS)
+
+ # Launch client
+ is_success = _launch_client(gke_settings, stress_server_settings,
+ stress_client_settings, bq_settings,
+ kubernetes_proxy)
+
+ if not is_success:
+ print 'Error in launching client(s)'
+ return False
+
+ print 'Waiting for %s seconds for the client images to start...' % _GKE_IMAGE_WARMUP_WAIT_SECS
+ time.sleep(_GKE_IMAGE_WARMUP_WAIT_SECS)
+ return True
+
+
+def _delete_server_and_client(stress_server_settings, stress_client_settings,
+ kubernetes_proxy_port):
+ kubernetes_proxy = KubernetesProxy(kubernetes_proxy_port)
+ kubernetes_proxy.start()
+
+ # Delete clients first
+ is_success = True
+ for pod_name in stress_client_settings.client_pod_names_list:
+ is_success = kubernetes_api.delete_pod_and_service(
+ 'localhost', kubernetes_proxy_port, 'default', pod_name)
+ if not is_success:
+ return False
+
+ # Delete server
+ is_success = kubernetes_api.delete_pod_and_service(
+ 'localhost', kubernetes_proxy_port, 'default',
+ stress_server_settings.server_pod_name)
+ return is_success
+
+
+def run_test_main(test_settings, gke_settings, stress_server_settings,
+ stress_client_settings):
+ is_success = True
+
+ if test_settings.build_docker_image:
+ is_success = _build_docker_image(gke_settings.docker_image_name,
+ gke_settings.tag_name)
+ if not is_success:
+ return False
+
+ is_success = _push_docker_image_to_gke_registry(gke_settings.tag_name)
+ if not is_success:
+ return False
+
+ # Create a unique id for this run (Note: Using a timestamp instead of a UUID
+ # makes it easier to deduce the date/time of the run just by looking at the
+ # run id. This is useful when debugging with the records in BigQuery)
+ run_id = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
+ dataset_id = '%s_%s' % (_DATASET_ID_PREFIX, run_id)
+
+ # Big Query settings (common for both Stress Server and Client)
+ bq_settings = BigQuerySettings(run_id, dataset_id, _SUMMARY_TABLE_ID,
+ _QPS_TABLE_ID)
+
+ bq_helper = BigQueryHelper(run_id, '', '', gke_settings.project_id, dataset_id,
+ _SUMMARY_TABLE_ID, _QPS_TABLE_ID)
+ bq_helper.initialize()
+
+ try:
+ is_success = _launch_server_and_client(gke_settings, stress_server_settings,
+ stress_client_settings, bq_settings,
+ test_settings.kubernetes_proxy_port)
+ if not is_success:
+ return False
+
+ start_time = datetime.datetime.now()
+ end_time = start_time + datetime.timedelta(
+ seconds=test_settings.test_duration_secs)
+ print 'Running the test until %s' % end_time.isoformat()
+
+ while True:
+ if datetime.datetime.now() > end_time:
+ print 'Test was run for %d seconds' % test_settings.test_duration_secs
+ break
+
+ # Check if either stress server or clients have failed
+ if bq_helper.check_if_any_tests_failed():
+ is_success = False
+ print 'Some tests failed.'
+ break
+
+ # Things seem to be running fine. Wait until next poll time to check the
+ # status
+ print 'Sleeping for %d seconds..' % test_settings.test_poll_interval_secs
+ time.sleep(test_settings.test_poll_interval_secs)
+
+ # Print BiqQuery tables
+ bq_helper.print_summary_records()
+ bq_helper.print_qps_records()
+
+ finally:
+ # If is_success is False at this point, it means that the stress tests were
+ # started successfully but failed while running. In this case we should not
+ # delete the pods (since they contain all the failure information)
+ if is_success:
+ _delete_server_and_client(stress_server_settings, stress_client_settings,
+ test_settings.kubernetes_proxy_port)
+
+ return is_success
+
+
+argp = argparse.ArgumentParser(
+ description='Launch stress tests in GKE',
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+argp.add_argument('--project_id',
+ required=True,
+ help='The Google Cloud Platform Project Id')
+argp.add_argument('--num_clients',
+ default=1,
+ type=int,
+ help='Number of client instances to start')
+argp.add_argument('--docker_image_name',
+ default=_DEFAULT_DOCKER_IMAGE_NAME,
+ help='The name of the docker image containing stress client '
+ 'and stress servers')
+argp.add_argument('--build_docker_image',
+ dest='build_docker_image',
+ action='store_true',
+ help='Build a docker image and push to Google Container '
+ 'Registry')
+argp.add_argument('--do_not_build_docker_image',
+ dest='build_docker_image',
+ action='store_false',
+ help='Do not build and push docker image to Google Container '
+ 'Registry')
+argp.set_defaults(build_docker_image=True)
+
+argp.add_argument('--test_poll_interval_secs',
+ default=_DEFAULT_TEST_POLL_INTERVAL_SECS,
+ type=int,
+ help='How frequently this script should monitor the '
+ 'health of stress clients and servers running in the GKE '
+ 'cluster')
+argp.add_argument('--test_duration_secs',
+ default=_DEFAULT_TEST_DURATION_SECS,
+ type=int,
+ help='How long should this test be run')
+argp.add_argument('--kubernetes_proxy_port',
+ default=_DEFAULT_KUBERNETES_PROXY_PORT,
+ type=int,
+ help='The port on which the kubernetes proxy (on localhost)'
+ ' is started')
+argp.add_argument('--stress_server_port',
+ default=_DEFAULT_STRESS_SERVER_PORT,
+ type=int,
+ help='The port on which the stress server (in GKE '
+ 'containers) listens')
+argp.add_argument('--stress_client_metrics_port',
+ default=_DEFAULT_METRICS_PORT,
+ type=int,
+ help='The port on which the stress clients (in GKE '
+ 'containers) expose metrics')
+argp.add_argument('--stress_client_poll_interval_secs',
+ default=_DEFAULT_STRESS_CLIENT_POLL_INTERVAL_SECS,
+ type=int,
+ help='How frequently the stress client wrapper script '
+ '(running inside GKE) should monitor the health of the '
+ 'actual stress client process and upload metrics to BigQuery')
+argp.add_argument('--stress_client_metrics_collection_interval_secs',
+ default=_DEFAULT_METRICS_COLLECTION_INTERVAL_SECS,
+ type=int,
+ help='How frequently should metrics be collected in-memory on'
+ ' the stress clients (running inside GKE containers). Note '
+ 'that this is NOT the same as the upload-to-BigQuery '
+ 'frequency. The metrics upload frequency is controlled by the'
+ ' --stress_client_poll_interval_secs flag')
+argp.add_argument('--stress_client_num_channels_per_server',
+ default=_DEFAULT_NUM_CHANNELS_PER_SERVER,
+ type=int,
+ help='The number of channels created to each server from a '
+ 'stress client')
+argp.add_argument('--stress_client_num_stubs_per_channel',
+ default=_DEFAULT_NUM_STUBS_PER_CHANNEL,
+ type=int,
+ help='The number of stubs created per channel. This number '
+ 'indicates the max number of RPCs that can be made in '
+ 'parallel on each channel at any given time')
+argp.add_argument('--stress_client_test_cases',
+ default=_DEFAULT_TEST_CASES_STR,
+ help='List of test cases (with weights) to be executed by the'
+ ' stress test client. The list is in the following format:\n'
+ ' <testcase_1:w_1>,<testcase_2:w_2>...<testcase_n:w_n>\n'
+ ' (Note: The weights do not have to add up to 100)')
+
+if __name__ == '__main__':
+ args = argp.parse_args()
+
+ test_settings = TestSettings(
+ args.build_docker_image, args.test_poll_interval_secs,
+ args.test_duration_secs, args.kubernetes_proxy_port)
+
+ gke_settings = GkeSettings(args.project_id, args.docker_image_name)
+
+ stress_server_settings = StressServerSettings(_SERVER_POD_NAME,
+ args.stress_server_port)
+ stress_client_settings = StressClientSettings(
+ args.num_clients, _CLIENT_POD_NAME_PREFIX, _SERVER_POD_NAME,
+ args.stress_server_port, args.stress_client_metrics_port,
+ args.stress_client_metrics_collection_interval_secs,
+ args.stress_client_poll_interval_secs,
+ args.stress_client_num_channels_per_server,
+ args.stress_client_num_stubs_per_channel, args.stress_client_test_cases)
+
+ run_test_main(test_settings, gke_settings, stress_server_settings,
+ stress_client_settings)
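The same flow can also be driven programmatically. A sketch using the settings classes defined above ('my-project' and the values below are placeholders):

test_settings = TestSettings(False,  # skip the docker build/push
                             60,     # test_poll_interval_secs
                             600,    # test_duration_secs
                             8001)   # kubernetes_proxy_port
gke_settings = GkeSettings('my-project', 'grpc_stress_test')
stress_server_settings = StressServerSettings('stress-server', 8080)
stress_client_settings = StressClientSettings(
    3, 'stress-client', 'stress-server', 8080, 8081,
    30,   # metrics_collection_interval_secs
    60,   # stress_client_poll_interval_secs
    5,    # num_channels_per_server
    10,   # num_stubs_per_channel
    'empty_unary:1,large_unary:1')
run_test_main(test_settings, gke_settings, stress_server_settings,
              stress_client_settings)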