From 9a36e6c7cd4daca929d5d3457edd0060a93030ca Mon Sep 17 00:00:00 2001 From: Ken Payson Date: Tue, 7 Jun 2016 17:49:03 -0700 Subject: Changed Python Sync streaming qps to follow spec --- src/python/grpcio/tests/qps/benchmark_client.py | 63 +++++++++++++++++-------- src/python/grpcio/tests/qps/client_runner.py | 5 +- tools/run_tests/performance/scenario_config.py | 2 +- 3 files changed, 46 insertions(+), 24 deletions(-) diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py index aac218ed81..e2922347f9 100644 --- a/src/python/grpcio/tests/qps/benchmark_client.py +++ b/src/python/grpcio/tests/qps/benchmark_client.py @@ -82,6 +82,7 @@ class BenchmarkClient: self._response_callbacks = [] def add_response_callback(self, callback): + """callback will be invoked as callback(client, query_time)""" self._response_callbacks.append(callback) @abc.abstractmethod @@ -95,10 +96,10 @@ class BenchmarkClient: def stop(self): pass - def _handle_response(self, query_time): + def _handle_response(self, client, query_time): self._hist.add(query_time * 1e9) # Report times in nanoseconds for callback in self._response_callbacks: - callback(query_time) + callback(client, query_time) class UnarySyncBenchmarkClient(BenchmarkClient): @@ -121,7 +122,7 @@ class UnarySyncBenchmarkClient(BenchmarkClient): start_time = time.time() self._stub.UnaryCall(self._request, _TIMEOUT) end_time = time.time() - self._handle_response(end_time - start_time) + self._handle_response(self, end_time - start_time) class UnaryAsyncBenchmarkClient(BenchmarkClient): @@ -136,19 +137,20 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient): def _response_received(self, start_time, resp): resp.result() end_time = time.time() - self._handle_response(end_time - start_time) + self._handle_response(self, end_time - start_time) def stop(self): self._stub = None -class StreamingSyncBenchmarkClient(BenchmarkClient): +class _SyncStream(object): - def __init__(self, server, config, hist): - super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist) + def __init__(self, stub, generic, request, handle_response): + self._stub = stub + self._generic = generic + self._request = request + self._handle_response = handle_response self._is_streaming = False - self._pool = futures.ThreadPoolExecutor(max_workers=1) - # Use a thread-safe queue to put requests on the stream self._request_queue = queue.Queue() self._send_time_queue = queue.Queue() @@ -157,15 +159,6 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): self._request_queue.put(self._request) def start(self): - self._is_streaming = True - self._pool.submit(self._request_stream) - - def stop(self): - self._is_streaming = False - self._pool.shutdown(wait=True) - self._stub = None - - def _request_stream(self): self._is_streaming = True if self._generic: stream_callable = self._stub.stream_stream( @@ -175,8 +168,11 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): response_stream = stream_callable(self._request_generator(), _TIMEOUT) for _ in response_stream: - end_time = time.time() - self._handle_response(end_time - self._send_time_queue.get_nowait()) + self._handle_response( + self, time.time() - self._send_time_queue.get_nowait()) + + def stop(self): + self._is_streaming = False def _request_generator(self): while self._is_streaming: @@ -185,3 +181,30 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): yield request except queue.Empty: pass + + +class StreamingSyncBenchmarkClient(BenchmarkClient): + + def __init__(self, server, config, hist): + super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist) + self._pool = futures.ThreadPoolExecutor( + max_workers=config.outstanding_rpcs_per_channel) + self._streams = [_SyncStream(self._stub, self._generic, + self._request, self._handle_response) + for _ in xrange(config.outstanding_rpcs_per_channel)] + self._curr_stream = 0 + + def send_request(self): + # Use a round_robin scheduler to determine what stream to send on + self._streams[self._curr_stream].send_request() + self._curr_stream = (self._curr_stream + 1) % len(self._streams) + + def start(self): + for stream in self._streams: + self._pool.submit(stream.start) + + def stop(self): + for stream in self._streams: + stream.stop() + self._pool.shutdown(wait=True) + self._stub = None diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py index 1ede7d2af1..2d1d981733 100644 --- a/src/python/grpcio/tests/qps/client_runner.py +++ b/src/python/grpcio/tests/qps/client_runner.py @@ -98,7 +98,6 @@ class ClosedLoopClientRunner(ClientRunner): self._client.stop() self._client = None - def _send_request(self, response_time): + def _send_request(self, client, response_time): if self._is_running: - self._client.send_request() - + client.send_request() diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index 81569e8b7a..2d5130e1e8 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -415,7 +415,7 @@ class PythonLanguage: yield _ping_pong_scenario( 'python_protobuf_sync_streaming_qps_unconstrained', rpc_type='STREAMING', client_type='SYNC_CLIENT', server_type='SYNC_SERVER', - unconstrained_client='async') + unconstrained_client='sync') yield _ping_pong_scenario( 'python_to_cpp_protobuf_sync_unary_ping_pong', rpc_type='UNARY', -- cgit v1.2.3