aboutsummaryrefslogtreecommitdiffhomepage
path: root/tools
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2016-02-19 03:02:16 -0800
committerGravatar Sree Kuchibhotla <sreek@google.com>2016-02-24 11:15:53 -0800
commit559e45becd0a50bd6af850900abbb2b5759f8719 (patch)
tree3b8b052fd7d0b6bcff102601ef857c43d0b17ad5 /tools
parent44ca2c26409a172b80bc9f40f7578f3eaf1d135d (diff)
Scripts to launch stress tests in GKE
Diffstat (limited to 'tools')
-rwxr-xr-x[-rw-r--r--]tools/big_query/big_query_utils.py (renamed from tools/gke/big_query_utils.py)135
-rwxr-xr-xtools/gke/create_client.py108
-rwxr-xr-xtools/gke/create_server.py74
-rwxr-xr-xtools/gke/delete_client.py66
-rwxr-xr-xtools/gke/delete_server.py58
-rwxr-xr-xtools/gke/kubernetes_api.py5
-rwxr-xr-xtools/gke/run_stress_tests_on_gke.py389
-rwxr-xr-xtools/run_tests/stress_test/run_client.py188
-rwxr-xr-xtools/run_tests/stress_test/run_server.py115
-rwxr-xr-xtools/run_tests/stress_test/stress_test_utils.py192
-rwxr-xr-xtools/run_tests/stress_test_wrapper.py96
11 files changed, 934 insertions, 492 deletions
diff --git a/tools/gke/big_query_utils.py b/tools/big_query/big_query_utils.py
index ebcf9d6ec3..267d019850 100644..100755
--- a/tools/gke/big_query_utils.py
+++ b/tools/big_query/big_query_utils.py
@@ -1,3 +1,33 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016 Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
import argparse
import json
import uuid
@@ -10,14 +40,14 @@ from oauth2client.client import GoogleCredentials
NUM_RETRIES = 3
-def create_bq():
+def create_big_query():
"""Authenticates with cloud platform and gets a BiqQuery service object
"""
creds = GoogleCredentials.get_application_default()
return discovery.build('bigquery', 'v2', credentials=creds)
-def create_ds(biq_query, project_id, dataset_id):
+def create_dataset(biq_query, project_id, dataset_id):
is_success = True
body = {
'datasetReference': {
@@ -25,6 +55,7 @@ def create_ds(biq_query, project_id, dataset_id):
'datasetId': dataset_id
}
}
+
try:
dataset_req = biq_query.datasets().insert(projectId=project_id, body=body)
dataset_req.execute(num_retries=NUM_RETRIES)
@@ -38,21 +69,18 @@ def create_ds(biq_query, project_id, dataset_id):
return is_success
-def make_field(field_name, field_type, field_description):
- return {
- 'name': field_name,
- 'type': field_type,
- 'description': field_description
- }
-
-
-def create_table(big_query, project_id, dataset_id, table_id, fields_list,
+def create_table(big_query, project_id, dataset_id, table_id, table_schema,
description):
is_success = True
+
body = {
'description': description,
'schema': {
- 'fields': fields_list
+ 'fields': [{
+ 'name': field_name,
+ 'type': field_type,
+ 'description': field_description
+ } for (field_name, field_type, field_description) in table_schema]
},
'tableReference': {
'datasetId': dataset_id,
@@ -60,6 +88,7 @@ def create_table(big_query, project_id, dataset_id, table_id, fields_list,
'tableId': table_id
}
}
+
try:
table_req = big_query.tables().insert(projectId=project_id,
datasetId=dataset_id,
@@ -91,43 +120,8 @@ def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
is_success = False
return is_success
-#####################
-
-def make_emp_row(emp_id, emp_name, emp_email):
- return {
- 'insertId': str(emp_id),
- 'json': {
- 'emp_id': emp_id,
- 'emp_name': emp_name,
- 'emp_email_id': emp_email
- }
- }
-
-
-def get_emp_table_fields_list():
- return [
- make_field('emp_id', 'INTEGER', 'Employee id'),
- make_field('emp_name', 'STRING', 'Employee name'),
- make_field('emp_email_id', 'STRING', 'Employee email id')
- ]
-
-
-def insert_emp_rows(big_query, project_id, dataset_id, table_id, start_idx,
- num_rows):
- rows_list = [make_emp_row(i, 'sree_%d' % i, 'sreecha_%d@gmail.com' % i)
- for i in range(start_idx, start_idx + num_rows)]
- insert_rows(big_query, project_id, dataset_id, table_id, rows_list)
-
-
-def create_emp_table(big_query, project_id, dataset_id, table_id):
- fields_list = get_emp_table_fields_list()
- description = 'Test table created by sree'
- create_table(big_query, project_id, dataset_id, table_id, fields_list,
- description)
-
-
-def sync_query(big_query, project_id, query, timeout=5000):
+def sync_query_job(big_query, project_id, query, timeout=5000):
query_data = {'query': query, 'timeoutMs': timeout}
query_job = None
try:
@@ -139,43 +133,8 @@ def sync_query(big_query, project_id, query, timeout=5000):
print http_error.content
return query_job
-#[Start query_emp_records]
-def query_emp_records(big_query, project_id, dataset_id, table_id):
- query = 'SELECT emp_id, emp_name FROM %s.%s ORDER BY emp_id;' % (dataset_id, table_id)
- print query
- query_job = sync_query(big_query, project_id, query, 5000)
- job_id = query_job['jobReference']
-
- print query_job
- print '**Starting paging **'
- #[Start Paging]
- page_token = None
- while True:
- page = big_query.jobs().getQueryResults(
- pageToken=page_token,
- **query_job['jobReference']).execute(num_retries=NUM_RETRIES)
- rows = page['rows']
- for row in rows:
- print row['f'][0]['v'], "---", row['f'][1]['v']
- page_token = page.get('pageToken')
- if not page_token:
- break
- #[End Paging]
-#[End query_emp_records]
-
-#########################
-DATASET_SEQ_NUM = 1
-TABLE_SEQ_NUM = 11
-
-PROJECT_ID = 'sree-gce'
-DATASET_ID = 'sree_test_dataset_%d' % DATASET_SEQ_NUM
-TABLE_ID = 'sree_test_table_%d' % TABLE_SEQ_NUM
-
-EMP_ROW_IDX = 10
-EMP_NUM_ROWS = 5
-
-bq = create_bq()
-create_ds(bq, PROJECT_ID, DATASET_ID)
-create_emp_table(bq, PROJECT_ID, DATASET_ID, TABLE_ID)
-insert_emp_rows(bq, PROJECT_ID, DATASET_ID, TABLE_ID, EMP_ROW_IDX, EMP_NUM_ROWS)
-query_emp_records(bq, PROJECT_ID, DATASET_ID, TABLE_ID)
+ # List of (column name, column type, description) tuples
+def make_row(unique_row_id, row_values_dict):
+ """row_values_dict is a dictionar of column name and column value.
+ """
+ return {'insertId': unique_row_id, 'json': row_values_dict}
diff --git a/tools/gke/create_client.py b/tools/gke/create_client.py
deleted file mode 100755
index bc56ef0ef1..0000000000
--- a/tools/gke/create_client.py
+++ /dev/null
@@ -1,108 +0,0 @@
-#!/usr/bin/env python2.7
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import argparse
-
-import kubernetes_api
-
-argp = argparse.ArgumentParser(description='Launch Stress tests in GKE')
-
-argp.add_argument('-n',
- '--num_instances',
- required=True,
- type=int,
- help='The number of instances to launch in GKE')
-args = argp.parse_args()
-
-kubernetes_api_server="localhost"
-kubernetes_api_port=8001
-
-
-# Docker image
-image_name="gcr.io/sree-gce/grpc_stress_test_2"
-
-server_address = "stress-server.default.svc.cluster.local:8080"
-metrics_server_address = "localhost:8081"
-
-stress_test_arg_list=[
- "--server_addresses=" + server_address,
- "--test_cases=empty_unary:20,large_unary:20",
- "--num_stubs_per_channel=10"
-]
-
-metrics_client_arg_list=[
- "--metrics_server_address=" + metrics_server_address,
- "--total_only=true"]
-
-env_dict={
- "GPRC_ROOT": "/var/local/git/grpc",
- "STRESS_TEST_IMAGE": "/var/local/git/grpc/bins/opt/stress_test",
- "STRESS_TEST_ARGS_STR": ' '.join(stress_test_arg_list),
- "METRICS_CLIENT_IMAGE": "/var/local/git/grpc/bins/opt/metrics_client",
- "METRICS_CLIENT_ARGS_STR": ' '.join(metrics_client_arg_list)}
-
-cmd_list=["/var/local/git/grpc/bins/opt/stress_test"]
-arg_list=stress_test_arg_list # make this [] in future
-port_list=[8081]
-
-namespace = 'default'
-is_headless_service = False # Client is NOT headless service
-
-print('Creating %d instances of client..' % args.num_instances)
-
-for i in range(1, args.num_instances + 1):
- service_name = 'stress-client-%d' % i
- pod_name = service_name # Use the same name for kubernetes Service and Pod
- is_success = kubernetes_api.create_pod(
- kubernetes_api_server,
- kubernetes_api_port,
- namespace,
- pod_name,
- image_name,
- port_list,
- cmd_list,
- arg_list,
- env_dict)
- if not is_success:
- print("Error in creating pod %s" % pod_name)
- else:
- is_success = kubernetes_api.create_service(
- kubernetes_api_server,
- kubernetes_api_port,
- namespace,
- service_name,
- pod_name,
- port_list, # Service port list
- port_list, # Container port list (same as service port list)
- is_headless_service)
- if not is_success:
- print("Error in creating service %s" % service_name)
- else:
- print("Created client %s" % pod_name)
diff --git a/tools/gke/create_server.py b/tools/gke/create_server.py
deleted file mode 100755
index 23ab62c205..0000000000
--- a/tools/gke/create_server.py
+++ /dev/null
@@ -1,74 +0,0 @@
-#!/usr/bin/env python2.7
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import argparse
-
-import kubernetes_api
-
-service_name = 'stress-server'
-pod_name = service_name # Use the same name for kubernetes Service and Pod
-namespace = 'default'
-is_headless_service = True
-cmd_list=['/var/local/git/grpc/bins/opt/interop_server']
-arg_list=['--port=8080']
-port_list=[8080]
-image_name='gcr.io/sree-gce/grpc_stress_test_2'
-env_dict={}
-
-# Make sure you run kubectl proxy --port=8001
-kubernetes_api_server='localhost'
-kubernetes_api_port=8001
-
-is_success = kubernetes_api.create_pod(
- kubernetes_api_server,
- kubernetes_api_port,
- namespace,
- pod_name,
- image_name,
- port_list,
- cmd_list,
- arg_list,
- env_dict)
-if not is_success:
- print("Error in creating pod")
-else:
- is_success = kubernetes_api.create_service(
- kubernetes_api_server,
- kubernetes_api_port,
- namespace,
- service_name,
- pod_name,
- port_list, # Service port list
- port_list, # Container port list (same as service port list)
- is_headless_service)
- if not is_success:
- print("Error in creating service")
- else:
- print("Successfully created the Server")
diff --git a/tools/gke/delete_client.py b/tools/gke/delete_client.py
deleted file mode 100755
index aa519f26b8..0000000000
--- a/tools/gke/delete_client.py
+++ /dev/null
@@ -1,66 +0,0 @@
-#!/usr/bin/env python2.7
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import argparse
-
-import kubernetes_api
-
-argp = argparse.ArgumentParser(description='Delete Stress test clients in GKE')
-argp.add_argument('-n',
- '--num_instances',
- required=True,
- type=int,
- help='The number of instances currently running')
-
-args = argp.parse_args()
-for i in range(1, args.num_instances + 1):
- service_name = 'stress-client-%d' % i
- pod_name = service_name
- namespace = 'default'
- kubernetes_api_server="localhost"
- kubernetes_api_port=8001
-
- is_success=kubernetes_api.delete_pod(
- kubernetes_api_server,
- kubernetes_api_port,
- namespace,
- pod_name)
- if not is_success:
- print('Error in deleting Pod %s' % pod_name)
- else:
- is_success= kubernetes_api.delete_service(
- kubernetes_api_server,
- kubernetes_api_port,
- namespace,
- service_name)
- if not is_success:
- print('Error in deleting Service %s' % service_name)
- else:
- print('Deleted %s' % pod_name)
diff --git a/tools/gke/delete_server.py b/tools/gke/delete_server.py
deleted file mode 100755
index 6e3fdcc33b..0000000000
--- a/tools/gke/delete_server.py
+++ /dev/null
@@ -1,58 +0,0 @@
-#!/usr/bin/env python2.7
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import argparse
-
-import kubernetes_api
-
-service_name = 'stress-server'
-pod_name = service_name # Use the same name for kubernetes Service and Pod
-namespace = 'default'
-is_headless_service = True
-kubernetes_api_server="localhost"
-kubernetes_api_port=8001
-
-is_success = kubernetes_api.delete_pod(
- kubernetes_api_server,
- kubernetes_api_port,
- namespace,
- pod_name)
-if not is_success:
- print("Error in deleting Pod %s" % pod_name)
-else:
- is_success = kubernetes_api.delete_service(
- kubernetes_api_server,
- kubernetes_api_port,
- namespace,
- service_name)
- if not is_success:
- print("Error in deleting Service %d" % service_name)
- else:
- print("Deleted server %s" % service_name)
diff --git a/tools/gke/kubernetes_api.py b/tools/gke/kubernetes_api.py
index 14d724bd31..d14c26ad6a 100755
--- a/tools/gke/kubernetes_api.py
+++ b/tools/gke/kubernetes_api.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python2.7
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -50,7 +50,8 @@ def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
'name': pod_name,
'image': image_name,
'ports': [{'containerPort': port,
- 'protocol': 'TCP'} for port in container_port_list]
+ 'protocol': 'TCP'} for port in container_port_list],
+ 'imagePullPolicy': 'Always'
}
]
}
diff --git a/tools/gke/run_stress_tests_on_gke.py b/tools/gke/run_stress_tests_on_gke.py
new file mode 100755
index 0000000000..d0c3887a42
--- /dev/null
+++ b/tools/gke/run_stress_tests_on_gke.py
@@ -0,0 +1,389 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+import datetime
+import os
+import subprocess
+import sys
+import time
+
+import kubernetes_api
+
+GRPC_ROOT = os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]), '../..'))
+os.chdir(GRPC_ROOT)
+
+class BigQuerySettings:
+
+ def __init__(self, run_id, dataset_id, summary_table_id, qps_table_id):
+ self.run_id = run_id
+ self.dataset_id = dataset_id
+ self.summary_table_id = summary_table_id
+ self.qps_table_id = qps_table_id
+
+
+class KubernetesProxy:
+ """ Class to start a proxy on localhost to the Kubernetes API server """
+
+ def __init__(self, api_port):
+ self.port = api_port
+ self.p = None
+ self.started = False
+
+ def start(self):
+ cmd = ['kubectl', 'proxy', '--port=%d' % self.port]
+ self.p = subprocess.Popen(args=cmd)
+ self.started = True
+ time.sleep(2)
+ print '..Started'
+
+ def get_port(self):
+ return self.port
+
+ def is_started(self):
+ return self.started
+
+ def __del__(self):
+ if self.p is not None:
+ self.p.kill()
+
+
+def _build_docker_image(image_name, tag_name):
+ """ Build the docker image and add a tag """
+ os.environ['INTEROP_IMAGE'] = image_name
+ # Note that 'BASE_NAME' HAS to be 'grpc_interop_stress_cxx' since the script
+ # build_interop_stress_image.sh invokes the following script:
+ # tools/dockerfile/$BASE_NAME/build_interop_stress.sh
+ os.environ['BASE_NAME'] = 'grpc_interop_stress_cxx'
+ cmd = ['tools/jenkins/build_interop_stress_image.sh']
+ p = subprocess.Popen(args=cmd)
+ retcode = p.wait()
+ if retcode != 0:
+ print 'Error in building docker image'
+ return False
+
+ cmd = ['docker', 'tag', '-f', image_name, tag_name]
+ p = subprocess.Popen(args=cmd)
+ retcode = p.wait()
+ if retcode != 0:
+ print 'Error in creating the tag %s for %s' % (tag_name, image_name)
+ return False
+
+ return True
+
+
+def _push_docker_image_to_gke_registry(docker_tag_name):
+ """Executes 'gcloud docker push <docker_tag_name>' to push the image to GKE registry"""
+ cmd = ['gcloud', 'docker', 'push', docker_tag_name]
+ print 'Pushing %s to GKE registry..' % docker_tag_name
+ p = subprocess.Popen(args=cmd)
+ retcode = p.wait()
+ if retcode != 0:
+ print 'Error in pushing docker image %s to the GKE registry' % docker_tag_name
+ return False
+ return True
+
+
+def _launch_image_on_gke(kubernetes_api_server, kubernetes_api_port, namespace,
+ pod_name, image_name, port_list, cmd_list, arg_list,
+ env_dict, is_headless_service):
+ """Creates a GKE Pod and a Service object for a given image by calling Kubernetes API"""
+ is_success = kubernetes_api.create_pod(
+ kubernetes_api_server,
+ kubernetes_api_port,
+ namespace,
+ pod_name,
+ image_name,
+ port_list, # The ports to be exposed on this container/pod
+ cmd_list, # The command that launches the stress server
+ arg_list,
+ env_dict # Environment variables to be passed to the pod
+ )
+ if not is_success:
+ print 'Error in creating Pod'
+ return False
+
+ is_success = kubernetes_api.create_service(
+ kubernetes_api_server,
+ kubernetes_api_port,
+ namespace,
+ pod_name, # Use the pod name for service name as well
+ pod_name,
+ port_list, # Service port list
+ port_list, # Container port list (same as service port list)
+ is_headless_service)
+ if not is_success:
+ print 'Error in creating Service'
+ return False
+
+ print 'Successfully created the pod/service %s' % pod_name
+ return True
+
+
+def _delete_image_on_gke(kubernetes_proxy, pod_name_list):
+ """Deletes a GKE Pod and Service object for given list of Pods by calling Kubernetes API"""
+ if not kubernetes_proxy.is_started:
+ print 'Kubernetes proxy must be started before calling this function'
+ return False
+
+ is_success = True
+ for pod_name in pod_name_list:
+ is_success = kubernetes_api.delete_pod(
+ 'localhost', kubernetes_proxy.get_port(), 'default', pod_name)
+ if not is_success:
+ print 'Error in deleting pod %s' % pod_name
+ break
+
+ is_success = kubernetes_api.delete_service(
+ 'localhost', kubernetes_proxy.get_port(), 'default',
+ pod_name) # service name same as pod name
+ if not is_success:
+ print 'Error in deleting service %s' % pod_name
+ break
+
+ if is_success:
+ print 'Successfully deleted the Pods/Services: %s' % ','.join(pod_name_list)
+
+ return is_success
+
+
+def _launch_server(gcp_project_id, docker_image_name, bq_settings,
+ kubernetes_proxy, server_pod_name, server_port):
+ """ Launches a stress test server instance in GKE cluster """
+ if not kubernetes_proxy.is_started:
+ print 'Kubernetes proxy must be started before calling this function'
+ return False
+
+ server_cmd_list = [
+ '/var/local/git/grpc/tools/run_tests/stress_test/run_server.py'
+ ] # Process that is launched
+ server_arg_list = [] # run_server.py does not take any args (for now)
+
+ # == Parameters to the server process launched in GKE ==
+ server_env = {
+ 'STRESS_TEST_IMAGE_TYPE': 'SERVER',
+ 'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/interop_server',
+ 'STRESS_TEST_ARGS_STR': '--port=%s' % server_port,
+ 'RUN_ID': bq_settings.run_id,
+ 'POD_NAME': server_pod_name,
+ 'GCP_PROJECT_ID': gcp_project_id,
+ 'DATASET_ID': bq_settings.dataset_id,
+ 'SUMMARY_TABLE_ID': bq_settings.summary_table_id,
+ 'QPS_TABLE_ID': bq_settings.qps_table_id
+ }
+
+ # Launch Server
+ is_success = _launch_image_on_gke(
+ 'localhost',
+ kubernetes_proxy.get_port(),
+ 'default',
+ server_pod_name,
+ docker_image_name,
+ [server_port], # Port that should be exposed on the container
+ server_cmd_list,
+ server_arg_list,
+ server_env,
+ True # Headless = True for server. Since we want DNS records to be greated by GKE
+ )
+
+ return is_success
+
+
+def _launch_client(gcp_project_id, docker_image_name, bq_settings,
+ kubernetes_proxy, num_instances, client_pod_name_prefix,
+ server_pod_name, server_port):
+ """ Launches a configurable number of stress test clients on GKE cluster """
+ if not kubernetes_proxy.is_started:
+ print 'Kubernetes proxy must be started before calling this function'
+ return False
+
+ server_address = '%s.default.svc.cluster.local:%d' % (server_pod_name,
+ server_port)
+ #TODO(sree) Make the whole client args configurable
+ test_cases_str = 'empty_unary:1,large_unary:1'
+ stress_client_arg_list = [
+ '--server_addresses=%s' % server_address,
+ '--test_cases=%s' % test_cases_str, '--num_stubs_per_channel=10'
+ ]
+
+ client_cmd_list = [
+ '/var/local/git/grpc/tools/run_tests/stress_test/run_client.py'
+ ]
+ # run_client.py takes no args. All args are passed as env variables
+ client_arg_list = []
+
+ # TODO(sree) Make this configurable (and also less frequent)
+ poll_interval_secs = 5
+
+ metrics_port = 8081
+ metrics_server_address = 'localhost:%d' % metrics_port
+ metrics_client_arg_list = [
+ '--metrics_server_address=%s' % metrics_server_address,
+ '--total_only=true'
+ ]
+
+ client_env = {
+ 'STRESS_TEST_IMAGE_TYPE': 'CLIENT',
+ 'STRESS_TEST_IMAGE': '/var/local/git/grpc/bins/opt/stress_test',
+ 'STRESS_TEST_ARGS_STR': ' '.join(stress_client_arg_list),
+ 'METRICS_CLIENT_IMAGE': '/var/local/git/grpc/bins/opt/metrics_client',
+ 'METRICS_CLIENT_ARGS_STR': ' '.join(metrics_client_arg_list),
+ 'RUN_ID': bq_settings.run_id,
+ 'POLL_INTERVAL_SECS': str(poll_interval_secs),
+ 'GCP_PROJECT_ID': gcp_project_id,
+ 'DATASET_ID': bq_settings.dataset_id,
+ 'SUMMARY_TABLE_ID': bq_settings.summary_table_id,
+ 'QPS_TABLE_ID': bq_settings.qps_table_id
+ }
+
+ for i in range(1, num_instances + 1):
+ pod_name = '%s-%d' % (client_pod_name_prefix, i)
+ client_env['POD_NAME'] = pod_name
+ is_success = _launch_image_on_gke(
+ 'localhost',
+ kubernetes_proxy.get_port(),
+ 'default',
+ pod_name,
+ docker_image_name,
+ [metrics_port], # Client pods expose metrics port
+ client_cmd_list,
+ client_arg_list,
+ client_env,
+ False # Client is not a headless service.
+ )
+ if not is_success:
+ print 'Error in launching client %s' % pod_name
+ return False
+
+ return True
+
+
+def _launch_server_and_client(gcp_project_id, docker_image_name,
+ num_client_instances):
+ # == Big Query tables related settings (Common for both server and client) ==
+
+ # Create a unique id for this run (Note: Using timestamp instead of UUID to
+ # make it easier to deduce the date/time of the run just by looking at the run
+ # run id. This is useful in debugging when looking at records in Biq query)
+ run_id = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
+
+ dataset_id = 'stress_test_%s' % run_id
+ summary_table_id = 'summary'
+ qps_table_id = 'qps'
+
+ bq_settings = BigQuerySettings(run_id, dataset_id, summary_table_id,
+ qps_table_id)
+
+ # Start kubernetes proxy
+ kubernetes_api_port = 9001
+ kubernetes_proxy = KubernetesProxy(kubernetes_api_port)
+ kubernetes_proxy.start()
+
+ server_pod_name = 'stress-server'
+ server_port = 8080
+ is_success = _launch_server(gcp_project_id, docker_image_name, bq_settings,
+ kubernetes_proxy, server_pod_name, server_port)
+ if not is_success:
+ print 'Error in launching server'
+ return False
+
+ # Server takes a while to start.
+ # TODO(sree) Use Kubernetes API to query the status of the server instead of
+ # sleeping
+ time.sleep(60)
+
+ # Launch client
+ server_address = '%s.default.svc.cluster.local:%d' % (server_pod_name,
+ server_port)
+ client_pod_name_prefix = 'stress-client'
+ is_success = _launch_client(gcp_project_id, docker_image_name, bq_settings,
+ kubernetes_proxy, num_client_instances,
+ client_pod_name_prefix, server_pod_name,
+ server_port)
+ if not is_success:
+ print 'Error in launching client(s)'
+ return False
+
+ return True
+
+
+def _delete_server_and_client(num_client_instances):
+ kubernetes_api_port = 9001
+ kubernetes_proxy = KubernetesProxy(kubernetes_api_port)
+ kubernetes_proxy.start()
+
+ # Delete clients first
+ client_pod_names = ['stress-client-%d' % i
+ for i in range(1, num_client_instances + 1)]
+
+ is_success = _delete_image_on_gke(kubernetes_proxy, client_pod_names)
+ if not is_success:
+ return False
+
+ # Delete server
+ server_pod_name = 'stress-server'
+ return _delete_image_on_gke(kubernetes_proxy, [server_pod_name])
+
+
+def _build_and_push_docker_image(gcp_project_id, docker_image_name, tag_name):
+ is_success = _build_docker_image(docker_image_name, tag_name)
+ if not is_success:
+ return False
+ return _push_docker_image_to_gke_registry(tag_name)
+
+
+# TODO(sree): This is just to test the above APIs. Rewrite this to make
+# everything configurable (like image names / number of instances etc)
+def test_run():
+ image_name = 'grpc_stress_test'
+ gcp_project_id = 'sree-gce'
+ tag_name = 'gcr.io/%s/%s' % (gcp_project_id, image_name)
+ num_client_instances = 3
+
+ is_success = _build_docker_image(image_name, tag_name)
+ if not is_success:
+ return
+
+ is_success = _push_docker_image_to_gke_registry(tag_name)
+ if not is_success:
+ return
+
+ is_success = _launch_server_and_client(gcp_project_id, tag_name,
+ num_client_instances)
+
+ # Run the test for 2 mins
+ time.sleep(120)
+
+ is_success = _delete_server_and_client(num_client_instances)
+
+ if not is_success:
+ return
+
+
+if __name__ == '__main__':
+ test_run()
diff --git a/tools/run_tests/stress_test/run_client.py b/tools/run_tests/stress_test/run_client.py
new file mode 100755
index 0000000000..33958bce49
--- /dev/null
+++ b/tools/run_tests/stress_test/run_client.py
@@ -0,0 +1,188 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import datetime
+import os
+import re
+import select
+import subprocess
+import sys
+import time
+
+from stress_test_utils import EventType
+from stress_test_utils import BigQueryHelper
+
+
+# TODO (sree): Write a python grpc client to directly query the metrics instead
+# of calling metrics_client
+def _get_qps(metrics_cmd):
+ qps = 0
+ try:
+ # Note: gpr_log() writes even non-error messages to stderr stream. So it is
+ # important that we set stderr=subprocess.STDOUT
+ p = subprocess.Popen(args=metrics_cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ retcode = p.wait()
+ (out_str, err_str) = p.communicate()
+ if retcode != 0:
+ print 'Error in reading metrics information'
+ print 'Output: ', out_str
+ else:
+ # The overall qps is printed at the end of the line
+ m = re.search('\d+$', out_str)
+ qps = int(m.group()) if m else 0
+ except Exception as ex:
+ print 'Exception while reading metrics information: ' + str(ex)
+ return qps
+
+
+def run_client():
+ """This is a wrapper around the stress test client and performs the following:
+ 1) Create the following two tables in Big Query:
+ (i) Summary table: To record events like the test started, completed
+ successfully or failed
+ (ii) Qps table: To periodically record the QPS sent by this client
+ 2) Start the stress test client and add a row in the Big Query summary
+ table
+ 3) Once every few seconds (as specificed by the poll_interval_secs) poll
+ the status of the stress test client process and perform the
+ following:
+ 3.1) If the process is still running, get the current qps by invoking
+ the metrics client program and add a row in the Big Query
+ Qps table. Sleep for a duration specified by poll_interval_secs
+ 3.2) If the process exited successfully, add a row in the Big Query
+ Summary table and exit
+ 3.3) If the process failed, add a row in Big Query summary table and
+ wait forever.
+ NOTE: This script typically runs inside a GKE pod which means
+ that the pod gets destroyed when the script exits. However, in
+ case the stress test client fails, we would not want the pod to
+ be destroyed (since we might want to connect to the pod for
+ examining logs). This is the reason why the script waits forever
+ in case of failures
+ """
+ env = dict(os.environ)
+ image_type = env['STRESS_TEST_IMAGE_TYPE']
+ image_name = env['STRESS_TEST_IMAGE']
+ args_str = env['STRESS_TEST_ARGS_STR']
+ metrics_client_image = env['METRICS_CLIENT_IMAGE']
+ metrics_client_args_str = env['METRICS_CLIENT_ARGS_STR']
+ run_id = env['RUN_ID']
+ pod_name = env['POD_NAME']
+ logfile_name = env.get('LOGFILE_NAME')
+ poll_interval_secs = float(env['POLL_INTERVAL_SECS'])
+ project_id = env['GCP_PROJECT_ID']
+ dataset_id = env['DATASET_ID']
+ summary_table_id = env['SUMMARY_TABLE_ID']
+ qps_table_id = env['QPS_TABLE_ID']
+
+ bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id,
+ dataset_id, summary_table_id, qps_table_id)
+ bq_helper.initialize()
+
+ # Create BigQuery Dataset and Tables: Summary Table and Metrics Table
+ if not bq_helper.setup_tables():
+ print 'Error in creating BigQuery tables'
+ return
+
+ start_time = datetime.datetime.now()
+
+ logfile = None
+ details = 'Logging to stdout'
+ if logfile_name is not None:
+ print 'Opening logfile: %s ...' % logfile_name
+ details = 'Logfile: %s' % logfile_name
+ logfile = open(logfile_name, 'w')
+
+ # Update status that the test is starting (in the status table)
+ bq_helper.insert_summary_row(EventType.STARTING, details)
+
+ metrics_cmd = [metrics_client_image
+ ] + [x for x in metrics_client_args_str.split()]
+ stress_cmd = [image_name] + [x for x in args_str.split()]
+
+ print 'Launching process %s ...' % stress_cmd
+ stress_p = subprocess.Popen(args=stress_cmd,
+ stdout=logfile,
+ stderr=subprocess.STDOUT)
+
+ qps_history = [1, 1, 1] # Maintain the last 3 qps readings
+ qps_history_idx = 0 # Index into the qps_history list
+
+ is_error = False
+ while True:
+ # Check if stress_client is still running. If so, collect metrics and upload
+ # to BigQuery status table
+ if stress_p.poll() is not None:
+ # TODO(sree) Upload completion status to BigQuery
+ end_time = datetime.datetime.now().isoformat()
+ event_type = EventType.SUCCESS
+ details = 'End time: %s' % end_time
+ if stress_p.returncode != 0:
+ event_type = EventType.FAILURE
+ details = 'Return code = %d. End time: %s' % (stress_p.returncode,
+ end_time)
+ is_error = True
+ bq_helper.insert_summary_row(event_type, details)
+ print details
+ break
+
+ # Stress client still running. Get metrics
+ qps = _get_qps(metrics_cmd)
+ qps_recorded_at = datetime.datetime.now().isoformat()
+ print 'qps: %d at %s' % (qps, qps_recorded_at)
+
+ # If QPS has been zero for the last 3 iterations, flag it as error and exit
+ qps_history[qps_history_idx] = qps
+ qps_history_idx = (qps_history_idx + 1) % len(qps_history)
+ if sum(qps_history) == 0:
+ details = 'QPS has been zero for the last %d seconds - as of : %s' % (
+ poll_interval_secs * 3, qps_recorded_at)
+ is_error = True
+ bq_helper.insert_summary_row(EventType.FAILURE, details)
+ print details
+ break
+
+ # Upload qps metrics to BiqQuery
+ bq_helper.insert_qps_row(qps, qps_recorded_at)
+
+ time.sleep(poll_interval_secs)
+
+ if is_error:
+ print 'Waiting indefinitely..'
+ select.select([], [], [])
+
+ print 'Completed'
+ return
+
+
+if __name__ == '__main__':
+ run_client()
diff --git a/tools/run_tests/stress_test/run_server.py b/tools/run_tests/stress_test/run_server.py
new file mode 100755
index 0000000000..9ad8d63638
--- /dev/null
+++ b/tools/run_tests/stress_test/run_server.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import datetime
+import os
+import select
+import subprocess
+import sys
+import time
+
+from stress_test_utils import BigQueryHelper
+from stress_test_utils import EventType
+
+
+def run_server():
+ """This is a wrapper around the interop server and performs the following:
+ 1) Create a 'Summary table' in Big Query to record events like the server
+ started, completed successfully or failed. NOTE: This also creates
+ another table called the QPS table which is currently NOT needed on the
+ server (it is needed on the stress test clients)
+ 2) Start the server process and add a row in Big Query summary table
+ 3) Wait for the server process to terminate. The server process does not
+ terminate unless there is an error.
+ If the server process terminated with a failure, add a row in Big Query
+ and wait forever.
+ NOTE: This script typically runs inside a GKE pod which means that the
+ pod gets destroyed when the script exits. However, in case the server
+ process fails, we would not want the pod to be destroyed (since we
+ might want to connect to the pod for examining logs). This is the
+ reason why the script waits forever in case of failures.
+ """
+
+ # Read the parameters from environment variables
+ env = dict(os.environ)
+
+ run_id = env['RUN_ID'] # The unique run id for this test
+ image_type = env['STRESS_TEST_IMAGE_TYPE']
+ image_name = env['STRESS_TEST_IMAGE']
+ args_str = env['STRESS_TEST_ARGS_STR']
+ pod_name = env['POD_NAME']
+ project_id = env['GCP_PROJECT_ID']
+ dataset_id = env['DATASET_ID']
+ summary_table_id = env['SUMMARY_TABLE_ID']
+ qps_table_id = env['QPS_TABLE_ID']
+
+ logfile_name = env.get('LOGFILE_NAME')
+
+ bq_helper = BigQueryHelper(run_id, image_type, pod_name, project_id,
+ dataset_id, summary_table_id, qps_table_id)
+ bq_helper.initialize()
+
+ # Create BigQuery Dataset and Tables: Summary Table and Metrics Table
+ if not bq_helper.setup_tables():
+ print 'Error in creating BigQuery tables'
+ return
+
+ start_time = datetime.datetime.now()
+
+ logfile = None
+ details = 'Logging to stdout'
+ if logfile_name is not None:
+ print 'Opening log file: ', logfile_name
+ logfile = open(logfile_name, 'w')
+ details = 'Logfile: %s' % logfile_name
+
+ # Update status that the test is starting (in the status table)
+ bq_helper.insert_summary_row(EventType.STARTING, details)
+
+ stress_cmd = [image_name] + [x for x in args_str.split()]
+
+ print 'Launching process %s ...' % stress_cmd
+ stress_p = subprocess.Popen(args=stress_cmd,
+ stdout=logfile,
+ stderr=subprocess.STDOUT)
+
+ returncode = stress_p.wait()
+ if returncode != 0:
+ end_time = datetime.datetime.now().isoformat()
+ event_type = EventType.FAILURE
+ details = 'Returncode: %d; End time: %s' % (returncode, end_time)
+ bq_helper.insert_summary_row(event_type, details)
+ print 'Waiting indefinitely..'
+ select.select([], [], [])
+ return returncode
+
+
+if __name__ == '__main__':
+ run_server()
diff --git a/tools/run_tests/stress_test/stress_test_utils.py b/tools/run_tests/stress_test/stress_test_utils.py
new file mode 100755
index 0000000000..a0626ce3ac
--- /dev/null
+++ b/tools/run_tests/stress_test/stress_test_utils.py
@@ -0,0 +1,192 @@
+#!/usr/bin/env python2.7
+# Copyright 2015-2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import datetime
+import json
+import os
+import re
+import select
+import subprocess
+import sys
+import time
+
+# Import big_query_utils module
+bq_utils_dir = os.path.abspath(os.path.join(
+ os.path.dirname(__file__), '../../big_query'))
+sys.path.append(bq_utils_dir)
+import big_query_utils as bq_utils
+
+class EventType:
+ STARTING = 'STARTING'
+ SUCCESS = 'SUCCESS'
+ FAILURE = 'FAILURE'
+
+class BigQueryHelper:
+ """Helper class for the stress test wrappers to interact with BigQuery.
+ """
+
+ def __init__(self, run_id, image_type, pod_name, project_id, dataset_id,
+ summary_table_id, qps_table_id):
+ self.run_id = run_id
+ self.image_type = image_type
+ self.pod_name = pod_name
+ self.project_id = project_id
+ self.dataset_id = dataset_id
+ self.summary_table_id = summary_table_id
+ self.qps_table_id = qps_table_id
+
+ def initialize(self):
+ self.bq = bq_utils.create_big_query()
+
+ def setup_tables(self):
+ return bq_utils.create_dataset(self.bq, self.project_id, self.dataset_id) \
+ and self.__create_summary_table() \
+ and self.__create_qps_table()
+
+ def insert_summary_row(self, event_type, details):
+ row_values_dict = {
+ 'run_id': self.run_id,
+ 'image_type': self.image_type,
+ 'pod_name': self.pod_name,
+ 'event_date': datetime.datetime.now().isoformat(),
+ 'event_type': event_type,
+ 'details': details
+ }
+ # Something that uniquely identifies the row (Biquery needs it for duplicate
+ # detection).
+ row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, event_type)
+
+ row = bq_utils.make_row(row_unique_id, row_values_dict)
+ return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
+ self.summary_table_id, [row])
+
+ def insert_qps_row(self, qps, recorded_at):
+ row_values_dict = {
+ 'run_id': self.run_id,
+ 'pod_name': self.pod_name,
+ 'recorded_at': recorded_at,
+ 'qps': qps
+ }
+
+ row_unique_id = '%s_%s_%s' % (self.run_id, self.pod_name, recorded_at)
+ row = bq_utils.make_row(row_unique_id, row_values_dict)
+ return bq_utils.insert_rows(self.bq, self.project_id, self.dataset_id,
+ self.qps_table_id, [row])
+
+ def check_if_any_tests_failed(self, num_query_retries=3):
+ query = ('SELECT event_type FROM %s.%s WHERE run_id = %s AND '
+ 'event_type="%s"') % (self.dataset_id, self.summary_table_id,
+ self.run_id, EventType.FAILURE)
+ query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
+ page = self.bq.jobs().getQueryResults(**query_job['jobReference']).execute(
+ num_retries=num_query_retries)
+ print page
+ num_failures = int(page['totalRows'])
+ print 'num rows: ', num_failures
+ return num_failures > 0
+
+ def print_summary_records(self, num_query_retries=3):
+ line = '-' * 120
+ print line
+ print 'Summary records'
+ print 'Run Id', self.run_id
+ print line
+ query = ('SELECT pod_name, image_type, event_type, event_date, details'
+ ' FROM %s.%s WHERE run_id = %s ORDER by event_date;') % (
+ self.dataset_id, self.summary_table_id, self.run_id)
+ query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
+
+ print '{:<25} {:<12} {:<12} {:<30} {}'.format(
+ 'Pod name', 'Image type', 'Event type', 'Date', 'Details')
+ print line
+ page_token = None
+ while True:
+ page = self.bq.jobs().getQueryResults(
+ pageToken=page_token,
+ **query_job['jobReference']).execute(num_retries=num_query_retries)
+ rows = page.get('rows', [])
+ for row in rows:
+ print '{:<25} {:<12} {:<12} {:<30} {}'.format(
+ row['f'][0]['v'], row['f'][1]['v'], row['f'][2]['v'],
+ row['f'][3]['v'], row['f'][4]['v'])
+ page_token = page.get('pageToken')
+ if not page_token:
+ break
+
+ def print_qps_records(self, num_query_retries=3):
+ line = '-' * 80
+ print line
+ print 'QPS Summary'
+ print 'Run Id: ', self.run_id
+ print line
+ query = (
+ 'SELECT pod_name, recorded_at, qps FROM %s.%s WHERE run_id = %s ORDER '
+ 'by recorded_at;') % (self.dataset_id, self.qps_table_id, self.run_id)
+ query_job = bq_utils.sync_query_job(self.bq, self.project_id, query)
+ print '{:<25} {:30} {}'.format('Pod name', 'Recorded at', 'Qps')
+ print line
+ page_token = None
+ while True:
+ page = self.bq.jobs().getQueryResults(
+ pageToken=page_token,
+ **query_job['jobReference']).execute(num_retries=num_query_retries)
+ rows = page.get('rows', [])
+ for row in rows:
+ print '{:<25} {:30} {}'.format(row['f'][0]['v'], row['f'][1]['v'],
+ row['f'][2]['v'])
+ page_token = page.get('pageToken')
+ if not page_token:
+ break
+
+ def __create_summary_table(self):
+ summary_table_schema = [
+ ('run_id', 'INTEGER', 'Test run id'),
+ ('image_type', 'STRING', 'Client or Server?'),
+ ('pod_name', 'STRING', 'GKE pod hosting this image'),
+ ('event_date', 'STRING', 'The date of this event'),
+ ('event_type', 'STRING', 'STARTED/SUCCESS/FAILURE'),
+ ('details', 'STRING', 'Any other relevant details')
+ ]
+ desc = ('The table that contains START/SUCCESS/FAILURE events for '
+ ' the stress test clients and servers')
+ return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
+ self.summary_table_id, summary_table_schema,
+ desc)
+
+ def __create_qps_table(self):
+ qps_table_schema = [
+ ('run_id', 'INTEGER', 'Test run id'),
+ ('pod_name', 'STRING', 'GKE pod hosting this image'),
+ ('recorded_at', 'STRING', 'Metrics recorded at time'),
+ ('qps', 'INTEGER', 'Queries per second')
+ ]
+ desc = 'The table that cointains the qps recorded at various intervals'
+ return bq_utils.create_table(self.bq, self.project_id, self.dataset_id,
+ self.qps_table_id, qps_table_schema, desc)
diff --git a/tools/run_tests/stress_test_wrapper.py b/tools/run_tests/stress_test_wrapper.py
deleted file mode 100755
index 8f1bd2024e..0000000000
--- a/tools/run_tests/stress_test_wrapper.py
+++ /dev/null
@@ -1,96 +0,0 @@
-#!/usr/bin/env python2.7
-import os
-import re
-import select
-import subprocess
-import sys
-import time
-
-GRPC_ROOT = '/usr/local/google/home/sreek/workspace/grpc/'
-STRESS_TEST_IMAGE = GRPC_ROOT + 'bins/opt/stress_test'
-STRESS_TEST_ARGS_STR = ' '.join([
- '--server_addresses=localhost:8000',
- '--test_cases=empty_unary:1,large_unary:1', '--num_stubs_per_channel=10',
- '--test_duration_secs=10'])
-METRICS_CLIENT_IMAGE = GRPC_ROOT + 'bins/opt/metrics_client'
-METRICS_CLIENT_ARGS_STR = ' '.join([
- '--metrics_server_address=localhost:8081', '--total_only=true'])
-LOGFILE_NAME = 'stress_test.log'
-
-
-# TODO (sree): Write a python grpc client to directly query the metrics instead
-# of calling metrics_client
-def get_qps(metrics_cmd):
- qps = 0
- try:
- # Note: gpr_log() writes even non-error messages to stderr stream. So it is
- # important that we set stderr=subprocess.STDOUT
- p = subprocess.Popen(args=metrics_cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- retcode = p.wait()
- (out_str, err_str) = p.communicate()
- if retcode != 0:
- print 'Error in reading metrics information'
- print 'Output: ', out_str
- else:
- # The overall qps is printed at the end of the line
- m = re.search('\d+$', out_str)
- qps = int(m.group()) if m else 0
- except Exception as ex:
- print 'Exception while reading metrics information: ' + str(ex)
- return qps
-
-def main(argv):
- # TODO(sree) Create BigQuery Tables
- # (Summary table), (Metrics table)
-
- # TODO(sree) Update status that the test is starting (in the status table)
- #
-
- metrics_cmd = [METRICS_CLIENT_IMAGE
- ] + [x for x in METRICS_CLIENT_ARGS_STR.split()]
-
- stress_cmd = [STRESS_TEST_IMAGE] + [x for x in STRESS_TEST_ARGS_STR.split()]
- # TODO(sree): Add an option to print to stdout if logfilename is absent
- logfile = open(LOGFILE_NAME, 'w')
- stress_p = subprocess.Popen(args=arg_list,
- stdout=logfile,
- stderr=subprocess.STDOUT)
-
- qps_history = [1, 1, 1] # Maintain the last 3 qps
- qps_history_idx = 0 # Index into the qps_history list
-
- is_error = False
- while True:
- # Check if stress_client is still running. If so, collect metrics and upload
- # to BigQuery status table
- #
- if stress_p is not None:
- # TODO(sree) Upload completion status to BiqQuery
- is_error = (stress_p.returncode != 0)
- break
-
- # Stress client still running. Get metrics
- qps = get_qps(metrics_cmd)
-
- # If QPS has been zero for the last 3 iterations, flag it as error and exit
- qps_history[qps_history_idx] = qps
- qps_history_idx = (qps_histor_idx + 1) % len(qps_history)
- if sum(a) == 0:
- print ('QPS has been zero for the last 3 iterations. Not monitoring '
- 'anymore. The stress test client may be stalled.')
- is_error = True
- break
-
- #TODO(sree) Upload qps metrics to BiqQuery
-
- if is_error:
- print 'Waiting indefinitely..'
- select.select([],[],[])
-
- return 1
-
-
-if __name__ == '__main__':
- main(sys.argv[1:])