diff options
author | Jan Tattermusch <jtattermusch@users.noreply.github.com> | 2016-06-08 11:59:23 -0700 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@users.noreply.github.com> | 2016-06-08 11:59:23 -0700 |
commit | e15377484d6ad13d862d08bde01e89e785c17c2a (patch) | |
tree | 51615701eb79e4465620d1ff5fad9125de8ec929 /src/python | |
parent | 09bf5f458a6be15e4d7737aa232ad33ecdf6891a (diff) | |
parent | 9a36e6c7cd4daca929d5d3457edd0060a93030ca (diff) |
Merge pull request #6815 from kpayson64/python_perf_fixes
Remove AsyncStreaming client from python qps tests
Diffstat (limited to 'src/python')
-rw-r--r-- | src/python/grpcio/tests/qps/benchmark_client.py | 88 | ||||
-rw-r--r-- | src/python/grpcio/tests/qps/client_runner.py | 5 | ||||
-rw-r--r-- | src/python/grpcio/tests/qps/qps_worker.py | 4 | ||||
-rw-r--r-- | src/python/grpcio/tests/qps/worker_server.py | 5 |
4 files changed, 38 insertions, 64 deletions
diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py index b372ea01ad..e2922347f9 100644 --- a/src/python/grpcio/tests/qps/benchmark_client.py +++ b/src/python/grpcio/tests/qps/benchmark_client.py @@ -82,6 +82,7 @@ class BenchmarkClient: self._response_callbacks = [] def add_response_callback(self, callback): + """callback will be invoked as callback(client, query_time)""" self._response_callbacks.append(callback) @abc.abstractmethod @@ -95,10 +96,10 @@ class BenchmarkClient: def stop(self): pass - def _handle_response(self, query_time): + def _handle_response(self, client, query_time): self._hist.add(query_time * 1e9) # Report times in nanoseconds for callback in self._response_callbacks: - callback(query_time) + callback(client, query_time) class UnarySyncBenchmarkClient(BenchmarkClient): @@ -121,7 +122,7 @@ class UnarySyncBenchmarkClient(BenchmarkClient): start_time = time.time() self._stub.UnaryCall(self._request, _TIMEOUT) end_time = time.time() - self._handle_response(end_time - start_time) + self._handle_response(self, end_time - start_time) class UnaryAsyncBenchmarkClient(BenchmarkClient): @@ -136,19 +137,20 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient): def _response_received(self, start_time, resp): resp.result() end_time = time.time() - self._handle_response(end_time - start_time) + self._handle_response(self, end_time - start_time) def stop(self): self._stub = None -class StreamingSyncBenchmarkClient(BenchmarkClient): +class _SyncStream(object): - def __init__(self, server, config, hist): - super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist) + def __init__(self, stub, generic, request, handle_response): + self._stub = stub + self._generic = generic + self._request = request + self._handle_response = handle_response self._is_streaming = False - self._pool = futures.ThreadPoolExecutor(max_workers=1) - # Use a thread-safe queue to put requests on the stream self._request_queue = queue.Queue() self._send_time_queue = queue.Queue() @@ -158,15 +160,6 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): def start(self): self._is_streaming = True - self._pool.submit(self._request_stream) - - def stop(self): - self._is_streaming = False - self._pool.shutdown(wait=True) - self._stub = None - - def _request_stream(self): - self._is_streaming = True if self._generic: stream_callable = self._stub.stream_stream( 'grpc.testing.BenchmarkService', 'StreamingCall') @@ -175,8 +168,11 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): response_stream = stream_callable(self._request_generator(), _TIMEOUT) for _ in response_stream: - end_time = time.time() - self._handle_response(end_time - self._send_time_queue.get_nowait()) + self._handle_response( + self, time.time() - self._send_time_queue.get_nowait()) + + def stop(self): + self._is_streaming = False def _request_generator(self): while self._is_streaming: @@ -187,46 +183,28 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): pass -class AsyncReceiver(face.ResponseReceiver): - """Receiver for async stream responses.""" - - def __init__(self, send_time_queue, response_handler): - self._send_time_queue = send_time_queue - self._response_handler = response_handler - - def initial_metadata(self, initial_mdetadata): - pass - - def response(self, response): - end_time = time.time() - self._response_handler(end_time - self._send_time_queue.get_nowait()) - - def complete(self, terminal_metadata, code, details): - pass - - -class StreamingAsyncBenchmarkClient(BenchmarkClient): +class StreamingSyncBenchmarkClient(BenchmarkClient): def __init__(self, server, config, hist): - super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist) - self._send_time_queue = queue.Queue() - self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response) - self._rendezvous = None + super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist) + self._pool = futures.ThreadPoolExecutor( + max_workers=config.outstanding_rpcs_per_channel) + self._streams = [_SyncStream(self._stub, self._generic, + self._request, self._handle_response) + for _ in xrange(config.outstanding_rpcs_per_channel)] + self._curr_stream = 0 def send_request(self): - if self._rendezvous is not None: - self._send_time_queue.put(time.time()) - self._rendezvous.consume(self._request) + # Use a round_robin scheduler to determine what stream to send on + self._streams[self._curr_stream].send_request() + self._curr_stream = (self._curr_stream + 1) % len(self._streams) def start(self): - if self._generic: - stream_callable = self._stub.stream_stream( - 'grpc.testing.BenchmarkService', 'StreamingCall') - else: - stream_callable = self._stub.StreamingCall - self._rendezvous = stream_callable.event( - self._receiver, lambda *args: None, _TIMEOUT) + for stream in self._streams: + self._pool.submit(stream.start) def stop(self): - self._rendezvous.terminate() - self._rendezvous = None + for stream in self._streams: + stream.stop() + self._pool.shutdown(wait=True) + self._stub = None diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py index 1ede7d2af1..2d1d981733 100644 --- a/src/python/grpcio/tests/qps/client_runner.py +++ b/src/python/grpcio/tests/qps/client_runner.py @@ -98,7 +98,6 @@ class ClosedLoopClientRunner(ClientRunner): self._client.stop() self._client = None - def _send_request(self, response_time): + def _send_request(self, client, response_time): if self._is_running: - self._client.send_request() - + client.send_request() diff --git a/src/python/grpcio/tests/qps/qps_worker.py b/src/python/grpcio/tests/qps/qps_worker.py index 3dda718638..16926379a5 100644 --- a/src/python/grpcio/tests/qps/qps_worker.py +++ b/src/python/grpcio/tests/qps/qps_worker.py @@ -43,9 +43,7 @@ def run_worker_server(port): server.add_insecure_port('[::]:{}'.format(port)) server.start() servicer.wait_for_quit() - # Drain outstanding requests for clean exit - time.sleep(2) - server.stop(0) + server.stop(2) if __name__ == '__main__': diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py index 1f9af5482c..d41f8377c2 100644 --- a/src/python/grpcio/tests/qps/worker_server.py +++ b/src/python/grpcio/tests/qps/worker_server.py @@ -153,9 +153,8 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer): if config.rpc_type == control_pb2.UNARY: client = benchmark_client.UnaryAsyncBenchmarkClient( server, config, qps_data) - elif config.rpc_type == control_pb2.STREAMING: - client = benchmark_client.StreamingAsyncBenchmarkClient( - server, config, qps_data) + else: + raise Exception('Async streaming client not supported') else: raise Exception('Unsupported client type {}'.format(config.client_type)) |