diff options
author | Ken Payson <kpayson@google.com> | 2016-07-06 19:26:09 -0700 |
---|---|---|
committer | Ken Payson <kpayson@google.com> | 2016-07-06 19:26:09 -0700 |
commit | 45c0f2b3051bf1642337e109df57e8031cb654c8 (patch) | |
tree | 6df1970ad7f8c715c06bce4ba554fbb6dec0e078 /src/python/grpcio_tests | |
parent | dcca468abe87f76bcb8fd85973781612742da9df (diff) |
Migrated python performance tests to use GA API
Diffstat (limited to 'src/python/grpcio_tests')
5 files changed, 70 insertions, 36 deletions
diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py index 080281415d..83b46c914e 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_client.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py @@ -37,16 +37,23 @@ from concurrent import futures from six.moves import queue import grpc -from grpc.beta import implementations -from grpc.framework.interfaces.face import face from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import services_pb2 from tests.unit import resources -from tests.unit.beta import test_utilities +from tests.unit import test_common _TIMEOUT = 60 * 60 * 24 +class GenericStub(object): + + def __init__(self, channel): + self.UnaryCall = channel.unary_unary( + '/grpc.testing.BenchmarkService/UnaryCall') + self.StreamingCall = channel.stream_stream( + '/grpc.testing.BenchmarkService/StreamingCall') + + class BenchmarkClient: """Benchmark client interface that exposes a non-blocking send_request().""" @@ -54,15 +61,12 @@ class BenchmarkClient: def __init__(self, server, config, hist): # Create the stub - host, port = server.split(':') - port = int(port) if config.HasField('security_params'): - creds = implementations.ssl_channel_credentials( - resources.test_root_certificates()) - channel = test_utilities.not_really_secure_channel( - host, port, creds, config.security_params.server_host_override) + creds = grpc.ssl_channel_credentials(resources.test_root_certificates()) + channel = test_common.test_secure_channel( + server, creds, config.security_params.server_host_override) else: - channel = implementations.insecure_channel(host, port) + channel = grpc.insecure_channel(server) connected_event = threading.Event() def wait_for_ready(connectivity): @@ -73,7 +77,7 @@ class BenchmarkClient: if config.payload_config.WhichOneof('payload') == 'simple_params': self._generic = False - self._stub = services_pb2.beta_create_BenchmarkService_stub(channel) + self._stub = services_pb2.BenchmarkServiceStub(channel) payload = messages_pb2.Payload( body='\0' * config.payload_config.simple_params.req_size) self._request = messages_pb2.SimpleRequest( @@ -81,7 +85,7 @@ class BenchmarkClient: response_size=config.payload_config.simple_params.resp_size) else: self._generic = True - self._stub = implementations.generic_stub(channel) + self._stub = GenericStub(channel) self._request = '\0' * config.payload_config.bytebuf_params.req_size self._hist = hist @@ -166,13 +170,8 @@ class _SyncStream(object): def start(self): self._is_streaming = True - if self._generic: - stream_callable = self._stub.stream_stream( - 'grpc.testing.BenchmarkService', 'StreamingCall') - else: - stream_callable = self._stub.StreamingCall - - response_stream = stream_callable(self._request_generator(), _TIMEOUT) + response_stream = self._stub.StreamingCall( + self._request_generator(), _TIMEOUT) for _ in response_stream: self._handle_response( self, time.time() - self._send_time_queue.get_nowait()) diff --git a/src/python/grpcio_tests/tests/qps/benchmark_server.py b/src/python/grpcio_tests/tests/qps/benchmark_server.py index 8cbf480d58..2b76b810cd 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_server.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_server.py @@ -31,7 +31,7 @@ from src.proto.grpc.testing import messages_pb2 from src.proto.grpc.testing import services_pb2 -class BenchmarkServer(services_pb2.BetaBenchmarkServiceServicer): +class BenchmarkServer(services_pb2.BenchmarkServiceServicer): """Synchronous Server implementation for the Benchmark service.""" def UnaryCall(self, request, context): @@ -44,7 +44,7 @@ class BenchmarkServer(services_pb2.BetaBenchmarkServiceServicer): yield messages_pb2.SimpleResponse(payload=payload) -class GenericBenchmarkServer(services_pb2.BetaBenchmarkServiceServicer): +class GenericBenchmarkServer(services_pb2.BenchmarkServiceServicer): """Generic Server implementation for the Benchmark service.""" def __init__(self, resp_size): diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py index 16926379a5..3abf0d08dd 100644 --- a/src/python/grpcio_tests/tests/qps/qps_worker.py +++ b/src/python/grpcio_tests/tests/qps/qps_worker.py @@ -32,18 +32,21 @@ import argparse import time +from concurrent import futures +import grpc from src.proto.grpc.testing import services_pb2 from tests.qps import worker_server def run_worker_server(port): + server = grpc.server((), futures.ThreadPoolExecutor(max_workers=5)) servicer = worker_server.WorkerServer() - server = services_pb2.beta_create_WorkerService_server(servicer) + services_pb2.add_WorkerServiceServicer_to_server(servicer, server) server.add_insecure_port('[::]:{}'.format(port)) server.start() servicer.wait_for_quit() - server.stop(2) + server.stop(0) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index d41f8377c2..932a1ffe2b 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -32,8 +32,8 @@ import random import threading import time -from grpc.beta import implementations -from grpc.framework.interfaces.face import utilities +from concurrent import futures +import grpc from src.proto.grpc.testing import control_pb2 from src.proto.grpc.testing import services_pb2 from src.proto.grpc.testing import stats_pb2 @@ -45,7 +45,7 @@ from tests.qps import histogram from tests.unit import resources -class WorkerServer(services_pb2.BetaWorkerServiceServicer): +class WorkerServer(services_pb2.WorkerServiceServicer): """Python Worker Server implementation.""" def __init__(self): @@ -65,7 +65,7 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer): if request.mark.reset: start_time = end_time yield status - server.stop(0) + server.stop(None) def _get_server_status(self, start_time, end_time, port, cores): end_time = time.time() @@ -76,25 +76,35 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer): return control_pb2.ServerStatus(stats=stats, port=port, cores=cores) def _create_server(self, config): - if config.server_type == control_pb2.SYNC_SERVER: + if config.async_server_threads == 0: + # This is the default concurrent.futures thread pool size, but + # None doesn't seem to work + server_threads = multiprocessing.cpu_count() * 5 + else: + server_threads = config.async_server_threads + server = grpc.server((), futures.ThreadPoolExecutor( + max_workers=server_threads)) + if config.server_type == control_pb2.ASYNC_SERVER: servicer = benchmark_server.BenchmarkServer() - server = services_pb2.beta_create_BenchmarkService_server(servicer) + services_pb2.add_BenchmarkServiceServicer_to_server(servicer, server) elif config.server_type == control_pb2.ASYNC_GENERIC_SERVER: resp_size = config.payload_config.bytebuf_params.resp_size servicer = benchmark_server.GenericBenchmarkServer(resp_size) method_implementations = { - ('grpc.testing.BenchmarkService', 'StreamingCall'): - utilities.stream_stream_inline(servicer.StreamingCall), - ('grpc.testing.BenchmarkService', 'UnaryCall'): - utilities.unary_unary_inline(servicer.UnaryCall), + 'StreamingCall': + grpc.stream_stream_rpc_method_handler(servicer.StreamingCall), + 'UnaryCall': + grpc.unary_unary_rpc_method_handler(servicer.UnaryCall), } - server = implementations.server(method_implementations) + handler = grpc.method_handlers_generic_handler( + 'grpc.testing.BenchmarkService', method_implementations) + server.add_generic_rpc_handlers((handler,)) else: raise Exception('Unsupported server type {}'.format(config.server_type)) if config.HasField('security_params'): # Use SSL - server_creds = implementations.ssl_server_credentials([( - resources.private_key(), resources.certificate_chain())]) + server_creds = grpc.ssl_server_credentials( + ((resources.private_key(), resources.certificate_chain()),)) port = server.add_secure_port('[::]:{}'.format(config.port), server_creds) else: port = server.add_insecure_port('[::]:{}'.format(config.port)) diff --git a/src/python/grpcio_tests/tests/unit/test_common.py b/src/python/grpcio_tests/tests/unit/test_common.py index c8886bf4ca..cd71bd80d7 100644 --- a/src/python/grpcio_tests/tests/unit/test_common.py +++ b/src/python/grpcio_tests/tests/unit/test_common.py @@ -31,6 +31,7 @@ import collections +import grpc import six INVOCATION_INITIAL_METADATA = (('0', 'abc'), ('1', 'def'), ('2', 'ghi'),) @@ -78,3 +79,24 @@ def metadata_transmitted(original_metadata, transmitted_metadata): return False else: return True + + +def test_secure_channel( + target, channel_credentials, server_host_override): + """Creates an insecure Channel to a remote host. + + Args: + host: The name of the remote host to which to connect. + port: The port of the remote host to which to connect. + channel_credentials: The implementations.ChannelCredentials with which to + connect. + server_host_override: The target name used for SSL host name checking. + + Returns: + An implementations.Channel to the remote host through which RPCs may be + conducted. + """ + channel = grpc.secure_channel( + target, channel_credentials, + (('grpc.ssl_target_name_override', server_host_override,),)) + return channel |