diff options
Diffstat (limited to 'src/python/grpcio_tests/tests/qps/worker_server.py')
-rw-r--r-- | src/python/grpcio_tests/tests/qps/worker_server.py | 297 |
1 files changed, 152 insertions, 145 deletions
diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index 46d542940f..1deb7ed698 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -46,149 +46,156 @@ from tests.unit import resources class WorkerServer(services_pb2.WorkerServiceServicer): - """Python Worker Server implementation.""" - - def __init__(self): - self._quit_event = threading.Event() - - def RunServer(self, request_iterator, context): - config = next(request_iterator).setup - server, port = self._create_server(config) - cores = multiprocessing.cpu_count() - server.start() - start_time = time.time() - yield self._get_server_status(start_time, start_time, port, cores) - - for request in request_iterator: - end_time = time.time() - status = self._get_server_status(start_time, end_time, port, cores) - if request.mark.reset: - start_time = end_time - yield status - server.stop(None) - - def _get_server_status(self, start_time, end_time, port, cores): - end_time = time.time() - elapsed_time = end_time - start_time - stats = stats_pb2.ServerStats(time_elapsed=elapsed_time, - time_user=elapsed_time, - time_system=elapsed_time) - return control_pb2.ServerStatus(stats=stats, port=port, cores=cores) - - def _create_server(self, config): - if config.async_server_threads == 0: - # This is the default concurrent.futures thread pool size, but - # None doesn't seem to work - server_threads = multiprocessing.cpu_count() * 5 - else: - server_threads = config.async_server_threads - server = grpc.server(futures.ThreadPoolExecutor( - max_workers=server_threads)) - if config.server_type == control_pb2.ASYNC_SERVER: - servicer = benchmark_server.BenchmarkServer() - services_pb2.add_BenchmarkServiceServicer_to_server(servicer, server) - elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER: - resp_size = config.payload_config.bytebuf_params.resp_size - servicer = benchmark_server.GenericBenchmarkServer(resp_size) - method_implementations = { - 'StreamingCall': - grpc.stream_stream_rpc_method_handler(servicer.StreamingCall), - 'UnaryCall': - grpc.unary_unary_rpc_method_handler(servicer.UnaryCall), - } - handler = grpc.method_handlers_generic_handler( - 'grpc.testing.BenchmarkService', method_implementations) - server.add_generic_rpc_handlers((handler,)) - else: - raise Exception('Unsupported server type {}'.format(config.server_type)) - - if config.HasField('security_params'): # Use SSL - server_creds = grpc.ssl_server_credentials( - ((resources.private_key(), resources.certificate_chain()),)) - port = server.add_secure_port('[::]:{}'.format(config.port), server_creds) - else: - port = server.add_insecure_port('[::]:{}'.format(config.port)) - - return (server, port) - - def RunClient(self, request_iterator, context): - config = next(request_iterator).setup - client_runners = [] - qps_data = histogram.Histogram(config.histogram_params.resolution, - config.histogram_params.max_possible) - start_time = time.time() - - # Create a client for each channel - for i in xrange(config.client_channels): - server = config.server_targets[i % len(config.server_targets)] - runner = self._create_client_runner(server, config, qps_data) - client_runners.append(runner) - runner.start() - - end_time = time.time() - yield self._get_client_status(start_time, end_time, qps_data) - - # Respond to stat requests - for request in request_iterator: - end_time = time.time() - status = self._get_client_status(start_time, end_time, qps_data) - if request.mark.reset: - qps_data.reset() + """Python Worker Server implementation.""" + + def __init__(self): + self._quit_event = threading.Event() + + def RunServer(self, request_iterator, context): + config = next(request_iterator).setup + server, port = self._create_server(config) + cores = multiprocessing.cpu_count() + server.start() start_time = time.time() - yield status - - # Cleanup the clients - for runner in client_runners: - runner.stop() - - def _get_client_status(self, start_time, end_time, qps_data): - latencies = qps_data.get_data() - end_time = time.time() - elapsed_time = end_time - start_time - stats = stats_pb2.ClientStats(latencies=latencies, - time_elapsed=elapsed_time, - time_user=elapsed_time, - time_system=elapsed_time) - return control_pb2.ClientStatus(stats=stats) - - def _create_client_runner(self, server, config, qps_data): - if config.client_type == control_pb2.SYNC_CLIENT: - if config.rpc_type == control_pb2.UNARY: - client = benchmark_client.UnarySyncBenchmarkClient( - server, config, qps_data) - elif config.rpc_type == control_pb2.STREAMING: - client = benchmark_client.StreamingSyncBenchmarkClient( - server, config, qps_data) - elif config.client_type == control_pb2.ASYNC_CLIENT: - if config.rpc_type == control_pb2.UNARY: - client = benchmark_client.UnaryAsyncBenchmarkClient( - server, config, qps_data) - else: - raise Exception('Async streaming client not supported') - else: - raise Exception('Unsupported client type {}'.format(config.client_type)) - - # In multi-channel tests, we split the load across all channels - load_factor = float(config.client_channels) - if config.load_params.WhichOneof('load') == 'closed_loop': - runner = client_runner.ClosedLoopClientRunner( - client, config.outstanding_rpcs_per_channel) - else: # Open loop Poisson - alpha = config.load_params.poisson.offered_load / load_factor - def poisson(): - while True: - yield random.expovariate(alpha) - - runner = client_runner.OpenLoopClientRunner(client, poisson()) - - return runner - - def CoreCount(self, request, context): - return control_pb2.CoreResponse(cores=multiprocessing.cpu_count()) - - def QuitWorker(self, request, context): - self._quit_event.set() - return control_pb2.Void() - - def wait_for_quit(self): - self._quit_event.wait() + yield self._get_server_status(start_time, start_time, port, cores) + + for request in request_iterator: + end_time = time.time() + status = self._get_server_status(start_time, end_time, port, cores) + if request.mark.reset: + start_time = end_time + yield status + server.stop(None) + + def _get_server_status(self, start_time, end_time, port, cores): + end_time = time.time() + elapsed_time = end_time - start_time + stats = stats_pb2.ServerStats( + time_elapsed=elapsed_time, + time_user=elapsed_time, + time_system=elapsed_time) + return control_pb2.ServerStatus(stats=stats, port=port, cores=cores) + + def _create_server(self, config): + if config.async_server_threads == 0: + # This is the default concurrent.futures thread pool size, but + # None doesn't seem to work + server_threads = multiprocessing.cpu_count() * 5 + else: + server_threads = config.async_server_threads + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=server_threads)) + if config.server_type == control_pb2.ASYNC_SERVER: + servicer = benchmark_server.BenchmarkServer() + services_pb2.add_BenchmarkServiceServicer_to_server(servicer, + server) + elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER: + resp_size = config.payload_config.bytebuf_params.resp_size + servicer = benchmark_server.GenericBenchmarkServer(resp_size) + method_implementations = { + 'StreamingCall': + grpc.stream_stream_rpc_method_handler(servicer.StreamingCall), + 'UnaryCall': + grpc.unary_unary_rpc_method_handler(servicer.UnaryCall), + } + handler = grpc.method_handlers_generic_handler( + 'grpc.testing.BenchmarkService', method_implementations) + server.add_generic_rpc_handlers((handler,)) + else: + raise Exception('Unsupported server type {}'.format( + config.server_type)) + + if config.HasField('security_params'): # Use SSL + server_creds = grpc.ssl_server_credentials(( + (resources.private_key(), resources.certificate_chain()),)) + port = server.add_secure_port('[::]:{}'.format(config.port), + server_creds) + else: + port = server.add_insecure_port('[::]:{}'.format(config.port)) + + return (server, port) + + def RunClient(self, request_iterator, context): + config = next(request_iterator).setup + client_runners = [] + qps_data = histogram.Histogram(config.histogram_params.resolution, + config.histogram_params.max_possible) + start_time = time.time() + + # Create a client for each channel + for i in xrange(config.client_channels): + server = config.server_targets[i % len(config.server_targets)] + runner = self._create_client_runner(server, config, qps_data) + client_runners.append(runner) + runner.start() + + end_time = time.time() + yield self._get_client_status(start_time, end_time, qps_data) + + # Respond to stat requests + for request in request_iterator: + end_time = time.time() + status = self._get_client_status(start_time, end_time, qps_data) + if request.mark.reset: + qps_data.reset() + start_time = time.time() + yield status + + # Cleanup the clients + for runner in client_runners: + runner.stop() + + def _get_client_status(self, start_time, end_time, qps_data): + latencies = qps_data.get_data() + end_time = time.time() + elapsed_time = end_time - start_time + stats = stats_pb2.ClientStats( + latencies=latencies, + time_elapsed=elapsed_time, + time_user=elapsed_time, + time_system=elapsed_time) + return control_pb2.ClientStatus(stats=stats) + + def _create_client_runner(self, server, config, qps_data): + if config.client_type == control_pb2.SYNC_CLIENT: + if config.rpc_type == control_pb2.UNARY: + client = benchmark_client.UnarySyncBenchmarkClient( + server, config, qps_data) + elif config.rpc_type == control_pb2.STREAMING: + client = benchmark_client.StreamingSyncBenchmarkClient( + server, config, qps_data) + elif config.client_type == control_pb2.ASYNC_CLIENT: + if config.rpc_type == control_pb2.UNARY: + client = benchmark_client.UnaryAsyncBenchmarkClient( + server, config, qps_data) + else: + raise Exception('Async streaming client not supported') + else: + raise Exception('Unsupported client type {}'.format( + config.client_type)) + + # In multi-channel tests, we split the load across all channels + load_factor = float(config.client_channels) + if config.load_params.WhichOneof('load') == 'closed_loop': + runner = client_runner.ClosedLoopClientRunner( + client, config.outstanding_rpcs_per_channel) + else: # Open loop Poisson + alpha = config.load_params.poisson.offered_load / load_factor + + def poisson(): + while True: + yield random.expovariate(alpha) + + runner = client_runner.OpenLoopClientRunner(client, poisson()) + + return runner + + def CoreCount(self, request, context): + return control_pb2.CoreResponse(cores=multiprocessing.cpu_count()) + + def QuitWorker(self, request, context): + self._quit_event.set() + return control_pb2.Void() + + def wait_for_quit(self): + self._quit_event.wait() |