diff options
Diffstat (limited to 'src/python/grpcio/tests/qps/worker_server.py')
-rw-r--r-- | src/python/grpcio/tests/qps/worker_server.py | 184 |
1 files changed, 184 insertions, 0 deletions
diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py new file mode 100644 index 0000000000..0b3acc14e7 --- /dev/null +++ b/src/python/grpcio/tests/qps/worker_server.py @@ -0,0 +1,184 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import multiprocessing +import random +import threading +import time + +from grpc.beta import implementations +from grpc.framework.interfaces.face import utilities +from src.proto.grpc.testing import control_pb2 +from src.proto.grpc.testing import services_pb2 +from src.proto.grpc.testing import stats_pb2 + +from tests.qps import benchmark_client +from tests.qps import benchmark_server +from tests.qps import client_runner +from tests.qps import histogram +from tests.unit import resources + + +class WorkerServer(services_pb2.BetaWorkerServiceServicer): + """Python Worker Server implementation.""" + + def __init__(self): + self._quit_event = threading.Event() + + def RunServer(self, request_iterator, context): + config = next(request_iterator).setup + server, port = self._create_server(config) + cores = multiprocessing.cpu_count() + server.start() + start_time = time.time() + yield self._get_server_status(start_time, start_time, port, cores) + + for request in request_iterator: + end_time = time.time() + status = self._get_server_status(start_time, end_time, port, cores) + if request.mark.reset: + start_time = end_time + yield status + server.stop(0) + + def _get_server_status(self, start_time, end_time, port, cores): + end_time = time.time() + elapsed_time = end_time - start_time + stats = stats_pb2.ServerStats(time_elapsed=elapsed_time, + time_user=elapsed_time, + time_system=elapsed_time) + return control_pb2.ServerStatus(stats=stats, port=port, cores=cores) + + def _create_server(self, config): + if config.server_type == control_pb2.SYNC_SERVER: + servicer = benchmark_server.BenchmarkServer() + server = services_pb2.beta_create_BenchmarkService_server(servicer) + elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER: + resp_size = config.payload_config.bytebuf_params.resp_size + servicer = benchmark_server.GenericBenchmarkServer(resp_size) + method_implementations = { + ('grpc.testing.BenchmarkService', 'StreamingCall'): + utilities.stream_stream_inline(servicer.StreamingCall), + ('grpc.testing.BenchmarkService', 'UnaryCall'): + utilities.unary_unary_inline(servicer.UnaryCall), + } + server = implementations.server(method_implementations) + else: + raise Exception('Unsupported server type {}'.format(config.server_type)) + + if config.HasField('security_params'): # Use SSL + server_creds = implementations.ssl_server_credentials([( + resources.private_key(), resources.certificate_chain())]) + port = server.add_secure_port('[::]:{}'.format(config.port), server_creds) + else: + port = server.add_insecure_port('[::]:{}'.format(config.port)) + + return (server, port) + + def RunClient(self, request_iterator, context): + config = next(request_iterator).setup + client_runners = [] + qps_data = histogram.Histogram(config.histogram_params.resolution, + config.histogram_params.max_possible) + start_time = time.time() + + # Create a client for each channel + for i in xrange(config.client_channels): + server = config.server_targets[i % len(config.server_targets)] + runner = self._create_client_runner(server, config, qps_data) + client_runners.append(runner) + runner.start() + + end_time = time.time() + yield self._get_client_status(start_time, end_time, qps_data) + + # Respond to stat requests + for request in request_iterator: + end_time = time.time() + status = self._get_client_status(start_time, end_time, qps_data) + if request.mark.reset: + qps_data.reset() + start_time = time.time() + yield status + + # Cleanup the clients + for runner in client_runners: + runner.stop() + + def _get_client_status(self, start_time, end_time, qps_data): + latencies = qps_data.get_data() + end_time = time.time() + elapsed_time = end_time - start_time + stats = stats_pb2.ClientStats(latencies=latencies, + time_elapsed=elapsed_time, + time_user=elapsed_time, + time_system=elapsed_time) + return control_pb2.ClientStatus(stats=stats) + + def _create_client_runner(self, server, config, qps_data): + if config.client_type == control_pb2.SYNC_CLIENT: + if config.rpc_type == control_pb2.UNARY: + client = benchmark_client.UnarySyncBenchmarkClient( + server, config, qps_data) + else: + raise Exception('STREAMING SYNC client not supported') + elif config.client_type == control_pb2.ASYNC_CLIENT: + if config.rpc_type == control_pb2.UNARY: + client = benchmark_client.UnaryAsyncBenchmarkClient( + server, config, qps_data) + elif config.rpc_type == control_pb2.STREAMING: + client = benchmark_client.StreamingAsyncBenchmarkClient( + server, config, qps_data) + else: + raise Exception('Unsupported client type {}'.format(config.client_type)) + + # In multi-channel tests, we split the load across all channels + load_factor = float(config.client_channels) + if config.load_params.WhichOneof('load') == 'closed_loop': + runner = client_runner.ClosedLoopClientRunner( + client, config.outstanding_rpcs_per_channel) + else: # Open loop Poisson + alpha = config.load_params.poisson.offered_load / load_factor + def poisson(): + while True: + yield random.expovariate(alpha) + + runner = client_runner.OpenLoopClientRunner(client, poisson()) + + return runner + + def CoreCount(self, request, context): + return control_pb2.CoreResponse(cores=multiprocessing.cpu_count()) + + def QuitWorker(self, request, context): + self._quit_event.set() + return control_pb2.Void() + + def wait_for_quit(self): + self._quit_event.wait() |