aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-05-11 09:11:24 -0700
committerGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-05-11 09:11:24 -0700
commit594ff2054addd146cd3da1492217441555635714 (patch)
treee3cc94567ed2b8ad08eb07821038d2084e1b09e7
parent1fc79fccf4f78e39032dc51d2fba26ba0f706bef (diff)
parent1eb8d54a1967edbc234d8abae5b1c4216c8bd867 (diff)
Merge pull request #6524 from kpayson64/qps_improvements
Added true async qps streaming client
-rw-r--r--src/python/grpcio/tests/qps/benchmark_client.py60
-rw-r--r--src/python/grpcio/tests/qps/client_runner.py2
-rw-r--r--src/python/grpcio/tests/qps/worker_server.py5
-rw-r--r--tools/run_tests/performance/scenario_config.py49
4 files changed, 81 insertions, 35 deletions
diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py
index eed0b0c6da..b372ea01ad 100644
--- a/src/python/grpcio/tests/qps/benchmark_client.py
+++ b/src/python/grpcio/tests/qps/benchmark_client.py
@@ -39,6 +39,7 @@ except ImportError:
from concurrent import futures
from grpc.beta import implementations
+from grpc.framework.interfaces.face import face
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import services_pb2
from tests.unit import resources
@@ -141,10 +142,10 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient):
self._stub = None
-class StreamingAsyncBenchmarkClient(BenchmarkClient):
+class StreamingSyncBenchmarkClient(BenchmarkClient):
def __init__(self, server, config, hist):
- super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
+ super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
self._is_streaming = False
self._pool = futures.ThreadPoolExecutor(max_workers=1)
# Use a thread-safe queue to put requests on the stream
@@ -167,12 +168,12 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
def _request_stream(self):
self._is_streaming = True
if self._generic:
- response_stream = self._stub.inline_stream_stream(
- 'grpc.testing.BenchmarkService', 'StreamingCall',
- self._request_generator(), _TIMEOUT)
+ stream_callable = self._stub.stream_stream(
+ 'grpc.testing.BenchmarkService', 'StreamingCall')
else:
- response_stream = self._stub.StreamingCall(self._request_generator(),
- _TIMEOUT)
+ stream_callable = self._stub.StreamingCall
+
+ response_stream = stream_callable(self._request_generator(), _TIMEOUT)
for _ in response_stream:
end_time = time.time()
self._handle_response(end_time - self._send_time_queue.get_nowait())
@@ -184,3 +185,48 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
yield request
except queue.Empty:
pass
+
+
+class AsyncReceiver(face.ResponseReceiver):
+ """Receiver for async stream responses."""
+
+ def __init__(self, send_time_queue, response_handler):
+ self._send_time_queue = send_time_queue
+ self._response_handler = response_handler
+
+ def initial_metadata(self, initial_mdetadata):
+ pass
+
+ def response(self, response):
+ end_time = time.time()
+ self._response_handler(end_time - self._send_time_queue.get_nowait())
+
+ def complete(self, terminal_metadata, code, details):
+ pass
+
+
+class StreamingAsyncBenchmarkClient(BenchmarkClient):
+
+ def __init__(self, server, config, hist):
+ super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
+ self._send_time_queue = queue.Queue()
+ self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response)
+ self._rendezvous = None
+
+ def send_request(self):
+ if self._rendezvous is not None:
+ self._send_time_queue.put(time.time())
+ self._rendezvous.consume(self._request)
+
+ def start(self):
+ if self._generic:
+ stream_callable = self._stub.stream_stream(
+ 'grpc.testing.BenchmarkService', 'StreamingCall')
+ else:
+ stream_callable = self._stub.StreamingCall
+ self._rendezvous = stream_callable.event(
+ self._receiver, lambda *args: None, _TIMEOUT)
+
+ def stop(self):
+ self._rendezvous.terminate()
+ self._rendezvous = None
diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py
index a36c30ccc0..1ede7d2af1 100644
--- a/src/python/grpcio/tests/qps/client_runner.py
+++ b/src/python/grpcio/tests/qps/client_runner.py
@@ -89,9 +89,9 @@ class ClosedLoopClientRunner(ClientRunner):
def start(self):
self._is_running = True
+ self._client.start()
for _ in xrange(self._request_count):
self._client.send_request()
- self._client.start()
def stop(self):
self._is_running = False
diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py
index 0b3acc14e7..1f9af5482c 100644
--- a/src/python/grpcio/tests/qps/worker_server.py
+++ b/src/python/grpcio/tests/qps/worker_server.py
@@ -146,8 +146,9 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer):
if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnarySyncBenchmarkClient(
server, config, qps_data)
- else:
- raise Exception('STREAMING SYNC client not supported')
+ elif config.rpc_type == control_pb2.STREAMING:
+ client = benchmark_client.StreamingSyncBenchmarkClient(
+ server, config, qps_data)
elif config.client_type == control_pb2.ASYNC_CLIENT:
if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnaryAsyncBenchmarkClient(
diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py
index 8f76d0a02e..a13e8b739f 100644
--- a/tools/run_tests/performance/scenario_config.py
+++ b/tools/run_tests/performance/scenario_config.py
@@ -349,39 +349,39 @@ class PythonLanguage:
return 500
def scenarios(self):
- # TODO(jtattermusch): this scenario reports QPS 0.0
- yield _ping_pong_scenario(
- 'python_generic_async_streaming_ping_pong', rpc_type='STREAMING',
- client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
- use_generic_payload=True,
- categories=[SMOKETEST])
-
- # TODO(jtattermusch): make this scenario work
+ # TODO(issue #6522): Empty streaming requests does not work for python
#yield _ping_pong_scenario(
- # 'python_protobuf_async_streaming_ping_pong', rpc_type='STREAMING',
- # client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER')
+ # 'python_generic_async_streaming_ping_pong', rpc_type='STREAMING',
+ # client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER',
+ # use_generic_payload=True,
+ # categories=[SMOKETEST])
- # TODO(jtattermusch): make this scenario work
- #yield _ping_pong_scenario(
- # 'python_protobuf_async_unary_ping_pong', rpc_type='UNARY',
- # client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER')
+ yield _ping_pong_scenario(
+ 'python_protobuf_async_streaming_ping_pong', rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT', server_type='SYNC_SERVER')
+
+ yield _ping_pong_scenario(
+ 'python_protobuf_async_unary_ping_pong', rpc_type='UNARY',
+ client_type='ASYNC_CLIENT', server_type='SYNC_SERVER')
yield _ping_pong_scenario(
'python_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
categories=[SMOKETEST])
- # TODO(jtattermusch): make this scenario work
+ # TODO(jtattermusch):
+ # The qps_worker server gets thread starved with ~6400 threads, the GIL
+ # enforces that a single thread runs at a time, with no way to set thread
+ # priority. Re-evaluate after changing DEEP and WIDE.
#yield _ping_pong_scenario(
# 'python_protobuf_sync_unary_qps_unconstrained', rpc_type='UNARY',
# client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
# use_unconstrained_client=True)
- # TODO(jtattermusch): make this scenario work
- #yield _ping_pong_scenario(
- # 'python_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING',
- # client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER',
- # use_unconstrained_client=True)
+ yield _ping_pong_scenario(
+ 'python_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING',
+ client_type='ASYNC_CLIENT', server_type='SYNC_SERVER',
+ use_unconstrained_client=True)
yield _ping_pong_scenario(
'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY',
@@ -389,11 +389,10 @@ class PythonLanguage:
server_language='c++', server_core_limit=1, async_server_threads=1,
categories=[SMOKETEST])
- # TODO(jtattermusch): make this scenario work
- #yield _ping_pong_scenario(
- # 'python_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
- # client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
- # server_language='c++', server_core_limit=1, async_server_threads=1)
+ yield _ping_pong_scenario(
+ 'python_to_cpp_protobuf_sync_streaming_ping_pong', rpc_type='STREAMING',
+ client_type='SYNC_CLIENT', server_type='SYNC_SERVER',
+ server_language='c++', server_core_limit=1, async_server_threads=1)
def __str__(self):
return 'python'