diff options
-rw-r--r-- | test/cpp/interop/metrics_client.cc | 41 | ||||
-rwxr-xr-x | tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh | 2 | ||||
-rw-r--r-- | tools/gke/big_query_utils.py (renamed from tools/bigquery/big_query_utils.py) | 0 | ||||
-rwxr-xr-x | tools/gke/create_client.py | 108 | ||||
-rwxr-xr-x | tools/gke/create_server.py | 74 | ||||
-rwxr-xr-x | tools/gke/delete_client.py | 66 | ||||
-rwxr-xr-x | tools/gke/delete_server.py | 58 | ||||
-rwxr-xr-x | tools/gke/kubernetes_api.py | 35 | ||||
-rwxr-xr-x | tools/run_tests/stress_test_wrapper.py | 96 |
9 files changed, 451 insertions, 29 deletions
diff --git a/test/cpp/interop/metrics_client.cc b/test/cpp/interop/metrics_client.cc index 0c140ffd85..cc304f2e89 100644 --- a/test/cpp/interop/metrics_client.cc +++ b/test/cpp/interop/metrics_client.cc @@ -37,39 +37,45 @@ #include <gflags/gflags.h> #include <grpc++/grpc++.h> -#include "test/cpp/util/metrics_server.h" -#include "test/cpp/util/test_config.h" #include "src/proto/grpc/testing/metrics.grpc.pb.h" #include "src/proto/grpc/testing/metrics.pb.h" +#include "test/cpp/util/metrics_server.h" +#include "test/cpp/util/test_config.h" DEFINE_string(metrics_server_address, "", "The metrics server addresses in the fomrat <hostname>:<port>"); +DEFINE_bool(total_only, false, + "If true, this prints only the total value of all gauges"); + +int kDeadlineSecs = 10; using grpc::testing::EmptyMessage; using grpc::testing::GaugeResponse; using grpc::testing::MetricsService; using grpc::testing::MetricsServiceImpl; -void PrintMetrics(const grpc::string& server_address) { - gpr_log(GPR_INFO, "creating a channel to %s", server_address.c_str()); - std::shared_ptr<grpc::Channel> channel( - grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials())); - - std::unique_ptr<MetricsService::Stub> stub(MetricsService::NewStub(channel)); - +// Prints the values of all Gauges (unless total_only is set to 'true' in which +// case this only prints the sum of all gauge values). +bool PrintMetrics(std::unique_ptr<MetricsService::Stub> stub, bool total_only) { grpc::ClientContext context; EmptyMessage message; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::seconds(kDeadlineSecs); + + context.set_deadline(deadline); + std::unique_ptr<grpc::ClientReader<GaugeResponse>> reader( stub->GetAllGauges(&context, message)); GaugeResponse gauge_response; long overall_qps = 0; - int idx = 0; while (reader->Read(&gauge_response)) { if (gauge_response.value_case() == GaugeResponse::kLongValue) { - gpr_log(GPR_INFO, "Gauge: %d (%s: %ld)", ++idx, - gauge_response.name().c_str(), gauge_response.long_value()); + if (!total_only) { + gpr_log(GPR_INFO, "%s: %ld", gauge_response.name().c_str(), + gauge_response.long_value()); + } overall_qps += gauge_response.long_value(); } else { gpr_log(GPR_INFO, "Gauge %s is not a long value", @@ -77,12 +83,14 @@ void PrintMetrics(const grpc::string& server_address) { } } - gpr_log(GPR_INFO, "OVERALL: %ld", overall_qps); + gpr_log(GPR_INFO, "%ld", overall_qps); const grpc::Status status = reader->Finish(); if (!status.ok()) { gpr_log(GPR_ERROR, "Error in getting metrics from the client"); } + + return status.ok(); } int main(int argc, char** argv) { @@ -97,7 +105,12 @@ int main(int argc, char** argv) { return 1; } - PrintMetrics(FLAGS_metrics_server_address); + std::shared_ptr<grpc::Channel> channel(grpc::CreateChannel( + FLAGS_metrics_server_address, grpc::InsecureChannelCredentials())); + + if (!PrintMetrics(MetricsService::NewStub(channel), FLAGS_total_only)) { + return 1; + } return 0; } diff --git a/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh b/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh index 6ed3ccb3fa..392bdfccda 100755 --- a/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh +++ b/tools/dockerfile/grpc_interop_stress_cxx/build_interop_stress.sh @@ -42,4 +42,4 @@ cd /var/local/git/grpc make install-certs # build C++ interop stress client, interop client and server -make stress_test interop_client interop_server +make stress_test metrics_client interop_client interop_server diff --git a/tools/bigquery/big_query_utils.py b/tools/gke/big_query_utils.py index ebcf9d6ec3..ebcf9d6ec3 100644 --- a/tools/bigquery/big_query_utils.py +++ b/tools/gke/big_query_utils.py diff --git a/tools/gke/create_client.py b/tools/gke/create_client.py new file mode 100755 index 0000000000..bc56ef0ef1 --- /dev/null +++ b/tools/gke/create_client.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python2.7 +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import argparse + +import kubernetes_api + +argp = argparse.ArgumentParser(description='Launch Stress tests in GKE') + +argp.add_argument('-n', + '--num_instances', + required=True, + type=int, + help='The number of instances to launch in GKE') +args = argp.parse_args() + +kubernetes_api_server="localhost" +kubernetes_api_port=8001 + + +# Docker image +image_name="gcr.io/sree-gce/grpc_stress_test_2" + +server_address = "stress-server.default.svc.cluster.local:8080" +metrics_server_address = "localhost:8081" + +stress_test_arg_list=[ + "--server_addresses=" + server_address, + "--test_cases=empty_unary:20,large_unary:20", + "--num_stubs_per_channel=10" +] + +metrics_client_arg_list=[ + "--metrics_server_address=" + metrics_server_address, + "--total_only=true"] + +env_dict={ + "GPRC_ROOT": "/var/local/git/grpc", + "STRESS_TEST_IMAGE": "/var/local/git/grpc/bins/opt/stress_test", + "STRESS_TEST_ARGS_STR": ' '.join(stress_test_arg_list), + "METRICS_CLIENT_IMAGE": "/var/local/git/grpc/bins/opt/metrics_client", + "METRICS_CLIENT_ARGS_STR": ' '.join(metrics_client_arg_list)} + +cmd_list=["/var/local/git/grpc/bins/opt/stress_test"] +arg_list=stress_test_arg_list # make this [] in future +port_list=[8081] + +namespace = 'default' +is_headless_service = False # Client is NOT headless service + +print('Creating %d instances of client..' % args.num_instances) + +for i in range(1, args.num_instances + 1): + service_name = 'stress-client-%d' % i + pod_name = service_name # Use the same name for kubernetes Service and Pod + is_success = kubernetes_api.create_pod( + kubernetes_api_server, + kubernetes_api_port, + namespace, + pod_name, + image_name, + port_list, + cmd_list, + arg_list, + env_dict) + if not is_success: + print("Error in creating pod %s" % pod_name) + else: + is_success = kubernetes_api.create_service( + kubernetes_api_server, + kubernetes_api_port, + namespace, + service_name, + pod_name, + port_list, # Service port list + port_list, # Container port list (same as service port list) + is_headless_service) + if not is_success: + print("Error in creating service %s" % service_name) + else: + print("Created client %s" % pod_name) diff --git a/tools/gke/create_server.py b/tools/gke/create_server.py new file mode 100755 index 0000000000..23ab62c205 --- /dev/null +++ b/tools/gke/create_server.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python2.7 +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import argparse + +import kubernetes_api + +service_name = 'stress-server' +pod_name = service_name # Use the same name for kubernetes Service and Pod +namespace = 'default' +is_headless_service = True +cmd_list=['/var/local/git/grpc/bins/opt/interop_server'] +arg_list=['--port=8080'] +port_list=[8080] +image_name='gcr.io/sree-gce/grpc_stress_test_2' +env_dict={} + +# Make sure you run kubectl proxy --port=8001 +kubernetes_api_server='localhost' +kubernetes_api_port=8001 + +is_success = kubernetes_api.create_pod( + kubernetes_api_server, + kubernetes_api_port, + namespace, + pod_name, + image_name, + port_list, + cmd_list, + arg_list, + env_dict) +if not is_success: + print("Error in creating pod") +else: + is_success = kubernetes_api.create_service( + kubernetes_api_server, + kubernetes_api_port, + namespace, + service_name, + pod_name, + port_list, # Service port list + port_list, # Container port list (same as service port list) + is_headless_service) + if not is_success: + print("Error in creating service") + else: + print("Successfully created the Server") diff --git a/tools/gke/delete_client.py b/tools/gke/delete_client.py new file mode 100755 index 0000000000..aa519f26b8 --- /dev/null +++ b/tools/gke/delete_client.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python2.7 +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import argparse + +import kubernetes_api + +argp = argparse.ArgumentParser(description='Delete Stress test clients in GKE') +argp.add_argument('-n', + '--num_instances', + required=True, + type=int, + help='The number of instances currently running') + +args = argp.parse_args() +for i in range(1, args.num_instances + 1): + service_name = 'stress-client-%d' % i + pod_name = service_name + namespace = 'default' + kubernetes_api_server="localhost" + kubernetes_api_port=8001 + + is_success=kubernetes_api.delete_pod( + kubernetes_api_server, + kubernetes_api_port, + namespace, + pod_name) + if not is_success: + print('Error in deleting Pod %s' % pod_name) + else: + is_success= kubernetes_api.delete_service( + kubernetes_api_server, + kubernetes_api_port, + namespace, + service_name) + if not is_success: + print('Error in deleting Service %s' % service_name) + else: + print('Deleted %s' % pod_name) diff --git a/tools/gke/delete_server.py b/tools/gke/delete_server.py new file mode 100755 index 0000000000..6e3fdcc33b --- /dev/null +++ b/tools/gke/delete_server.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python2.7 +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import argparse + +import kubernetes_api + +service_name = 'stress-server' +pod_name = service_name # Use the same name for kubernetes Service and Pod +namespace = 'default' +is_headless_service = True +kubernetes_api_server="localhost" +kubernetes_api_port=8001 + +is_success = kubernetes_api.delete_pod( + kubernetes_api_server, + kubernetes_api_port, + namespace, + pod_name) +if not is_success: + print("Error in deleting Pod %s" % pod_name) +else: + is_success = kubernetes_api.delete_service( + kubernetes_api_server, + kubernetes_api_port, + namespace, + service_name) + if not is_success: + print("Error in deleting Service %d" % service_name) + else: + print("Deleted server %s" % service_name) diff --git a/tools/gke/kubernetes_api.py b/tools/gke/kubernetes_api.py index 7dd3015365..14d724bd31 100755 --- a/tools/gke/kubernetes_api.py +++ b/tools/gke/kubernetes_api.py @@ -33,8 +33,9 @@ import json _REQUEST_TIMEOUT_SECS = 10 + def _make_pod_config(pod_name, image_name, container_port_list, cmd_list, - arg_list): + arg_list, env_dict): """Creates a string containing the Pod defintion as required by the Kubernetes API""" body = { 'kind': 'Pod', @@ -48,20 +49,21 @@ def _make_pod_config(pod_name, image_name, container_port_list, cmd_list, { 'name': pod_name, 'image': image_name, - 'ports': [] + 'ports': [{'containerPort': port, + 'protocol': 'TCP'} for port in container_port_list] } ] } } - # Populate the 'ports' list - for port in container_port_list: - port_entry = {'containerPort': port, 'protocol': 'TCP'} - body['spec']['containers'][0]['ports'].append(port_entry) + + env_list = [{'name': k, 'value': v} for (k, v) in env_dict.iteritems()] + if len(env_list) > 0: + body['spec']['containers'][0]['env'] = env_list # Add the 'Command' and 'Args' attributes if they are passed. # Note: # - 'Command' overrides the ENTRYPOINT in the Docker Image - # - 'Args' override the COMMAND in Docker image (yes, it is confusing!) + # - 'Args' override the CMD in Docker image (yes, it is confusing!) if len(cmd_list) > 0: body['spec']['containers'][0]['command'] = cmd_list if len(arg_list) > 0: @@ -70,7 +72,7 @@ def _make_pod_config(pod_name, image_name, container_port_list, cmd_list, def _make_service_config(service_name, pod_name, service_port_list, - container_port_list, is_headless): + container_port_list, is_headless): """Creates a string containing the Service definition as required by the Kubernetes API. NOTE: @@ -124,6 +126,7 @@ def _print_connection_error(msg): print('ERROR: Connection failed. Did you remember to run Kubenetes proxy on ' 'localhost (i.e kubectl proxy --port=<proxy_port>) ?. Error: %s' % msg) + def _do_post(post_url, api_name, request_body): """Helper to do HTTP POST. @@ -135,7 +138,9 @@ def _do_post(post_url, api_name, request_body): """ is_success = True try: - r = requests.post(post_url, data=request_body, timeout=_REQUEST_TIMEOUT_SECS) + r = requests.post(post_url, + data=request_body, + timeout=_REQUEST_TIMEOUT_SECS) if r.status_code == requests.codes.conflict: print('WARN: Looks like the resource already exists. Api: %s, url: %s' % (api_name, post_url)) @@ -143,7 +148,8 @@ def _do_post(post_url, api_name, request_body): print('ERROR: %s API returned error. HTTP response: (%d) %s' % (api_name, r.status_code, r.text)) is_success = False - except(requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: + except (requests.exceptions.Timeout, + requests.exceptions.ConnectionError) as e: is_success = False _print_connection_error(str(e)) return is_success @@ -165,7 +171,8 @@ def _do_delete(del_url, api_name): print('ERROR: %s API returned error. HTTP response: %s' % (api_name, r.text)) is_success = False - except(requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: + except (requests.exceptions.Timeout, + requests.exceptions.ConnectionError) as e: is_success = False _print_connection_error(str(e)) return is_success @@ -179,12 +186,12 @@ def create_service(kube_host, kube_port, namespace, service_name, pod_name, post_url = 'http://%s:%d/api/v1/namespaces/%s/services' % ( kube_host, kube_port, namespace) request_body = _make_service_config(service_name, pod_name, service_port_list, - container_port_list, is_headless) + container_port_list, is_headless) return _do_post(post_url, 'Create Service', request_body) def create_pod(kube_host, kube_port, namespace, pod_name, image_name, - container_port_list, cmd_list, arg_list): + container_port_list, cmd_list, arg_list, env_dict): """Creates a Kubernetes Pod. Note that it is generally NOT considered a good practice to directly create @@ -200,7 +207,7 @@ def create_pod(kube_host, kube_port, namespace, pod_name, image_name, post_url = 'http://%s:%d/api/v1/namespaces/%s/pods' % (kube_host, kube_port, namespace) request_body = _make_pod_config(pod_name, image_name, container_port_list, - cmd_list, arg_list) + cmd_list, arg_list, env_dict) return _do_post(post_url, 'Create Pod', request_body) diff --git a/tools/run_tests/stress_test_wrapper.py b/tools/run_tests/stress_test_wrapper.py new file mode 100755 index 0000000000..8f1bd2024e --- /dev/null +++ b/tools/run_tests/stress_test_wrapper.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python2.7 +import os +import re +import select +import subprocess +import sys +import time + +GRPC_ROOT = '/usr/local/google/home/sreek/workspace/grpc/' +STRESS_TEST_IMAGE = GRPC_ROOT + 'bins/opt/stress_test' +STRESS_TEST_ARGS_STR = ' '.join([ + '--server_addresses=localhost:8000', + '--test_cases=empty_unary:1,large_unary:1', '--num_stubs_per_channel=10', + '--test_duration_secs=10']) +METRICS_CLIENT_IMAGE = GRPC_ROOT + 'bins/opt/metrics_client' +METRICS_CLIENT_ARGS_STR = ' '.join([ + '--metrics_server_address=localhost:8081', '--total_only=true']) +LOGFILE_NAME = 'stress_test.log' + + +# TODO (sree): Write a python grpc client to directly query the metrics instead +# of calling metrics_client +def get_qps(metrics_cmd): + qps = 0 + try: + # Note: gpr_log() writes even non-error messages to stderr stream. So it is + # important that we set stderr=subprocess.STDOUT + p = subprocess.Popen(args=metrics_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + retcode = p.wait() + (out_str, err_str) = p.communicate() + if retcode != 0: + print 'Error in reading metrics information' + print 'Output: ', out_str + else: + # The overall qps is printed at the end of the line + m = re.search('\d+$', out_str) + qps = int(m.group()) if m else 0 + except Exception as ex: + print 'Exception while reading metrics information: ' + str(ex) + return qps + +def main(argv): + # TODO(sree) Create BigQuery Tables + # (Summary table), (Metrics table) + + # TODO(sree) Update status that the test is starting (in the status table) + # + + metrics_cmd = [METRICS_CLIENT_IMAGE + ] + [x for x in METRICS_CLIENT_ARGS_STR.split()] + + stress_cmd = [STRESS_TEST_IMAGE] + [x for x in STRESS_TEST_ARGS_STR.split()] + # TODO(sree): Add an option to print to stdout if logfilename is absent + logfile = open(LOGFILE_NAME, 'w') + stress_p = subprocess.Popen(args=arg_list, + stdout=logfile, + stderr=subprocess.STDOUT) + + qps_history = [1, 1, 1] # Maintain the last 3 qps + qps_history_idx = 0 # Index into the qps_history list + + is_error = False + while True: + # Check if stress_client is still running. If so, collect metrics and upload + # to BigQuery status table + # + if stress_p is not None: + # TODO(sree) Upload completion status to BiqQuery + is_error = (stress_p.returncode != 0) + break + + # Stress client still running. Get metrics + qps = get_qps(metrics_cmd) + + # If QPS has been zero for the last 3 iterations, flag it as error and exit + qps_history[qps_history_idx] = qps + qps_history_idx = (qps_histor_idx + 1) % len(qps_history) + if sum(a) == 0: + print ('QPS has been zero for the last 3 iterations. Not monitoring ' + 'anymore. The stress test client may be stalled.') + is_error = True + break + + #TODO(sree) Upload qps metrics to BiqQuery + + if is_error: + print 'Waiting indefinitely..' + select.select([],[],[]) + + return 1 + + +if __name__ == '__main__': + main(sys.argv[1:]) |