diff options
author | Ken Payson <kpayson@google.com> | 2017-12-20 11:26:08 -0800 |
---|---|---|
committer | Ken Payson <kpayson@google.com> | 2017-12-20 13:02:00 -0800 |
commit | 3bc8e422dcc171aeda4a883b3d59c8e71d6cf33b (patch) | |
tree | f1a30fa4fc5f05438401012be9350db16e0c0426 /src/python/grpcio_tests/tests | |
parent | cc04915b07cfd5245b43bb91cfa610cd406dc3f6 (diff) |
Disable so_reuseport for Python tests
Diffstat (limited to 'src/python/grpcio_tests/tests')
28 files changed, 93 insertions, 75 deletions
diff --git a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py index ac31e72409..3cbbb8de33 100644 --- a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py +++ b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py @@ -16,12 +16,11 @@ import unittest import grpc -from grpc.framework.foundation import logging_pool from grpc_health.v1 import health from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2_grpc -from tests.unit.framework.common import test_constants +from tests.unit import test_common class HealthServicerTest(unittest.TestCase): @@ -35,8 +34,7 @@ class HealthServicerTest(unittest.TestCase): health_pb2.HealthCheckResponse.UNKNOWN) servicer.set('grpc.test.TestServiceNotServing', health_pb2.HealthCheckResponse.NOT_SERVING) - server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server(server_pool) + self._server = test_common.test_server() port = self._server.add_insecure_port('[::]:0') health_pb2_grpc.add_HealthServicer_to_server(servicer, self._server) self._server.start() diff --git a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py index 4136739f05..8d464b2d4b 100644 --- a/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py +++ b/src/python/grpcio_tests/tests/interop/_insecure_intraop_test.py @@ -13,7 +13,6 @@ # limitations under the License. """Insecure client-server interoperability as a unit test.""" -from concurrent import futures import unittest import grpc @@ -22,13 +21,14 @@ from src.proto.grpc.testing import test_pb2_grpc from tests.interop import _intraop_test_case from tests.interop import methods from tests.interop import server +from tests.unit import test_common class InsecureIntraopTest(_intraop_test_case.IntraopTestCase, unittest.TestCase): def setUp(self): - self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + self.server = test_common.test_server() test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(), self.server) port = self.server.add_insecure_port('[::]:0') diff --git a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py index 6514d77371..6ce8b3715b 100644 --- a/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py +++ b/src/python/grpcio_tests/tests/interop/_secure_intraop_test.py @@ -13,7 +13,6 @@ # limitations under the License. """Secure client-server interoperability as a unit test.""" -from concurrent import futures import unittest import grpc @@ -22,6 +21,7 @@ from src.proto.grpc.testing import test_pb2_grpc from tests.interop import _intraop_test_case from tests.interop import methods from tests.interop import resources +from tests.unit import test_common _SERVER_HOST_OVERRIDE = 'foo.test.google.fr' @@ -29,7 +29,7 @@ _SERVER_HOST_OVERRIDE = 'foo.test.google.fr' class SecureIntraopTest(_intraop_test_case.IntraopTestCase, unittest.TestCase): def setUp(self): - self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + self.server = test_common.test_server() test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(), self.server) port = self.server.add_secure_port( diff --git a/src/python/grpcio_tests/tests/interop/server.py b/src/python/grpcio_tests/tests/interop/server.py index eeb41a21d2..dd4f5146e9 100644 --- a/src/python/grpcio_tests/tests/interop/server.py +++ b/src/python/grpcio_tests/tests/interop/server.py @@ -23,6 +23,7 @@ from src.proto.grpc.testing import test_pb2_grpc from tests.interop import methods from tests.interop import resources +from tests.unit import test_common _ONE_DAY_IN_SECONDS = 60 * 60 * 24 @@ -38,7 +39,7 @@ def serve(): help='require a secure connection') args = parser.parse_args() - server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + server = test_common.test_server() test_pb2_grpc.add_TestServiceServicer_to_server(methods.TestService(), server) if args.use_tls: diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py index 5b84001aab..8fc539e641 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_python_plugin_test.py @@ -13,7 +13,6 @@ # limitations under the License. import collections -from concurrent import futures import contextlib import distutils.spawn import errno @@ -28,6 +27,7 @@ import unittest from six import moves import grpc +from tests.unit import test_common from tests.unit.framework.common import test_constants import tests.protoc_plugin.protos.payload.test_payload_pb2 as payload_pb2 @@ -155,8 +155,7 @@ def _CreateService(): def HalfDuplexCall(self, request_iter, context): return servicer_methods.HalfDuplexCall(request_iter, context) - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) + server = test_common.test_server() getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), server) port = server.add_insecure_port('[::]:0') @@ -177,8 +176,7 @@ def _CreateIncompleteService(): class Servicer(getattr(service_pb2_grpc, SERVICER_IDENTIFIER)): pass - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=test_constants.POOL_SIZE)) + server = test_common.test_server() getattr(service_pb2_grpc, ADD_SERVICER_TO_SERVER_IDENTIFIER)(Servicer(), server) port = server.add_insecure_port('[::]:0') diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py index 7868cdbfb3..c732e55108 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py @@ -13,7 +13,6 @@ # limitations under the License. import abc -from concurrent import futures import contextlib import importlib import os @@ -29,7 +28,7 @@ import six import grpc from grpc_tools import protoc -from tests.unit.framework.common import test_constants +from tests.unit import test_common _MESSAGES_IMPORT = b'import "messages.proto";' _SPLIT_NAMESPACE = b'package grpc_protoc_plugin.invocation_testing.split;' @@ -256,9 +255,7 @@ class _Test(six.with_metaclass(abc.ABCMeta, unittest.TestCase)): self._protoc() for services_module in self._services_modules(): - server = grpc.server( - futures.ThreadPoolExecutor( - max_workers=test_constants.POOL_SIZE)) + server = test_common.test_server() services_module.add_TestServiceServicer_to_server( _Servicer(self._messages_pb2.Response), server) port = server.add_insecure_port('[::]:0') diff --git a/src/python/grpcio_tests/tests/qps/qps_worker.py b/src/python/grpcio_tests/tests/qps/qps_worker.py index 3e46c0b8c2..54f69db109 100644 --- a/src/python/grpcio_tests/tests/qps/qps_worker.py +++ b/src/python/grpcio_tests/tests/qps/qps_worker.py @@ -16,15 +16,15 @@ import argparse import time -from concurrent import futures import grpc from src.proto.grpc.testing import services_pb2_grpc from tests.qps import worker_server +from tests.unit import test_common def run_worker_server(port): - server = grpc.server(futures.ThreadPoolExecutor(max_workers=5)) + server = test_common.test_server() servicer = worker_server.WorkerServer() services_pb2_grpc.add_WorkerServiceServicer_to_server(servicer, server) server.add_insecure_port('[::]:{}'.format(port)) diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index adb10cbcec..fef4fb0459 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -28,6 +28,7 @@ from tests.qps import benchmark_server from tests.qps import client_runner from tests.qps import histogram from tests.unit import resources +from tests.unit import test_common class WorkerServer(services_pb2_grpc.WorkerServiceServicer): @@ -68,8 +69,7 @@ class WorkerServer(services_pb2_grpc.WorkerServiceServicer): server_threads = multiprocessing.cpu_count() * 5 else: server_threads = config.async_server_threads - server = grpc.server( - futures.ThreadPoolExecutor(max_workers=server_threads)) + server = test_common.test_server(max_workers=server_threads) if config.server_type == control_pb2.ASYNC_SERVER: servicer = benchmark_server.BenchmarkServer() services_pb2_grpc.add_BenchmarkServiceServicer_to_server(servicer, diff --git a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py index a86743fa5a..86037e258a 100644 --- a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py +++ b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py @@ -16,7 +16,6 @@ import unittest import grpc -from grpc.framework.foundation import logging_pool from grpc_reflection.v1alpha import reflection from grpc_reflection.v1alpha import reflection_pb2 from grpc_reflection.v1alpha import reflection_pb2_grpc @@ -27,7 +26,7 @@ from google.protobuf import descriptor_pb2 from src.proto.grpc.testing import empty_pb2 from src.proto.grpc.testing.proto2 import empty2_extensions_pb2 -from tests.unit.framework.common import test_constants +from tests.unit import test_common _EMPTY_PROTO_FILE_NAME = 'src/proto/grpc/testing/empty.proto' _EMPTY_PROTO_SYMBOL_NAME = 'grpc.testing.Empty' @@ -46,8 +45,7 @@ def _file_descriptor_to_proto(descriptor): class ReflectionServicerTest(unittest.TestCase): def setUp(self): - server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server(server_pool) + self._server = test_common.test_server() reflection.enable_server_reflection(_SERVICE_NAMES, self._server) port = self._server.add_insecure_port('[::]:0') self._server.start() diff --git a/src/python/grpcio_tests/tests/unit/_auth_context_test.py b/src/python/grpcio_tests/tests/unit/_auth_context_test.py index c6a0a23549..ebc04a71e0 100644 --- a/src/python/grpcio_tests/tests/unit/_auth_context_test.py +++ b/src/python/grpcio_tests/tests/unit/_auth_context_test.py @@ -18,11 +18,9 @@ import unittest import grpc from grpc import _channel -from grpc.framework.foundation import logging_pool import six from tests.unit import test_common -from tests.unit.framework.common import test_constants from tests.unit import resources _REQUEST = b'\x00\x00\x00' @@ -55,12 +53,12 @@ def handle_unary_unary(request, servicer_context): class AuthContextTest(unittest.TestCase): def testInsecure(self): - server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) handler = grpc.method_handlers_generic_handler('test', { 'UnaryUnary': grpc.unary_unary_rpc_method_handler(handle_unary_unary) }) - server = grpc.server(server_pool, (handler,)) + server = test_common.test_server() + server.add_generic_rpc_handlers((handler,)) port = server.add_insecure_port('[::]:0') server.start() @@ -74,12 +72,12 @@ class AuthContextTest(unittest.TestCase): self.assertDictEqual({}, auth_data[_AUTH_CTX]) def testSecureNoCert(self): - server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) handler = grpc.method_handlers_generic_handler('test', { 'UnaryUnary': grpc.unary_unary_rpc_method_handler(handle_unary_unary) }) - server = grpc.server(server_pool, (handler,)) + server = test_common.test_server() + server.add_generic_rpc_handlers((handler,)) server_cred = grpc.ssl_server_credentials(_SERVER_CERTS) port = server.add_secure_port('[::]:0', server_cred) server.start() @@ -101,12 +99,12 @@ class AuthContextTest(unittest.TestCase): }, auth_data[_AUTH_CTX]) def testSecureClientCert(self): - server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) handler = grpc.method_handlers_generic_handler('test', { 'UnaryUnary': grpc.unary_unary_rpc_method_handler(handle_unary_unary) }) - server = grpc.server(server_pool, (handler,)) + server = test_common.test_server() + server.add_generic_rpc_handlers((handler,)) server_cred = grpc.ssl_server_credentials( _SERVER_CERTS, root_certificates=_TEST_ROOT_CERTIFICATES, diff --git a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py index f8c61270ca..f9eb0011dc 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py @@ -83,7 +83,7 @@ class ChannelConnectivityTest(unittest.TestCase): def test_immediately_connectable_channel_connectivity(self): thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) - server = grpc.server(thread_pool) + server = grpc.server(thread_pool, options=(('grpc.so_reuseport', 0),)) port = server.add_insecure_port('[::]:0') server.start() first_callback = _Callback() @@ -125,7 +125,7 @@ class ChannelConnectivityTest(unittest.TestCase): def test_reachable_then_unreachable_channel_connectivity(self): thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) - server = grpc.server(thread_pool) + server = grpc.server(thread_pool, options=(('grpc.so_reuseport', 0),)) port = server.add_insecure_port('[::]:0') server.start() callback = _Callback() diff --git a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py index bdd2d86169..30b486079c 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py @@ -61,7 +61,7 @@ class ChannelReadyFutureTest(unittest.TestCase): def test_immediately_connectable_channel_connectivity(self): thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) - server = grpc.server(thread_pool) + server = grpc.server(thread_pool, options=(('grpc.so_reuseport', 0),)) port = server.add_insecure_port('[::]:0') server.start() channel = grpc.insecure_channel('localhost:{}'.format(port)) diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index e576a5aca9..93e599d8f8 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -17,7 +17,6 @@ import unittest import grpc from grpc import _grpcio_metadata -from grpc.framework.foundation import logging_pool from tests.unit import test_common from tests.unit.framework.common import test_constants @@ -72,9 +71,8 @@ class _GenericHandler(grpc.GenericRpcHandler): class CompressionTest(unittest.TestCase): def setUp(self): - self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server( - self._server_pool, handlers=(_GenericHandler(),)) + self._server = test_common.test_server() + self._server.add_generic_rpc_handlers((_GenericHandler(),)) self._port = self._server.add_insecure_port('[::]:0') self._server.start() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py index a8a7175cc7..7df8e2fde6 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py @@ -141,7 +141,8 @@ class CancelManyCallsTest(unittest.TestCase): test_constants.THREAD_CONCURRENCY) server_completion_queue = cygrpc.CompletionQueue() - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py index 96f0f1589b..c5acd36bf2 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_common.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py @@ -88,7 +88,8 @@ class RpcTest(object): def setUp(self): self.server_completion_queue = cygrpc.CompletionQueue() - self.server = cygrpc.Server(cygrpc.ChannelArgs([])) + self.server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) self.server.register_completion_queue(self.server_completion_queue) port = self.server.add_http2_port(b'[::]:0') self.server.start() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py index 1deb15ba03..c7d19058da 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py @@ -112,7 +112,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): def testReadSomeButNotAllResponses(self): server_completion_queue = cygrpc.CompletionQueue() - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 4eda685486..33a35ae235 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -60,7 +60,8 @@ class TypeSmokeTest(unittest.TestCase): del completion_queue def testServerUpDown(self): - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) del server def testChannelUpDown(self): @@ -72,7 +73,8 @@ class TypeSmokeTest(unittest.TestCase): b'test plugin name!') def testServerStartNoExplicitShutdown(self): - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) completion_queue = cygrpc.CompletionQueue() server.register_completion_queue(completion_queue) port = server.add_http2_port(b'[::]:0') @@ -82,7 +84,8 @@ class TypeSmokeTest(unittest.TestCase): def testServerStartShutdown(self): completion_queue = cygrpc.CompletionQueue() - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) server.add_http2_port(b'[::]:0') server.register_completion_queue(completion_queue) server.start() @@ -99,7 +102,8 @@ class ServerClientMixin(object): def setUpMixin(self, server_credentials, client_credentials, host_override): self.server_completion_queue = cygrpc.CompletionQueue() - self.server = cygrpc.Server(cygrpc.ChannelArgs([])) + self.server = cygrpc.Server( + cygrpc.ChannelArgs([cygrpc.ChannelArg(b'grpc.so_reuseport', 0)])) self.server.register_completion_queue(self.server_completion_queue) if server_credentials: self.port = self.server.add_http2_port(b'[::]:0', diff --git a/src/python/grpcio_tests/tests/unit/_empty_message_test.py b/src/python/grpcio_tests/tests/unit/_empty_message_test.py index 62077e7677..14695bc13f 100644 --- a/src/python/grpcio_tests/tests/unit/_empty_message_test.py +++ b/src/python/grpcio_tests/tests/unit/_empty_message_test.py @@ -15,8 +15,8 @@ import unittest import grpc -from grpc.framework.foundation import logging_pool +from tests.unit import test_common from tests.unit.framework.common import test_constants _REQUEST = b'' @@ -87,9 +87,8 @@ class _GenericHandler(grpc.GenericRpcHandler): class EmptyMessageTest(unittest.TestCase): def setUp(self): - self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server( - self._server_pool, handlers=(_GenericHandler(),)) + self._server = test_common.test_server() + self._server.add_generic_rpc_handlers((_GenericHandler(),)) port = self._server.add_insecure_port('[::]:0') self._server.start() self._channel = grpc.insecure_channel('localhost:%d' % port) diff --git a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py index 7c13dab756..0a0239a63d 100644 --- a/src/python/grpcio_tests/tests/unit/_exit_scenarios.py +++ b/src/python/grpcio_tests/tests/unit/_exit_scenarios.py @@ -168,11 +168,11 @@ if __name__ == '__main__': args = parser.parse_args() if args.scenario == UNSTARTED_SERVER: - server = grpc.server(DaemonPool()) + server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),)) if args.wait_for_interrupt: time.sleep(WAIT_TIME) elif args.scenario == RUNNING_SERVER: - server = grpc.server(DaemonPool()) + server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),)) port = server.add_insecure_port('[::]:0') server.start() if args.wait_for_interrupt: @@ -187,7 +187,7 @@ if __name__ == '__main__': if args.wait_for_interrupt: time.sleep(WAIT_TIME) elif args.scenario == POLL_CONNECTIVITY: - server = grpc.server(DaemonPool()) + server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),)) port = server.add_insecure_port('[::]:0') server.start() channel = grpc.insecure_channel('localhost:%d' % port) @@ -201,7 +201,7 @@ if __name__ == '__main__': else: handler = GenericHandler() - server = grpc.server(DaemonPool()) + server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),)) port = server.add_insecure_port('[::]:0') server.add_generic_rpc_handlers((handler,)) server.start() diff --git a/src/python/grpcio_tests/tests/unit/_interceptor_test.py b/src/python/grpcio_tests/tests/unit/_interceptor_test.py index cf875ed7da..2aee298df2 100644 --- a/src/python/grpcio_tests/tests/unit/_interceptor_test.py +++ b/src/python/grpcio_tests/tests/unit/_interceptor_test.py @@ -22,6 +22,7 @@ from concurrent import futures import grpc from grpc.framework.foundation import logging_pool +from tests.unit import test_common from tests.unit.framework.common import test_constants from tests.unit.framework.common import test_control @@ -304,6 +305,7 @@ class InterceptorTest(unittest.TestCase): self._server = grpc.server( self._server_pool, + options=(('grpc.so_reuseport', 0),), interceptors=(_LoggingInterceptor('s1', self._record), conditional_interceptor, _LoggingInterceptor('s2', self._record),)) diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py index 2a1a49ce74..b46d176d04 100644 --- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -15,11 +15,10 @@ import itertools import threading import unittest -from concurrent import futures import grpc -from grpc.framework.foundation import logging_pool +from tests.unit import test_common from tests.unit.framework.common import test_constants from tests.unit.framework.common import test_control @@ -191,9 +190,8 @@ class InvocationDefectsTest(unittest.TestCase): def setUp(self): self._control = test_control.PauseFailControl() self._handler = _Handler(self._control) - self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server(self._server_pool) + self._server = test_common.test_server() port = self._server.add_insecure_port('[::]:0') self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),)) self._server.start() diff --git a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py index cb59cd3769..ec67f99fbc 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py @@ -17,7 +17,6 @@ import threading import unittest import grpc -from grpc.framework.foundation import logging_pool from tests.unit import test_common from tests.unit.framework.common import test_constants @@ -186,9 +185,9 @@ class MetadataCodeDetailsTest(unittest.TestCase): def setUp(self): self._servicer = _Servicer() - self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server( - self._server_pool, handlers=(_generic_handler(self._servicer),)) + self._server = test_common.test_server() + self._server.add_generic_rpc_handlers( + (_generic_handler(self._servicer),)) port = self._server.add_insecure_port('[::]:0') self._server.start() diff --git a/src/python/grpcio_tests/tests/unit/_metadata_test.py b/src/python/grpcio_tests/tests/unit/_metadata_test.py index 557d5273d5..1dd8cadfed 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_test.py @@ -18,7 +18,6 @@ import weakref import grpc from grpc import _channel -from grpc.framework.foundation import logging_pool from tests.unit import test_common from tests.unit.framework.common import test_constants @@ -142,9 +141,9 @@ class _GenericHandler(grpc.GenericRpcHandler): class MetadataTest(unittest.TestCase): def setUp(self): - self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server( - self._server_pool, handlers=(_GenericHandler(weakref.proxy(self)),)) + self._server = test_common.test_server() + self._server.add_generic_rpc_handlers( + (_GenericHandler(weakref.proxy(self)),)) port = self._server.add_insecure_port('[::]:0') self._server.start() self._channel = grpc.insecure_channel( diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py index 53fd1c2ca4..1e0894f638 100644 --- a/src/python/grpcio_tests/tests/unit/_reconnect_test.py +++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py @@ -13,6 +13,7 @@ # limitations under the License. """Tests that a channel will reconnect if a connection is dropped""" +import socket import unittest import grpc @@ -38,8 +39,21 @@ class ReconnectTest(unittest.TestCase): 'UnaryUnary': grpc.unary_unary_rpc_method_handler(_handle_unary_unary) }) + # Reserve a port, when we restart the server we want + # to hold onto the port + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + opt = socket.SO_REUSEPORT + except AttributeError: + # SO_REUSEPORT is unavailable on Windows, but SO_REUSEADDR + # allows forcibly re-binding to a port + opt = socket.SO_REUSEADDR + s.setsockopt(socket.SOL_SOCKET, opt, 1) + s.bind(('localhost', 0)) + port = s.getsockname()[1] + server = grpc.server(server_pool, (handler,)) - port = server.add_insecure_port('[::]:0') + server.add_insecure_port('[::]:{}'.format(port)) server.start() channel = grpc.insecure_channel('localhost:%d' % port) multi_callable = channel.unary_unary(_UNARY_UNARY) diff --git a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py index e425a0adfe..df4b129018 100644 --- a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py +++ b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py @@ -139,6 +139,7 @@ class ResourceExhaustedTest(unittest.TestCase): self._server = grpc.server( self._server_pool, handlers=(_GenericHandler(self._trigger),), + options=(('grpc.so_reuseport', 0),), maximum_concurrent_rpcs=test_constants.THREAD_CONCURRENCY) port = self._server.add_insecure_port('[::]:0') self._server.start() diff --git a/src/python/grpcio_tests/tests/unit/_rpc_test.py b/src/python/grpcio_tests/tests/unit/_rpc_test.py index 74d8541808..1515a87d93 100644 --- a/src/python/grpcio_tests/tests/unit/_rpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_rpc_test.py @@ -21,6 +21,7 @@ from concurrent import futures import grpc from grpc.framework.foundation import logging_pool +from tests.unit import test_common from tests.unit.framework.common import test_constants from tests.unit.framework.common import test_control @@ -169,9 +170,8 @@ class RPCTest(unittest.TestCase): def setUp(self): self._control = test_control.PauseFailControl() self._handler = _Handler(self._control) - self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - self._server = grpc.server(self._server_pool) + self._server = test_common.test_server() port = self._server.add_insecure_port('[::]:0') self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),)) self._server.start() @@ -180,7 +180,6 @@ class RPCTest(unittest.TestCase): def tearDown(self): self._server.stop(None) - self._server_pool.shutdown(wait=True) def testUnrecognizedMethod(self): request = b'abc' diff --git a/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py b/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py index 005d16ea75..2c513da5d0 100644 --- a/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py +++ b/src/python/grpcio_tests/tests/unit/_server_ssl_cert_config_test.py @@ -40,6 +40,7 @@ from concurrent import futures import grpc from tests.unit import resources +from tests.unit import test_common from tests.testing import _application_common from tests.testing import _server_application from tests.testing.proto import services_pb2_grpc @@ -135,7 +136,7 @@ class _ServerSSLCertReloadTest( raise NotImplementedError() def setUp(self): - self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + self.server = test_common.test_server() services_pb2_grpc.add_FirstServiceServicer_to_server( _server_application.FirstServiceServicer(), self.server) switch_cert_on_client_num = 10 @@ -407,7 +408,7 @@ class ServerSSLCertReloadTestCertConfigReuse(_ServerSSLCertReloadTest): return True def setUp(self): - self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + self.server = test_common.test_server() services_pb2_grpc.add_FirstServiceServicer_to_server( _server_application.FirstServiceServicer(), self.server) self.cert_config_A = grpc.ssl_server_certificate_configuration( diff --git a/src/python/grpcio_tests/tests/unit/test_common.py b/src/python/grpcio_tests/tests/unit/test_common.py index ed71cc996b..6334a32b0e 100644 --- a/src/python/grpcio_tests/tests/unit/test_common.py +++ b/src/python/grpcio_tests/tests/unit/test_common.py @@ -15,6 +15,7 @@ import collections +from concurrent import futures import grpc import six @@ -82,3 +83,13 @@ def test_secure_channel(target, channel_credentials, server_host_override): channel = grpc.secure_channel(target, channel_credentials, ( ('grpc.ssl_target_name_override', server_host_override,),)) return channel + + +def test_server(max_workers=10): + """Creates an insecure grpc server. + + These servers have SO_REUSEPORT disabled to prevent cross-talk. + """ + return grpc.server( + futures.ThreadPoolExecutor(max_workers=max_workers), + options=(('grpc.so_reuseport', 0),)) |