diff options
author | Craig Tiller <ctiller@google.com> | 2016-07-08 14:01:39 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-07-08 14:01:39 -0700 |
commit | 919175b6194e28476626b235ae4fe02caede0bcf (patch) | |
tree | bb39dc431fd237c3859800e46804a8e1e78623d0 /src/python/grpcio_tests/tests/qps | |
parent | ece4aaf578ac61e07b0a9a30a50b88425cdb7e25 (diff) | |
parent | 32d3fbe284ddd7e90b49cdf72349d661869969ca (diff) |
Merge github.com:grpc/grpc into grand-unified-closures
Diffstat (limited to 'src/python/grpcio_tests/tests/qps')
4 files changed, 48 insertions, 36 deletions
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py index 080281415d..83b46c914e 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_client.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py @@ -37,16 +37,23 @@ from concurrent import futures from six.moves import queue import grpc -from grpc.beta import implementations -from grpc.framework.interfaces.face import face from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import services_pb2 from tests.unit import resources -from tests.unit.beta import test_utilities +from tests.unit import test_common _TIMEOUT = 60 * 60 * 24 +class GenericStub(object): + + def __init__(self, channel): + self.UnaryCall = channel.unary_unary( + '/grpc.testing.BenchmarkService/UnaryCall') + self.StreamingCall = channel.stream_stream( + '/grpc.testing.BenchmarkService/StreamingCall') + + class BenchmarkClient: """Benchmark client interface that exposes a non-blocking send_request().""" @@ -54,15 +61,12 @@ class BenchmarkClient: def __init__(self, server, config, hist): # Create the stub - host, port = server.split(':') - port = int(port) if config.HasField('security_params'): - creds = implementations.ssl_channel_credentials( - resources.test_root_certificates()) - channel = test_utilities.not_really_secure_channel( - host, port, creds, config.security_params.server_host_override) + creds = grpc.ssl_channel_credentials(resources.test_root_certificates()) + channel = test_common.test_secure_channel( + server, creds, config.security_params.server_host_override) else: - channel = implementations.insecure_channel(host, port) + channel = grpc.insecure_channel(server) connected_event = threading.Event() def wait_for_ready(connectivity): @@ -73,7 +77,7 @@ class BenchmarkClient: if config.payload_config.WhichOneof('payload') == 'simple_params': self._generic = False - self._stub = services_pb2.beta_create_BenchmarkService_stub(channel) + self._stub = services_pb2.BenchmarkServiceStub(channel) payload = messages_pb2.Payload( body='\0' * config.payload_config.simple_params.req_size) self._request = messages_pb2.SimpleRequest( @@ -81,7 +85,7 @@ class BenchmarkClient: response_size=config.payload_config.simple_params.resp_size) else: self._generic = True - self._stub = implementations.generic_stub(channel) + self._stub = GenericStub(channel) self._request = '\0' * config.payload_config.bytebuf_params.req_size self._hist = hist @@ -166,13 +170,8 @@ class _SyncStream(object): def start(self): self._is_streaming = True - if self._generic: - stream_callable = self._stub.stream_stream( - 'grpc.testing.BenchmarkService', 'StreamingCall') - else: - stream_callable = self._stub.StreamingCall - - response_stream = stream_callable(self._request_generator(), _TIMEOUT) + response_stream = self._stub.StreamingCall( + self._request_generator(), _TIMEOUT) for _ in response_stream: self._handle_response( self, time.time() - self._send_time_queue.get_nowait()) diff --git a/src/python/grpcio_tests/tests/qps/benchmark_server.py b/src/python/grpcio_tests/tests/qps/benchmark_server.py index 8cbf480d58..2b76b810cd 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_server.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_server.py @@ -31,7 +31,7 @@ from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import services_pb2 -class BenchmarkServer(services_pb2.BetaBenchmarkServiceServicer): +class BenchmarkServer(services_pb2.BenchmarkServiceServicer): """Synchronous Server implementation for the Benchmark service.""" def UnaryCall(self, request, context): @@ -44,7 +44,7 @@ class BenchmarkServer(services_pb2.BetaBenchmarkServiceServicer): yield messages_pb2.SimpleResponse(payload=payload) -class GenericBenchmarkServer(services_pb2.BetaBenchmarkServiceServicer): +class GenericBenchmarkServer(services_pb2.BenchmarkServiceServicer): """Generic Server implementation for the Benchmark service.""" def __init__(self, resp_size): diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py index 16926379a5..3abf0d08dd 100644 --- a/src/python/grpcio_tests/tests/qps/qps_worker.py +++ b/src/python/grpcio_tests/tests/qps/qps_worker.py @@ -32,18 +32,21 @@ import argparse import time +from concurrent import futures +import grpc from src.proto.grpc.testing import services_pb2 from tests.qps import worker_server def run_worker_server(port): + server = grpc.server((), futures.ThreadPoolExecutor(max_workers=5)) servicer = worker_server.WorkerServer() - server = services_pb2.beta_create_WorkerService_server(servicer) + services_pb2.add_WorkerServiceServicer_to_server(servicer, server) server.add_insecure_port('[::]:{}'.format(port)) server.start() servicer.wait_for_quit() - server.stop(2) + server.stop(0) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index d41f8377c2..932a1ffe2b 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -32,8 +32,8 @@ import random import threading import time -from grpc.beta import implementations -from grpc.framework.interfaces.face import utilities +from concurrent import futures +import grpc from src.proto.grpc.testing import control_pb2 from src.proto.grpc.testing import services_pb2 from src.proto.grpc.testing import stats_pb2 @@ -45,7 +45,7 @@ from tests.qps import histogram from tests.unit import resources -class WorkerServer(services_pb2.BetaWorkerServiceServicer): +class WorkerServer(services_pb2.WorkerServiceServicer): """Python Worker Server implementation.""" def __init__(self): @@ -65,7 +65,7 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer): if request.mark.reset: start_time = end_time yield status - server.stop(0) + server.stop(None) def _get_server_status(self, start_time, end_time, port, cores): end_time = time.time() @@ -76,25 +76,35 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer): return control_pb2.ServerStatus(stats=stats, port=port, cores=cores) def _create_server(self, config): - if config.server_type == control_pb2.SYNC_SERVER: + if config.async_server_threads == 0: + # This is the default concurrent.futures thread pool size, but + # None doesn't seem to work + server_threads = multiprocessing.cpu_count() * 5 + else: + server_threads = config.async_server_threads + server = grpc.server((), futures.ThreadPoolExecutor( + max_workers=server_threads)) + if config.server_type == control_pb2.ASYNC_SERVER: servicer = benchmark_server.BenchmarkServer() - server = services_pb2.beta_create_BenchmarkService_server(servicer) + services_pb2.add_BenchmarkServiceServicer_to_server(servicer, server) elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER: resp_size = config.payload_config.bytebuf_params.resp_size servicer = benchmark_server.GenericBenchmarkServer(resp_size) method_implementations = { - ('grpc.testing.BenchmarkService', 'StreamingCall'): - utilities.stream_stream_inline(servicer.StreamingCall), - ('grpc.testing.BenchmarkService', 'UnaryCall'): - utilities.unary_unary_inline(servicer.UnaryCall), + 'StreamingCall': + grpc.stream_stream_rpc_method_handler(servicer.StreamingCall), + 'UnaryCall': + grpc.unary_unary_rpc_method_handler(servicer.UnaryCall), } - server = implementations.server(method_implementations) + handler = grpc.method_handlers_generic_handler( + 'grpc.testing.BenchmarkService', method_implementations) + server.add_generic_rpc_handlers((handler,)) else: raise Exception('Unsupported server type {}'.format(config.server_type)) if config.HasField('security_params'): # Use SSL - server_creds = implementations.ssl_server_credentials([( - resources.private_key(), resources.certificate_chain())]) + server_creds = grpc.ssl_server_credentials( + ((resources.private_key(), resources.certificate_chain()),)) port = server.add_secure_port('[::]:{}'.format(config.port), server_creds) else: port = server.add_insecure_port('[::]:{}'.format(config.port)) |