aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--test/cpp/interop/metrics_client.cc41
-rwxr-xr-xtools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh2
-rw-r--r--tools/gke/big_query_utils.py (renamed from tools/bigquery/big_query_utils.py)0
-rwxr-xr-xtools/gke/create_client.py108
-rwxr-xr-xtools/gke/create_server.py74
-rwxr-xr-xtools/gke/delete_client.py66
-rwxr-xr-xtools/gke/delete_server.py58
-rwxr-xr-xtools/gke/kubernetes_api.py35
-rwxr-xr-xtools/run_tests/stress_test_wrapper.py96
9 files changed, 451 insertions, 29 deletions
diff --git a/test/cpp/interop/metrics_client.cc b/test/cpp/interop/metrics_client.cc
index 0c140ffd85..cc304f2e89 100644
--- a/test/cpp/interop/metrics_client.cc
+++ b/test/cpp/interop/metrics_client.cc
@@ -37,39 +37,45 @@
#include <gflags/gflags.h>
#include <grpc++/grpc++.h>
-#include "test/cpp/util/metrics_server.h"
-#include "test/cpp/util/test_config.h"
#include "src/proto/grpc/testing/metrics.grpc.pb.h"
#include "src/proto/grpc/testing/metrics.pb.h"
+#include "test/cpp/util/metrics_server.h"
+#include "test/cpp/util/test_config.h"
DEFINE_string(metrics_server_address, "",
"The metrics server addresses in the fomrat <hostname>:<port>");
+DEFINE_bool(total_only, false,
+ "If true, this prints only the total value of all gauges");
+
+int kDeadlineSecs = 10;
using grpc::testing::EmptyMessage;
using grpc::testing::GaugeResponse;
using grpc::testing::MetricsService;
using grpc::testing::MetricsServiceImpl;
-void PrintMetrics(const grpc::string& server_address) {
- gpr_log(GPR_INFO, "creating a channel to %s", server_address.c_str());
- std::shared_ptr<grpc::Channel> channel(
- grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()));
-
- std::unique_ptr<MetricsService::Stub> stub(MetricsService::NewStub(channel));
-
+// Prints the values of all Gauges (unless total_only is set to 'true' in which
+// case this only prints the sum of all gauge values).
+bool PrintMetrics(std::unique_ptr<MetricsService::Stub> stub, bool total_only) {
grpc::ClientContext context;
EmptyMessage message;
+ std::chrono::system_clock::time_point deadline =
+ std::chrono::system_clock::now() + std::chrono::seconds(kDeadlineSecs);
+
+ context.set_deadline(deadline);
+
std::unique_ptr<grpc::ClientReader<GaugeResponse>> reader(
stub->GetAllGauges(&context, message));
GaugeResponse gauge_response;
long overall_qps = 0;
- int idx = 0;
while (reader->Read(&gauge_response)) {
if (gauge_response.value_case() == GaugeResponse::kLongValue) {
- gpr_log(GPR_INFO, "Gauge: %d (%s: %ld)", ++idx,
- gauge_response.name().c_str(), gauge_response.long_value());
+ if (!total_only) {
+ gpr_log(GPR_INFO, "%s: %ld", gauge_response.name().c_str(),
+ gauge_response.long_value());
+ }
overall_qps += gauge_response.long_value();
} else {
gpr_log(GPR_INFO, "Gauge %s is not a long value",
@@ -77,12 +83,14 @@ void PrintMetrics(const grpc::string& server_address) {
}
}
- gpr_log(GPR_INFO, "OVERALL: %ld", overall_qps);
+ gpr_log(GPR_INFO, "%ld", overall_qps);
const grpc::Status status = reader->Finish();
if (!status.ok()) {
gpr_log(GPR_ERROR, "Error in getting metrics from the client");
}
+
+ return status.ok();
}
int main(int argc, char** argv) {
@@ -97,7 +105,12 @@ int main(int argc, char** argv) {
return 1;
}
- PrintMetrics(FLAGS_metrics_server_address);
+ std::shared_ptr<grpc::Channel> channel(grpc::CreateChannel(
+ FLAGS_metrics_server_address, grpc::InsecureChannelCredentials()));
+
+ if (!PrintMetrics(MetricsService::NewStub(channel), FLAGS_total_only)) {
+ return 1;
+ }
return 0;
}
diff --git a/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh b/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh
index 6ed3ccb3fa..392bdfccda 100755
--- a/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh
+++ b/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh
@@ -42,4 +42,4 @@ cd /var/local/git/grpc
make install-certs
# build C++ interop stress client, interop client and server
-make stress_test interop_client interop_server
+make stress_test metrics_client interop_client interop_server
diff --git a/tools/bigquery/big_query_utils.py b/tools/gke/big_query_utils.py
index ebcf9d6ec3..ebcf9d6ec3 100644
--- a/tools/bigquery/big_query_utils.py
+++ b/tools/gke/big_query_utils.py
diff --git a/tools/gke/create_client.py b/tools/gke/create_client.py
new file mode 100755
index 0000000000..bc56ef0ef1
--- /dev/null
+++ b/tools/gke/create_client.py
@@ -0,0 +1,108 @@
+#!/usr/bin/env python2.7
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import argparse
+
+import kubernetes_api
+
+argp = argparse.ArgumentParser(description='Launch Stress tests in GKE')
+
+argp.add_argument('-n',
+ '--num_instances',
+ required=True,
+ type=int,
+ help='The number of instances to launch in GKE')
+args = argp.parse_args()
+
+kubernetes_api_server="localhost"
+kubernetes_api_port=8001
+
+
+# Docker image
+image_name="gcr.io/sree-gce/grpc_stress_test_2"
+
+server_address = "stress-server.default.svc.cluster.local:8080"
+metrics_server_address = "localhost:8081"
+
+stress_test_arg_list=[
+ "--server_addresses=" + server_address,
+ "--test_cases=empty_unary:20,large_unary:20",
+ "--num_stubs_per_channel=10"
+]
+
+metrics_client_arg_list=[
+ "--metrics_server_address=" + metrics_server_address,
+ "--total_only=true"]
+
+env_dict={
+ "GPRC_ROOT": "/var/local/git/grpc",
+ "STRESS_TEST_IMAGE": "/var/local/git/grpc/bins/opt/stress_test",
+ "STRESS_TEST_ARGS_STR": ' '.join(stress_test_arg_list),
+ "METRICS_CLIENT_IMAGE": "/var/local/git/grpc/bins/opt/metrics_client",
+ "METRICS_CLIENT_ARGS_STR": ' '.join(metrics_client_arg_list)}
+
+cmd_list=["/var/local/git/grpc/bins/opt/stress_test"]
+arg_list=stress_test_arg_list # make this [] in future
+port_list=[8081]
+
+namespace = 'default'
+is_headless_service = False # Client is NOT headless service
+
+print('Creating %d instances of client..' % args.num_instances)
+
+for i in range(1, args.num_instances + 1):
+ service_name = 'stress-client-%d' % i
+ pod_name = service_name # Use the same name for kubernetes Service and Pod
+ is_success = kubernetes_api.create_pod(
+ kubernetes_api_server,
+ kubernetes_api_port,
+ namespace,
+ pod_name,
+ image_name,
+ port_list,
+ cmd_list,
+ arg_list,
+ env_dict)
+ if not is_success:
+ print("Error in creating pod %s" % pod_name)
+ else:
+ is_success = kubernetes_api.create_service(
+ kubernetes_api_server,
+ kubernetes_api_port,
+ namespace,
+ service_name,
+ pod_name,
+ port_list, # Service port list
+ port_list, # Container port list (same as service port list)
+ is_headless_service)
+ if not is_success:
+ print("Error in creating service %s" % service_name)
+ else:
+ print("Created client %s" % pod_name)
diff --git a/tools/gke/create_server.py b/tools/gke/create_server.py
new file mode 100755
index 0000000000..23ab62c205
--- /dev/null
+++ b/tools/gke/create_server.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python2.7
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import argparse
+
+import kubernetes_api
+
+service_name = 'stress-server'
+pod_name = service_name # Use the same name for kubernetes Service and Pod
+namespace = 'default'
+is_headless_service = True
+cmd_list=['/var/local/git/grpc/bins/opt/interop_server']
+arg_list=['--port=8080']
+port_list=[8080]
+image_name='gcr.io/sree-gce/grpc_stress_test_2'
+env_dict={}
+
+# Make sure you run kubectl proxy --port=8001
+kubernetes_api_server='localhost'
+kubernetes_api_port=8001
+
+is_success = kubernetes_api.create_pod(
+ kubernetes_api_server,
+ kubernetes_api_port,
+ namespace,
+ pod_name,
+ image_name,
+ port_list,
+ cmd_list,
+ arg_list,
+ env_dict)
+if not is_success:
+ print("Error in creating pod")
+else:
+ is_success = kubernetes_api.create_service(
+ kubernetes_api_server,
+ kubernetes_api_port,
+ namespace,
+ service_name,
+ pod_name,
+ port_list, # Service port list
+ port_list, # Container port list (same as service port list)
+ is_headless_service)
+ if not is_success:
+ print("Error in creating service")
+ else:
+ print("Successfully created the Server")
diff --git a/tools/gke/delete_client.py b/tools/gke/delete_client.py
new file mode 100755
index 0000000000..aa519f26b8
--- /dev/null
+++ b/tools/gke/delete_client.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python2.7
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import argparse
+
+import kubernetes_api
+
+argp = argparse.ArgumentParser(description='Delete Stress test clients in GKE')
+argp.add_argument('-n',
+ '--num_instances',
+ required=True,
+ type=int,
+ help='The number of instances currently running')
+
+args = argp.parse_args()
+for i in range(1, args.num_instances + 1):
+ service_name = 'stress-client-%d' % i
+ pod_name = service_name
+ namespace = 'default'
+ kubernetes_api_server="localhost"
+ kubernetes_api_port=8001
+
+ is_success=kubernetes_api.delete_pod(
+ kubernetes_api_server,
+ kubernetes_api_port,
+ namespace,
+ pod_name)
+ if not is_success:
+ print('Error in deleting Pod %s' % pod_name)
+ else:
+ is_success= kubernetes_api.delete_service(
+ kubernetes_api_server,
+ kubernetes_api_port,
+ namespace,
+ service_name)
+ if not is_success:
+ print('Error in deleting Service %s' % service_name)
+ else:
+ print('Deleted %s' % pod_name)
diff --git a/tools/gke/delete_server.py b/tools/gke/delete_server.py
new file mode 100755
index 0000000000..6e3fdcc33b
--- /dev/null
+++ b/tools/gke/delete_server.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python2.7
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import argparse
+
+import kubernetes_api
+
+service_name = 'stress-server'
+pod_name = service_name # Use the same name for kubernetes Service and Pod
+namespace = 'default'
+is_headless_service = True
+kubernetes_api_server="localhost"
+kubernetes_api_port=8001
+
+is_success = kubernetes_api.delete_pod(
+ kubernetes_api_server,
+ kubernetes_api_port,
+ namespace,
+ pod_name)
+if not is_success:
+ print("Error in deleting Pod %s" % pod_name)
+else:
+ is_success = kubernetes_api.delete_service(
+ kubernetes_api_server,
+ kubernetes_api_port,
+ namespace,
+ service_name)
+ if not is_success:
+ print("Error in deleting Service %d" % service_name)
+ else:
+ print("Deleted server %s" % service_name)
diff --git a/tools/gke/kubernetes_api.py b/tools/gke/kubernetes_api.py
index 7dd3015365..14d724bd31 100755
--- a/tools/gke/kubernetes_api.py
+++ b/tools/gke/kubernetes_api.py
@@ -33,8 +33,9 @@ import json
_REQUEST_TIMEOUT_SECS = 10
+
def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
- arg_list):
+ arg_list, env_dict):
"""Creates a string containing the Pod defintion as required by the Kubernetes API"""
body = {
'kind': 'Pod',
@@ -48,20 +49,21 @@ def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
{
'name': pod_name,
'image': image_name,
- 'ports': []
+ 'ports': [{'containerPort': port,
+ 'protocol': 'TCP'} for port in container_port_list]
}
]
}
}
- # Populate the 'ports' list
- for port in container_port_list:
- port_entry = {'containerPort': port, 'protocol': 'TCP'}
- body['spec']['containers'][0]['ports'].append(port_entry)
+
+ env_list = [{'name': k, 'value': v} for (k, v) in env_dict.iteritems()]
+ if len(env_list) > 0:
+ body['spec']['containers'][0]['env'] = env_list
# Add the 'Command' and 'Args' attributes if they are passed.
# Note:
# - 'Command' overrides the ENTRYPOINT in the Docker Image
- # - 'Args' override the COMMAND in Docker image (yes, it is confusing!)
+ # - 'Args' override the CMD in Docker image (yes, it is confusing!)
if len(cmd_list) > 0:
body['spec']['containers'][0]['command'] = cmd_list
if len(arg_list) > 0:
@@ -70,7 +72,7 @@ def _make_pod_config(pod_name, image_name, container_port_list, cmd_list,
def _make_service_config(service_name, pod_name, service_port_list,
- container_port_list, is_headless):
+ container_port_list, is_headless):
"""Creates a string containing the Service definition as required by the Kubernetes API.
NOTE:
@@ -124,6 +126,7 @@ def _print_connection_error(msg):
print('ERROR: Connection failed. Did you remember to run Kubenetes proxy on '
'localhost (i.e kubectl proxy --port=<proxy_port>) ?. Error: %s' % msg)
+
def _do_post(post_url, api_name, request_body):
"""Helper to do HTTP POST.
@@ -135,7 +138,9 @@ def _do_post(post_url, api_name, request_body):
"""
is_success = True
try:
- r = requests.post(post_url, data=request_body, timeout=_REQUEST_TIMEOUT_SECS)
+ r = requests.post(post_url,
+ data=request_body,
+ timeout=_REQUEST_TIMEOUT_SECS)
if r.status_code == requests.codes.conflict:
print('WARN: Looks like the resource already exists. Api: %s, url: %s' %
(api_name, post_url))
@@ -143,7 +148,8 @@ def _do_post(post_url, api_name, request_body):
print('ERROR: %s API returned error. HTTP response: (%d) %s' %
(api_name, r.status_code, r.text))
is_success = False
- except(requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
+ except (requests.exceptions.Timeout,
+ requests.exceptions.ConnectionError) as e:
is_success = False
_print_connection_error(str(e))
return is_success
@@ -165,7 +171,8 @@ def _do_delete(del_url, api_name):
print('ERROR: %s API returned error. HTTP response: %s' %
(api_name, r.text))
is_success = False
- except(requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
+ except (requests.exceptions.Timeout,
+ requests.exceptions.ConnectionError) as e:
is_success = False
_print_connection_error(str(e))
return is_success
@@ -179,12 +186,12 @@ def create_service(kube_host, kube_port, namespace, service_name, pod_name,
post_url = 'http://%s:%d/api/v1/namespaces/%s/services' % (
kube_host, kube_port, namespace)
request_body = _make_service_config(service_name, pod_name, service_port_list,
- container_port_list, is_headless)
+ container_port_list, is_headless)
return _do_post(post_url, 'Create Service', request_body)
def create_pod(kube_host, kube_port, namespace, pod_name, image_name,
- container_port_list, cmd_list, arg_list):
+ container_port_list, cmd_list, arg_list, env_dict):
"""Creates a Kubernetes Pod.
Note that it is generally NOT considered a good practice to directly create
@@ -200,7 +207,7 @@ def create_pod(kube_host, kube_port, namespace, pod_name, image_name,
post_url = 'http://%s:%d/api/v1/namespaces/%s/pods' % (kube_host, kube_port,
namespace)
request_body = _make_pod_config(pod_name, image_name, container_port_list,
- cmd_list, arg_list)
+ cmd_list, arg_list, env_dict)
return _do_post(post_url, 'Create Pod', request_body)
diff --git a/tools/run_tests/stress_test_wrapper.py b/tools/run_tests/stress_test_wrapper.py
new file mode 100755
index 0000000000..8f1bd2024e
--- /dev/null
+++ b/tools/run_tests/stress_test_wrapper.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python2.7
+import os
+import re
+import select
+import subprocess
+import sys
+import time
+
+GRPC_ROOT = '/usr/local/google/home/sreek/workspace/grpc/'
+STRESS_TEST_IMAGE = GRPC_ROOT + 'bins/opt/stress_test'
+STRESS_TEST_ARGS_STR = ' '.join([
+ '--server_addresses=localhost:8000',
+ '--test_cases=empty_unary:1,large_unary:1', '--num_stubs_per_channel=10',
+ '--test_duration_secs=10'])
+METRICS_CLIENT_IMAGE = GRPC_ROOT + 'bins/opt/metrics_client'
+METRICS_CLIENT_ARGS_STR = ' '.join([
+ '--metrics_server_address=localhost:8081', '--total_only=true'])
+LOGFILE_NAME = 'stress_test.log'
+
+
+# TODO (sree): Write a python grpc client to directly query the metrics instead
+# of calling metrics_client
+def get_qps(metrics_cmd):
+ qps = 0
+ try:
+ # Note: gpr_log() writes even non-error messages to stderr stream. So it is
+ # important that we set stderr=subprocess.STDOUT
+ p = subprocess.Popen(args=metrics_cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ retcode = p.wait()
+ (out_str, err_str) = p.communicate()
+ if retcode != 0:
+ print 'Error in reading metrics information'
+ print 'Output: ', out_str
+ else:
+ # The overall qps is printed at the end of the line
+ m = re.search('\d+$', out_str)
+ qps = int(m.group()) if m else 0
+ except Exception as ex:
+ print 'Exception while reading metrics information: ' + str(ex)
+ return qps
+
+def main(argv):
+ # TODO(sree) Create BigQuery Tables
+ # (Summary table), (Metrics table)
+
+ # TODO(sree) Update status that the test is starting (in the status table)
+ #
+
+ metrics_cmd = [METRICS_CLIENT_IMAGE
+ ] + [x for x in METRICS_CLIENT_ARGS_STR.split()]
+
+ stress_cmd = [STRESS_TEST_IMAGE] + [x for x in STRESS_TEST_ARGS_STR.split()]
+ # TODO(sree): Add an option to print to stdout if logfilename is absent
+ logfile = open(LOGFILE_NAME, 'w')
+ stress_p = subprocess.Popen(args=arg_list,
+ stdout=logfile,
+ stderr=subprocess.STDOUT)
+
+ qps_history = [1, 1, 1] # Maintain the last 3 qps
+ qps_history_idx = 0 # Index into the qps_history list
+
+ is_error = False
+ while True:
+ # Check if stress_client is still running. If so, collect metrics and upload
+ # to BigQuery status table
+ #
+ if stress_p is not None:
+ # TODO(sree) Upload completion status to BiqQuery
+ is_error = (stress_p.returncode != 0)
+ break
+
+ # Stress client still running. Get metrics
+ qps = get_qps(metrics_cmd)
+
+ # If QPS has been zero for the last 3 iterations, flag it as error and exit
+ qps_history[qps_history_idx] = qps
+ qps_history_idx = (qps_histor_idx + 1) % len(qps_history)
+ if sum(a) == 0:
+ print ('QPS has been zero for the last 3 iterations. Not monitoring '
+ 'anymore. The stress test client may be stalled.')
+ is_error = True
+ break
+
+ #TODO(sree) Upload qps metrics to BiqQuery
+
+ if is_error:
+ print 'Waiting indefinitely..'
+ select.select([],[],[])
+
+ return 1
+
+
+if __name__ == '__main__':
+ main(sys.argv[1:])