From 1eb8d54a1967edbc234d8abae5b1c4216c8bd867 Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Tue, 10 May 2016 16:57:14 -0700 Subject: Added true async qps client --- src/python/grpcio/tests/qps/benchmark_client.py | 60 ++++++++++++++++++++++--- src/python/grpcio/tests/qps/client_runner.py | 2 +- src/python/grpcio/tests/qps/worker_server.py | 5 ++- tools/run_tests/performance/scenario_config.py | 49 ++++++++++---------- 4 files changed, 81 insertions(+), 35 deletions(-) diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py index eed0b0c6da..b372ea01ad 100644 --- a/src/python/grpcio/tests/qps/benchmark_client.py +++ b/src/python/grpcio/tests/qps/benchmark_client.py @@ -39,6 +39,7 @@ except ImportError: from concurrent import futures from grpc.beta import implementations +from grpc.framework.interfaces.face import face from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import services_pb2 from tests.unit import resources @@ -141,10 +142,10 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient): self._stub = None -class StreamingAsyncBenchmarkClient(BenchmarkClient): +class StreamingSyncBenchmarkClient(BenchmarkClient): def __init__(self, server, config, hist): - super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist) + super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist) self._is_streaming = False self._pool = futures.ThreadPoolExecutor(max_workers=1) # Use a thread-safe queue to put requests on the stream @@ -167,12 +168,12 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient): def _request_stream(self): self._is_streaming = True if self._generic: - response_stream = self._stub.inline_stream_stream( - 'grpc.testing.BenchmarkService', 'StreamingCall', - self._request_generator(), _TIMEOUT) + stream_callable = self._stub.stream_stream( + 'grpc.testing.BenchmarkService', 'StreamingCall') else: - response_stream = self._stub.StreamingCall(self._request_generator(), - _TIMEOUT) + stream_callable = self._stub.StreamingCall + + response_stream = stream_callable(self._request_generator(), _TIMEOUT) for _ in response_stream: end_time = time.time() self._handle_response(end_time - self._send_time_queue.get_nowait()) @@ -184,3 +185,48 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient): yield request except queue.Empty: pass + + +class AsyncReceiver(face.ResponseReceiver): + """Receiver for async stream responses.""" + + def __init__(self, send_time_queue, response_handler): + self._send_time_queue = send_time_queue + self._response_handler = response_handler + + def initial_metadata(self, initial_mdetadata): + pass + + def response(self, response): + end_time = time.time() + self._response_handler(end_time - self._send_time_queue.get_nowait()) + + def complete(self, terminal_metadata, code, details): + pass + + +class StreamingAsyncBenchmarkClient(BenchmarkClient): + + def __init__(self, server, config, hist): + super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist) + self._send_time_queue = queue.Queue() + self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response) + self._rendezvous = None + + def send_request(self): + if self._rendezvous is not None: + self._send_time_queue.put(time.time()) + self._rendezvous.consume(self._request) + + def start(self): + if self._generic: + stream_callable = self._stub.stream_stream( + 'grpc.testing.BenchmarkService', 'StreamingCall') + else: + stream_callable = self._stub.StreamingCall + self._rendezvous = stream_callable.event( + self._receiver, lambda *args: None, _TIMEOUT) + + def stop(self): + self._rendezvous.terminate() + self._rendezvous = None diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py index a36c30ccc0..1ede7d2af1 100644 --- a/src/python/grpcio/tests/qps/client_runner.py +++ b/src/python/grpcio/tests/qps/client_runner.py @@ -89,9 +89,9 @@ class ClosedLoopClientRunner(ClientRunner): def start(self): self._is_running = True + self._client.start() for _ in xrange(self._request_count): self._client.send_request() - self._client.start() def stop(self): self._is_running = False diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py index 0b3acc14e7..1f9af5482c 100644 --- a/src/python/grpcio/tests/qps/worker_server.py +++ b/src/python/grpcio/tests/qps/worker_server.py @@ -146,8 +146,9 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer): if config.rpc_type == control_pb2.UNARY: client = benchmark_client.UnarySyncBenchmarkClient( server, config, qps_data) - else: - raise Exception('STREAMING SYNC client not supported') + elif config.rpc_type == control_pb2.STREAMING: + client = benchmark_client.StreamingSyncBenchmarkClient( + server, config, qps_data) elif config.client_type == control_pb2.ASYNC_CLIENT: if config.rpc_type == control_pb2.UNARY: client = benchmark_client.UnaryAsyncBenchmarkClient( diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index d393709623..30db85cf48 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -355,39 +355,39 @@ class PythonLanguage: return 500 def scenarios(self): - # TODO(jtattermusch): this scenario reports QPS 0.0 - yield _ping_pong_scenario( - 'python_generic_async_streaming_ping_pong', rpc_type='STREAMING', - client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', - use_generic_payload=True, - categories=[SMOKETEST]) - - # TODO(jtattermusch): make this scenario work + # TODO(issue #6522): Empty streaming requests does not work for python #yield _ping_pong_scenario( - # 'python_protobuf_async_streaming_ping_pong', rpc_type='STREAMING', - # client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER') + # 'python_generic_async_streaming_ping_pong', rpc_type='STREAMING', + # client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', + # use_generic_payload=True, + # categories=[SMOKETEST]) - # TODO(jtattermusch): make this scenario work - #yield _ping_pong_scenario( - # 'python_protobuf_async_unary_ping_pong', rpc_type='UNARY', - # client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER') + yield _ping_pong_scenario( + 'python_protobuf_async_streaming_ping_pong', rpc_type='STREAMING', + client_type='ASYNC_CLIENT', server_type='SYNC_SERVER') + + yield _ping_pong_scenario( + 'python_protobuf_async_unary_ping_pong', rpc_type='UNARY', + client_type='ASYNC_CLIENT', server_type='SYNC_SERVER') yield _ping_pong_scenario( 'python_protobuf_sync_unary_ping_pong', rpc_type='UNARY', client_type='SYNC_CLIENT', server_type='SYNC_SERVER', categories=[SMOKETEST]) - # TODO(jtattermusch): make this scenario work + # TODO(jtattermusch): + # The qps_worker server gets thread starved with ~6400 threads, the GIL + # enforces that a single thread runs at a time, with no way to set thread + # priority. Re-evaluate after changing DEEP and WIDE. #yield _ping_pong_scenario( # 'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY', # client_type='SYNC_CLIENT', server_type='SYNC_SERVER', # use_unconstrained_client=True) - # TODO(jtattermusch): make this scenario work - #yield _ping_pong_scenario( - # 'python_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING', - # client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', - # use_unconstrained_client=True) + yield _ping_pong_scenario( + 'python_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING', + client_type='ASYNC_CLIENT', server_type='SYNC_SERVER', + use_unconstrained_client=True) yield _ping_pong_scenario( 'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY', @@ -395,11 +395,10 @@ class PythonLanguage: server_language='c++', server_core_limit=1, async_server_threads=1, categories=[SMOKETEST]) - # TODO(jtattermusch): make this scenario work - #yield _ping_pong_scenario( - # 'python_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING', - # client_type='SYNC_CLIENT', server_type='SYNC_SERVER', - # server_language='c++', server_core_limit=1, async_server_threads=1) + yield _ping_pong_scenario( + 'python_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING', + client_type='SYNC_CLIENT', server_type='SYNC_SERVER', + server_language='c++', server_core_limit=1, async_server_threads=1) def __str__(self): return 'python' -- cgit v1.2.3