aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-06-08 11:59:23 -0700
committerGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-06-08 11:59:23 -0700
commite15377484d6ad13d862d08bde01e89e785c17c2a (patch)
tree51615701eb79e4465620d1ff5fad9125de8ec929 /src
parent09bf5f458a6be15e4d7737aa232ad33ecdf6891a (diff)
parent9a36e6c7cd4daca929d5d3457edd0060a93030ca (diff)
Merge pull request #6815 from kpayson64/python_perf_fixes
Remove AsyncStreaming client from python qps tests
Diffstat (limited to 'src')
-rw-r--r--src/python/grpcio/tests/qps/benchmark_client.py88
-rw-r--r--src/python/grpcio/tests/qps/client_runner.py5
-rw-r--r--src/python/grpcio/tests/qps/qps_worker.py4
-rw-r--r--src/python/grpcio/tests/qps/worker_server.py5
4 files changed, 38 insertions, 64 deletions
diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py
index b372ea01ad..e2922347f9 100644
--- a/src/python/grpcio/tests/qps/benchmark_client.py
+++ b/src/python/grpcio/tests/qps/benchmark_client.py
@@ -82,6 +82,7 @@ class BenchmarkClient:
self._response_callbacks = []
def add_response_callback(self, callback):
+ """callback will be invoked as callback(client, query_time)"""
self._response_callbacks.append(callback)
@abc.abstractmethod
@@ -95,10 +96,10 @@ class BenchmarkClient:
def stop(self):
pass
- def _handle_response(self, query_time):
+ def _handle_response(self, client, query_time):
self._hist.add(query_time * 1e9) # Report times in nanoseconds
for callback in self._response_callbacks:
- callback(query_time)
+ callback(client, query_time)
class UnarySyncBenchmarkClient(BenchmarkClient):
@@ -121,7 +122,7 @@ class UnarySyncBenchmarkClient(BenchmarkClient):
start_time = time.time()
self._stub.UnaryCall(self._request, _TIMEOUT)
end_time = time.time()
- self._handle_response(end_time - start_time)
+ self._handle_response(self, end_time - start_time)
class UnaryAsyncBenchmarkClient(BenchmarkClient):
@@ -136,19 +137,20 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient):
def _response_received(self, start_time, resp):
resp.result()
end_time = time.time()
- self._handle_response(end_time - start_time)
+ self._handle_response(self, end_time - start_time)
def stop(self):
self._stub = None
-class StreamingSyncBenchmarkClient(BenchmarkClient):
+class _SyncStream(object):
- def __init__(self, server, config, hist):
- super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
+ def __init__(self, stub, generic, request, handle_response):
+ self._stub = stub
+ self._generic = generic
+ self._request = request
+ self._handle_response = handle_response
self._is_streaming = False
- self._pool = futures.ThreadPoolExecutor(max_workers=1)
- # Use a thread-safe queue to put requests on the stream
self._request_queue = queue.Queue()
self._send_time_queue = queue.Queue()
@@ -158,15 +160,6 @@ class StreamingSyncBenchmarkClient(BenchmarkClient):
def start(self):
self._is_streaming = True
- self._pool.submit(self._request_stream)
-
- def stop(self):
- self._is_streaming = False
- self._pool.shutdown(wait=True)
- self._stub = None
-
- def _request_stream(self):
- self._is_streaming = True
if self._generic:
stream_callable = self._stub.stream_stream(
'grpc.testing.BenchmarkService', 'StreamingCall')
@@ -175,8 +168,11 @@ class StreamingSyncBenchmarkClient(BenchmarkClient):
response_stream = stream_callable(self._request_generator(), _TIMEOUT)
for _ in response_stream:
- end_time = time.time()
- self._handle_response(end_time - self._send_time_queue.get_nowait())
+ self._handle_response(
+ self, time.time() - self._send_time_queue.get_nowait())
+
+ def stop(self):
+ self._is_streaming = False
def _request_generator(self):
while self._is_streaming:
@@ -187,46 +183,28 @@ class StreamingSyncBenchmarkClient(BenchmarkClient):
pass
-class AsyncReceiver(face.ResponseReceiver):
- """Receiver for async stream responses."""
-
- def __init__(self, send_time_queue, response_handler):
- self._send_time_queue = send_time_queue
- self._response_handler = response_handler
-
- def initial_metadata(self, initial_mdetadata):
- pass
-
- def response(self, response):
- end_time = time.time()
- self._response_handler(end_time - self._send_time_queue.get_nowait())
-
- def complete(self, terminal_metadata, code, details):
- pass
-
-
-class StreamingAsyncBenchmarkClient(BenchmarkClient):
+class StreamingSyncBenchmarkClient(BenchmarkClient):
def __init__(self, server, config, hist):
- super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
- self._send_time_queue = queue.Queue()
- self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response)
- self._rendezvous = None
+ super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
+ self._pool = futures.ThreadPoolExecutor(
+ max_workers=config.outstanding_rpcs_per_channel)
+ self._streams = [_SyncStream(self._stub, self._generic,
+ self._request, self._handle_response)
+ for _ in xrange(config.outstanding_rpcs_per_channel)]
+ self._curr_stream = 0
def send_request(self):
- if self._rendezvous is not None:
- self._send_time_queue.put(time.time())
- self._rendezvous.consume(self._request)
+ # Use a round_robin scheduler to determine what stream to send on
+ self._streams[self._curr_stream].send_request()
+ self._curr_stream = (self._curr_stream + 1) % len(self._streams)
def start(self):
- if self._generic:
- stream_callable = self._stub.stream_stream(
- 'grpc.testing.BenchmarkService', 'StreamingCall')
- else:
- stream_callable = self._stub.StreamingCall
- self._rendezvous = stream_callable.event(
- self._receiver, lambda *args: None, _TIMEOUT)
+ for stream in self._streams:
+ self._pool.submit(stream.start)
def stop(self):
- self._rendezvous.terminate()
- self._rendezvous = None
+ for stream in self._streams:
+ stream.stop()
+ self._pool.shutdown(wait=True)
+ self._stub = None
diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py
index 1ede7d2af1..2d1d981733 100644
--- a/src/python/grpcio/tests/qps/client_runner.py
+++ b/src/python/grpcio/tests/qps/client_runner.py
@@ -98,7 +98,6 @@ class ClosedLoopClientRunner(ClientRunner):
self._client.stop()
self._client = None
- def _send_request(self, response_time):
+ def _send_request(self, client, response_time):
if self._is_running:
- self._client.send_request()
-
+ client.send_request()
diff --git a/src/python/grpcio/tests/qps/qps_worker.py b/src/python/grpcio/tests/qps/qps_worker.py
index 3dda718638..16926379a5 100644
--- a/src/python/grpcio/tests/qps/qps_worker.py
+++ b/src/python/grpcio/tests/qps/qps_worker.py
@@ -43,9 +43,7 @@ def run_worker_server(port):
server.add_insecure_port('[::]:{}'.format(port))
server.start()
servicer.wait_for_quit()
- # Drain outstanding requests for clean exit
- time.sleep(2)
- server.stop(0)
+ server.stop(2)
if __name__ == '__main__':
diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py
index 1f9af5482c..d41f8377c2 100644
--- a/src/python/grpcio/tests/qps/worker_server.py
+++ b/src/python/grpcio/tests/qps/worker_server.py
@@ -153,9 +153,8 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer):
if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnaryAsyncBenchmarkClient(
server, config, qps_data)
- elif config.rpc_type == control_pb2.STREAMING:
- client = benchmark_client.StreamingAsyncBenchmarkClient(
- server, config, qps_data)
+ else:
+ raise Exception('Async streaming client not supported')
else:
raise Exception('Unsupported client type {}'.format(config.client_type))