aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/python/grpcio/tests/qps/benchmark_client.py45
-rw-r--r--src/python/grpcio/tests/qps/qps_worker.py4
-rw-r--r--src/python/grpcio/tests/qps/worker_server.py5
3 files changed, 3 insertions, 51 deletions
diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py
index b372ea01ad..aac218ed81 100644
--- a/src/python/grpcio/tests/qps/benchmark_client.py
+++ b/src/python/grpcio/tests/qps/benchmark_client.py
@@ -185,48 +185,3 @@ class StreamingSyncBenchmarkClient(BenchmarkClient):
yield request
except queue.Empty:
pass
-
-
-class AsyncReceiver(face.ResponseReceiver):
- """Receiver for async stream responses."""
-
- def __init__(self, send_time_queue, response_handler):
- self._send_time_queue = send_time_queue
- self._response_handler = response_handler
-
- def initial_metadata(self, initial_mdetadata):
- pass
-
- def response(self, response):
- end_time = time.time()
- self._response_handler(end_time - self._send_time_queue.get_nowait())
-
- def complete(self, terminal_metadata, code, details):
- pass
-
-
-class StreamingAsyncBenchmarkClient(BenchmarkClient):
-
- def __init__(self, server, config, hist):
- super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
- self._send_time_queue = queue.Queue()
- self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response)
- self._rendezvous = None
-
- def send_request(self):
- if self._rendezvous is not None:
- self._send_time_queue.put(time.time())
- self._rendezvous.consume(self._request)
-
- def start(self):
- if self._generic:
- stream_callable = self._stub.stream_stream(
- 'grpc.testing.BenchmarkService', 'StreamingCall')
- else:
- stream_callable = self._stub.StreamingCall
- self._rendezvous = stream_callable.event(
- self._receiver, lambda *args: None, _TIMEOUT)
-
- def stop(self):
- self._rendezvous.terminate()
- self._rendezvous = None
diff --git a/src/python/grpcio/tests/qps/qps_worker.py b/src/python/grpcio/tests/qps/qps_worker.py
index 3dda718638..16926379a5 100644
--- a/src/python/grpcio/tests/qps/qps_worker.py
+++ b/src/python/grpcio/tests/qps/qps_worker.py
@@ -43,9 +43,7 @@ def run_worker_server(port):
server.add_insecure_port('[::]:{}'.format(port))
server.start()
servicer.wait_for_quit()
- # Drain outstanding requests for clean exit
- time.sleep(2)
- server.stop(0)
+ server.stop(2)
if __name__ == '__main__':
diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py
index 1f9af5482c..d41f8377c2 100644
--- a/src/python/grpcio/tests/qps/worker_server.py
+++ b/src/python/grpcio/tests/qps/worker_server.py
@@ -153,9 +153,8 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer):
if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnaryAsyncBenchmarkClient(
server, config, qps_data)
- elif config.rpc_type == control_pb2.STREAMING:
- client = benchmark_client.StreamingAsyncBenchmarkClient(
- server, config, qps_data)
+ else:
+ raise Exception('Async streaming client not supported')
else:
raise Exception('Unsupported client type {}'.format(config.client_type))