diff options
author | Yash Tibrewal <yashkt@google.com> | 2018-12-26 13:33:06 -0800 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2018-12-26 13:33:06 -0800 |
commit | 5a6183f1bdf3ff0c9e29fe454fde6e187e33fcc3 (patch) | |
tree | 3ccb35dfefa90df9f615ade0721a565ac579aed2 /src/python/grpcio_tests | |
parent | 31a775b425eac37bb43c301cfb25e1f6a4bde106 (diff) | |
parent | fc7d0911a3a44d7bc926d3db99b7300a0c0f33dc (diff) |
Merge branch 'master' into failhijackedsend
Diffstat (limited to 'src/python/grpcio_tests')
35 files changed, 295 insertions, 52 deletions
diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 18413abab0..d5327711d3 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -22,7 +22,6 @@ import re import shutil import subprocess import sys -import traceback import setuptools from setuptools.command import build_ext @@ -134,6 +133,7 @@ class TestGevent(setuptools.Command): # This test will stuck while running higher version of gevent 'unit._auth_context_test.AuthContextTest.testSessionResumption', # TODO(https://github.com/grpc/grpc/issues/15411) enable these tests + 'unit._metadata_flags_test', 'unit._exit_test.ExitTest.test_in_flight_unary_unary_call', 'unit._exit_test.ExitTest.test_in_flight_unary_stream_call', 'unit._exit_test.ExitTest.test_in_flight_stream_unary_call', diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index f9cb9d0cec..800b865da6 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -37,13 +37,19 @@ PACKAGE_DIRECTORIES = { } INSTALL_REQUIRES = ( - 'coverage>=4.0', 'enum34>=1.0.4', + 'coverage>=4.0', + 'enum34>=1.0.4', 'grpcio>={version}'.format(version=grpc_version.VERSION), - 'grpcio-channelz>={version}'.format(version=grpc_version.VERSION), + # TODO(https://github.com/pypa/warehouse/issues/5196) + # Re-enable it once we got the name back + # 'grpcio-channelz>={version}'.format(version=grpc_version.VERSION), 'grpcio-status>={version}'.format(version=grpc_version.VERSION), 'grpcio-tools>={version}'.format(version=grpc_version.VERSION), 'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION), - 'oauth2client>=1.4.7', 'protobuf>=3.6.0', 'six>=1.10', 'google-auth>=1.0.0', + 'oauth2client>=1.4.7', + 'protobuf>=3.6.0', + 'six>=1.10', + 'google-auth>=1.0.0', 'requests>=2.14.2') if not PY3: diff --git a/src/python/grpcio_tests/tests/_runner.py b/src/python/grpcio_tests/tests/_runner.py index eaaa027e61..9ef0f17684 100644 --- a/src/python/grpcio_tests/tests/_runner.py +++ b/src/python/grpcio_tests/tests/_runner.py @@ -203,7 +203,7 @@ class Runner(object): check_kill_self() time.sleep(0) case_thread.join() - except: + except: # pylint: disable=try-except-raise # re-raise the exception after forcing the with-block to end raise result.set_output(augmented_case.case, stdout_pipe.output(), diff --git a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py index 8ca5189522..c63ff5cd84 100644 --- a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py +++ b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py @@ -88,11 +88,10 @@ def _generate_channel_server_pairs(n): def _close_channel_server_pairs(pairs): for pair in pairs: pair.server.stop(None) - # TODO(ericgribkoff) This del should not be required - del pair.server pair.channel.close() +@unittest.skip('https://github.com/pypa/warehouse/issues/5196') class ChannelzServicerTest(unittest.TestCase): def _send_successful_unary_unary(self, idx): diff --git a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py index 350b5eebe5..c1d9436c2f 100644 --- a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py +++ b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py @@ -39,8 +39,12 @@ class HealthServicerTest(unittest.TestCase): health_pb2_grpc.add_HealthServicer_to_server(servicer, self._server) self._server.start() - channel = grpc.insecure_channel('localhost:%d' % port) - self._stub = health_pb2_grpc.HealthStub(channel) + self._channel = grpc.insecure_channel('localhost:%d' % port) + self._stub = health_pb2_grpc.HealthStub(self._channel) + + def tearDown(self): + self._server.stop(None) + self._channel.close() def test_empty_service(self): request = health_pb2.HealthCheckRequest() diff --git a/src/python/grpcio_tests/tests/interop/client.py b/src/python/grpcio_tests/tests/interop/client.py index 698c37017f..56cb29477c 100644 --- a/src/python/grpcio_tests/tests/interop/client.py +++ b/src/python/grpcio_tests/tests/interop/client.py @@ -54,7 +54,6 @@ def _args(): help='replace platform root CAs with ca.pem') parser.add_argument( '--server_host_override', - default="foo.test.google.fr", type=str, help='the server host to which to claim to connect') parser.add_argument( @@ -100,10 +99,13 @@ def _stub(args): channel_credentials = grpc.composite_channel_credentials( channel_credentials, call_credentials) - channel = grpc.secure_channel(target, channel_credentials, (( - 'grpc.ssl_target_name_override', - args.server_host_override, - ),)) + channel_opts = None + if args.server_host_override: + channel_opts = (( + 'grpc.ssl_target_name_override', + args.server_host_override, + ),) + channel = grpc.secure_channel(target, channel_credentials, channel_opts) else: channel = grpc.insecure_channel(target) if args.test_case == "unimplemented_service": diff --git a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py index e21ea0010a..2b735526cb 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/_split_definitions_test.py @@ -144,7 +144,7 @@ class _ProtoBeforeGrpcProtocStyle(object): absolute_proto_file_names) pb2_grpc_protoc_exit_code = _protoc( proto_path, None, 'grpc_2_0', python_out, absolute_proto_file_names) - return pb2_protoc_exit_code, pb2_grpc_protoc_exit_code, + return pb2_protoc_exit_code, pb2_grpc_protoc_exit_code class _GrpcBeforeProtoProtocStyle(object): @@ -160,7 +160,7 @@ class _GrpcBeforeProtoProtocStyle(object): proto_path, None, 'grpc_2_0', python_out, absolute_proto_file_names) pb2_protoc_exit_code = _protoc(proto_path, python_out, None, None, absolute_proto_file_names) - return pb2_grpc_protoc_exit_code, pb2_protoc_exit_code, + return pb2_grpc_protoc_exit_code, pb2_protoc_exit_code _PROTOC_STYLES = ( @@ -243,9 +243,9 @@ class _Test(six.with_metaclass(abc.ABCMeta, unittest.TestCase)): def _services_modules(self): if self.PROTOC_STYLE.grpc_in_pb2_expected(): - return self._services_pb2, self._services_pb2_grpc, + return self._services_pb2, self._services_pb2_grpc else: - return self._services_pb2_grpc, + return (self._services_pb2_grpc,) def test_imported_attributes(self): self._protoc() diff --git a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py index b46e53315e..43c90af6a7 100644 --- a/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py +++ b/src/python/grpcio_tests/tests/protoc_plugin/beta_python_plugin_test.py @@ -223,7 +223,7 @@ def _CreateService(payload_pb2, responses_pb2, service_pb2): server.start() channel = implementations.insecure_channel('localhost', port) stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel) - yield servicer_methods, stub, + yield servicer_methods, stub server.stop(0) diff --git a/src/python/grpcio_tests/tests/qps/benchmark_client.py b/src/python/grpcio_tests/tests/qps/benchmark_client.py index 0488450740..fac0e44e5a 100644 --- a/src/python/grpcio_tests/tests/qps/benchmark_client.py +++ b/src/python/grpcio_tests/tests/qps/benchmark_client.py @@ -180,7 +180,7 @@ class StreamingSyncBenchmarkClient(BenchmarkClient): self._streams = [ _SyncStream(self._stub, self._generic, self._request, self._handle_response) - for _ in xrange(config.outstanding_rpcs_per_channel) + for _ in range(config.outstanding_rpcs_per_channel) ] self._curr_stream = 0 diff --git a/src/python/grpcio_tests/tests/qps/client_runner.py b/src/python/grpcio_tests/tests/qps/client_runner.py index e79abab3c7..a57524c74e 100644 --- a/src/python/grpcio_tests/tests/qps/client_runner.py +++ b/src/python/grpcio_tests/tests/qps/client_runner.py @@ -77,7 +77,7 @@ class ClosedLoopClientRunner(ClientRunner): def start(self): self._is_running = True self._client.start() - for _ in xrange(self._request_count): + for _ in range(self._request_count): self._client.send_request() def stop(self): diff --git a/src/python/grpcio_tests/tests/qps/worker_server.py b/src/python/grpcio_tests/tests/qps/worker_server.py index 337a94b546..a03367ec63 100644 --- a/src/python/grpcio_tests/tests/qps/worker_server.py +++ b/src/python/grpcio_tests/tests/qps/worker_server.py @@ -109,7 +109,7 @@ class WorkerServer(worker_service_pb2_grpc.WorkerServiceServicer): start_time = time.time() # Create a client for each channel - for i in xrange(config.client_channels): + for i in range(config.client_channels): server = config.server_targets[i % len(config.server_targets)] runner = self._create_client_runner(server, config, qps_data) client_runners.append(runner) diff --git a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py index bcd9e14a38..560f6d3ddb 100644 --- a/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py +++ b/src/python/grpcio_tests/tests/reflection/_reflection_servicer_test.py @@ -56,8 +56,12 @@ class ReflectionServicerTest(unittest.TestCase): port = self._server.add_insecure_port('[::]:0') self._server.start() - channel = grpc.insecure_channel('localhost:%d' % port) - self._stub = reflection_pb2_grpc.ServerReflectionStub(channel) + self._channel = grpc.insecure_channel('localhost:%d' % port) + self._stub = reflection_pb2_grpc.ServerReflectionStub(self._channel) + + def tearDown(self): + self._server.stop(None) + self._channel.close() def testFileByName(self): requests = ( diff --git a/src/python/grpcio_tests/tests/stress/client.py b/src/python/grpcio_tests/tests/stress/client.py index 41f2e1b6c2..4c35b05044 100644 --- a/src/python/grpcio_tests/tests/stress/client.py +++ b/src/python/grpcio_tests/tests/stress/client.py @@ -71,7 +71,6 @@ def _args(): '--use_tls', help='Whether to use TLS', default=False, type=bool) parser.add_argument( '--server_host_override', - default="foo.test.google.fr", help='the server host to which to claim to connect', type=str) return parser.parse_args() @@ -132,9 +131,9 @@ def run_test(args): server.start() for test_server_target in test_server_targets: - for _ in xrange(args.num_channels_per_server): + for _ in range(args.num_channels_per_server): channel = _get_channel(test_server_target, args) - for _ in xrange(args.num_stubs_per_channel): + for _ in range(args.num_stubs_per_channel): stub = test_pb2_grpc.TestServiceStub(channel) runner = test_runner.TestRunner(stub, test_cases, hist, exception_queue, stop_event) diff --git a/src/python/grpcio_tests/tests/testing/_client_application.py b/src/python/grpcio_tests/tests/testing/_client_application.py index 3ddeba2373..4d42df0389 100644 --- a/src/python/grpcio_tests/tests/testing/_client_application.py +++ b/src/python/grpcio_tests/tests/testing/_client_application.py @@ -130,9 +130,9 @@ def _run_stream_stream(stub): request_pipe = _Pipe() response_iterator = stub.StreStre(iter(request_pipe)) request_pipe.add(_application_common.STREAM_STREAM_REQUEST) - first_responses = next(response_iterator), next(response_iterator), + first_responses = next(response_iterator), next(response_iterator) request_pipe.add(_application_common.STREAM_STREAM_REQUEST) - second_responses = next(response_iterator), next(response_iterator), + second_responses = next(response_iterator), next(response_iterator) request_pipe.close() try: next(response_iterator) diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index b27e6f2693..f202a3f932 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -57,6 +57,7 @@ "unit._reconnect_test.ReconnectTest", "unit._resource_exhausted_test.ResourceExhaustedTest", "unit._rpc_test.RPCTest", + "unit._server_shutdown_test.ServerShutdown", "unit._server_ssl_cert_config_test.ServerSSLCertConfigFetcherParamsChecks", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth", diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index 4f850220f8..1b462ec67a 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -28,6 +28,7 @@ GRPCIO_TESTS_UNIT = [ # TODO(ghostwriternr): To be added later. # "_server_ssl_cert_config_test.py", "_server_test.py", + "_server_shutdown_test.py", "_session_cache_test.py", ] @@ -50,6 +51,11 @@ py_library( ) py_library( + name = "_server_shutdown_scenarios", + srcs = ["_server_shutdown_scenarios.py"], +) + +py_library( name = "_thread_pool", srcs = ["_thread_pool.py"], ) @@ -70,6 +76,7 @@ py_library( ":resources", ":test_common", ":_exit_scenarios", + ":_server_shutdown_scenarios", ":_thread_pool", ":_from_grpc_import_star", "//src/python/grpcio_tests/tests/unit/framework/common", diff --git a/src/python/grpcio_tests/tests/unit/_api_test.py b/src/python/grpcio_tests/tests/unit/_api_test.py index 427894bfe9..0dc6a8718c 100644 --- a/src/python/grpcio_tests/tests/unit/_api_test.py +++ b/src/python/grpcio_tests/tests/unit/_api_test.py @@ -101,6 +101,7 @@ class ChannelTest(unittest.TestCase): def test_secure_channel(self): channel_credentials = grpc.ssl_channel_credentials() channel = grpc.secure_channel('google.com:443', channel_credentials) + channel.close() if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_auth_context_test.py b/src/python/grpcio_tests/tests/unit/_auth_context_test.py index b1b5bbdcab..96c4e9ec76 100644 --- a/src/python/grpcio_tests/tests/unit/_auth_context_test.py +++ b/src/python/grpcio_tests/tests/unit/_auth_context_test.py @@ -71,8 +71,8 @@ class AuthContextTest(unittest.TestCase): port = server.add_insecure_port('[::]:0') server.start() - channel = grpc.insecure_channel('localhost:%d' % port) - response = channel.unary_unary(_UNARY_UNARY)(_REQUEST) + with grpc.insecure_channel('localhost:%d' % port) as channel: + response = channel.unary_unary(_UNARY_UNARY)(_REQUEST) server.stop(None) auth_data = pickle.loads(response) @@ -98,6 +98,7 @@ class AuthContextTest(unittest.TestCase): channel_creds, options=_PROPERTY_OPTIONS) response = channel.unary_unary(_UNARY_UNARY)(_REQUEST) + channel.close() server.stop(None) auth_data = pickle.loads(response) @@ -132,6 +133,7 @@ class AuthContextTest(unittest.TestCase): options=_PROPERTY_OPTIONS) response = channel.unary_unary(_UNARY_UNARY)(_REQUEST) + channel.close() server.stop(None) auth_data = pickle.loads(response) diff --git a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py index 727fb7d65f..565bd39b3a 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py @@ -75,6 +75,8 @@ class ChannelConnectivityTest(unittest.TestCase): channel.unsubscribe(callback.update) fifth_connectivities = callback.connectivities() + channel.close() + self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,), first_connectivities) self.assertNotIn(grpc.ChannelConnectivity.READY, second_connectivities) @@ -108,7 +110,8 @@ class ChannelConnectivityTest(unittest.TestCase): _ready_in_connectivities) second_callback.block_until_connectivities_satisfy( _ready_in_connectivities) - del channel + channel.close() + server.stop(None) self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,), first_connectivities) @@ -139,6 +142,7 @@ class ChannelConnectivityTest(unittest.TestCase): callback.block_until_connectivities_satisfy( _last_connectivity_is_not_ready) channel.unsubscribe(callback.update) + channel.close() self.assertFalse(thread_pool.was_used()) diff --git a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py index 345460ef40..46a4eb9bb6 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py @@ -60,6 +60,8 @@ class ChannelReadyFutureTest(unittest.TestCase): self.assertTrue(ready_future.done()) self.assertFalse(ready_future.running()) + channel.close() + def test_immediately_connectable_channel_connectivity(self): thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) server = grpc.server(thread_pool, options=(('grpc.so_reuseport', 0),)) @@ -84,6 +86,9 @@ class ChannelReadyFutureTest(unittest.TestCase): self.assertFalse(ready_future.running()) self.assertFalse(thread_pool.was_used()) + channel.close() + server.stop(None) + if __name__ == '__main__': logging.basicConfig() diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index 876d8e827e..87884a19dc 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -77,6 +77,9 @@ class CompressionTest(unittest.TestCase): self._port = self._server.add_insecure_port('[::]:0') self._server.start() + def tearDown(self): + self._server.stop(None) + def testUnary(self): request = b'\x00' * 100 @@ -102,6 +105,7 @@ class CompressionTest(unittest.TestCase): response = multi_callable( request, metadata=[('grpc-internal-encoding-request', 'gzip')]) self.assertEqual(request, response) + compressed_channel.close() def testStreaming(self): request = b'\x00' * 100 @@ -115,6 +119,7 @@ class CompressionTest(unittest.TestCase): call = multi_callable(iter([request] * test_constants.STREAM_LENGTH)) for response in call: self.assertEqual(request, response) + compressed_channel.close() if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_empty_message_test.py b/src/python/grpcio_tests/tests/unit/_empty_message_test.py index 3e8393b53c..f27ea422d0 100644 --- a/src/python/grpcio_tests/tests/unit/_empty_message_test.py +++ b/src/python/grpcio_tests/tests/unit/_empty_message_test.py @@ -96,6 +96,7 @@ class EmptyMessageTest(unittest.TestCase): def tearDown(self): self._server.stop(0) + self._channel.close() def testUnaryUnary(self): response = self._channel.unary_unary(_UNARY_UNARY)(_REQUEST) diff --git a/src/python/grpcio_tests/tests/unit/_error_message_encoding_test.py b/src/python/grpcio_tests/tests/unit/_error_message_encoding_test.py index 6c551df3ec..81de1dae1d 100644 --- a/src/python/grpcio_tests/tests/unit/_error_message_encoding_test.py +++ b/src/python/grpcio_tests/tests/unit/_error_message_encoding_test.py @@ -71,6 +71,7 @@ class ErrorMessageEncodingTest(unittest.TestCase): def tearDown(self): self._server.stop(0) + self._channel.close() def testMessageEncoding(self): for message in _UNICODE_ERROR_MESSAGES: diff --git a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py index ad847ae03e..1ada25382d 100644 --- a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py +++ b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py @@ -14,7 +14,7 @@ _BEFORE_IMPORT = tuple(globals()) -from grpc import * # pylint: disable=wildcard-import +from grpc import * # pylint: disable=wildcard-import,unused-wildcard-import _AFTER_IMPORT = tuple(globals()) diff --git a/src/python/grpcio_tests/tests/unit/_interceptor_test.py b/src/python/grpcio_tests/tests/unit/_interceptor_test.py index 99db0ac58b..a647e5e720 100644 --- a/src/python/grpcio_tests/tests/unit/_interceptor_test.py +++ b/src/python/grpcio_tests/tests/unit/_interceptor_test.py @@ -337,6 +337,7 @@ class InterceptorTest(unittest.TestCase): def tearDown(self): self._server.stop(None) self._server_pool.shutdown(wait=True) + self._channel.close() def testTripleRequestMessagesClientInterceptor(self): diff --git a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py index 0ff49490d5..7ed7c83893 100644 --- a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py @@ -62,6 +62,9 @@ class InvalidMetadataTest(unittest.TestCase): self._stream_unary = _stream_unary_multi_callable(self._channel) self._stream_stream = _stream_stream_multi_callable(self._channel) + def tearDown(self): + self._channel.close() + def testUnaryRequestBlockingUnaryResponse(self): request = b'\x07\x08' metadata = (('InVaLiD', 'UnaryRequestBlockingUnaryResponse'),) diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py index 00949e2236..e89b521cc5 100644 --- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -215,6 +215,7 @@ class InvocationDefectsTest(unittest.TestCase): def tearDown(self): self._server.stop(0) + self._channel.close() def testIterableStreamRequestBlockingUnaryResponse(self): requests = [b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)] diff --git a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py index 0dafab827a..a63664ac5d 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py @@ -198,8 +198,8 @@ class MetadataCodeDetailsTest(unittest.TestCase): port = self._server.add_insecure_port('[::]:0') self._server.start() - channel = grpc.insecure_channel('localhost:{}'.format(port)) - self._unary_unary = channel.unary_unary( + self._channel = grpc.insecure_channel('localhost:{}'.format(port)) + self._unary_unary = self._channel.unary_unary( '/'.join(( '', _SERVICE, @@ -208,17 +208,17 @@ class MetadataCodeDetailsTest(unittest.TestCase): request_serializer=_REQUEST_SERIALIZER, response_deserializer=_RESPONSE_DESERIALIZER, ) - self._unary_stream = channel.unary_stream('/'.join(( + self._unary_stream = self._channel.unary_stream('/'.join(( '', _SERVICE, _UNARY_STREAM, )),) - self._stream_unary = channel.stream_unary('/'.join(( + self._stream_unary = self._channel.stream_unary('/'.join(( '', _SERVICE, _STREAM_UNARY, )),) - self._stream_stream = channel.stream_stream( + self._stream_stream = self._channel.stream_stream( '/'.join(( '', _SERVICE, @@ -228,6 +228,10 @@ class MetadataCodeDetailsTest(unittest.TestCase): response_deserializer=_RESPONSE_DESERIALIZER, ) + def tearDown(self): + self._server.stop(None) + self._channel.close() + def testSuccessfulUnaryUnary(self): self._servicer.set_details(_DETAILS) diff --git a/src/python/grpcio_tests/tests/unit/_metadata_flags_test.py b/src/python/grpcio_tests/tests/unit/_metadata_flags_test.py index 2d352e99d4..7b32b5b5f3 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_flags_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_flags_test.py @@ -187,13 +187,14 @@ class MetadataFlagsTest(unittest.TestCase): def test_call_wait_for_ready_default(self): for perform_call in _ALL_CALL_CASES: - self.check_connection_does_failfast(perform_call, - create_dummy_channel()) + with create_dummy_channel() as channel: + self.check_connection_does_failfast(perform_call, channel) def test_call_wait_for_ready_disabled(self): for perform_call in _ALL_CALL_CASES: - self.check_connection_does_failfast( - perform_call, create_dummy_channel(), wait_for_ready=False) + with create_dummy_channel() as channel: + self.check_connection_does_failfast( + perform_call, channel, wait_for_ready=False) def test_call_wait_for_ready_enabled(self): # To test the wait mechanism, Python thread is required to make @@ -210,16 +211,16 @@ class MetadataFlagsTest(unittest.TestCase): wg.done() def test_call(perform_call): - try: - channel = grpc.insecure_channel(addr) - channel.subscribe(wait_for_transient_failure) - perform_call(channel, wait_for_ready=True) - except BaseException as e: # pylint: disable=broad-except - # If the call failed, the thread would be destroyed. The channel - # object can be collected before calling the callback, which - # will result in a deadlock. - wg.done() - unhandled_exceptions.put(e, True) + with grpc.insecure_channel(addr) as channel: + try: + channel.subscribe(wait_for_transient_failure) + perform_call(channel, wait_for_ready=True) + except BaseException as e: # pylint: disable=broad-except + # If the call failed, the thread would be destroyed. The + # channel object can be collected before calling the + # callback, which will result in a deadlock. + wg.done() + unhandled_exceptions.put(e, True) test_threads = [] for perform_call in _ALL_CALL_CASES: diff --git a/src/python/grpcio_tests/tests/unit/_metadata_test.py b/src/python/grpcio_tests/tests/unit/_metadata_test.py index 777ab683e3..892df3df08 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_test.py @@ -186,6 +186,7 @@ class MetadataTest(unittest.TestCase): def tearDown(self): self._server.stop(0) + self._channel.close() def testUnaryUnary(self): multi_callable = self._channel.unary_unary(_UNARY_UNARY) diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py index f6d4fcbd0a..d4ea126e2b 100644 --- a/src/python/grpcio_tests/tests/unit/_reconnect_test.py +++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py @@ -98,6 +98,8 @@ class ReconnectTest(unittest.TestCase): server.add_insecure_port('[::]:{}'.format(port)) server.start() self.assertEqual(_RESPONSE, multi_callable(_REQUEST)) + server.stop(None) + channel.close() if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py index 4fead8fcd5..517c2d2f97 100644 --- a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py +++ b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py @@ -148,6 +148,7 @@ class ResourceExhaustedTest(unittest.TestCase): def tearDown(self): self._server.stop(0) + self._channel.close() def testUnaryUnary(self): multi_callable = self._channel.unary_unary(_UNARY_UNARY) diff --git a/src/python/grpcio_tests/tests/unit/_rpc_test.py b/src/python/grpcio_tests/tests/unit/_rpc_test.py index a768d6c7c1..a99121cee5 100644 --- a/src/python/grpcio_tests/tests/unit/_rpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_rpc_test.py @@ -193,6 +193,7 @@ class RPCTest(unittest.TestCase): def tearDown(self): self._server.stop(None) + self._channel.close() def testUnrecognizedMethod(self): request = b'abc' diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py new file mode 100644 index 0000000000..1d1fdba11e --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py @@ -0,0 +1,97 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Defines a number of module-scope gRPC scenarios to test server shutdown.""" + +import argparse +import os +import threading +import time +import logging + +import grpc +from tests.unit import test_common + +from concurrent import futures +from six.moves import queue + +WAIT_TIME = 1000 + +REQUEST = b'request' +RESPONSE = b'response' + +SERVER_RAISES_EXCEPTION = 'server_raises_exception' +SERVER_DEALLOCATED = 'server_deallocated' +SERVER_FORK_CAN_EXIT = 'server_fork_can_exit' + +FORK_EXIT = '/test/ForkExit' + + +def fork_and_exit(request, servicer_context): + pid = os.fork() + if pid == 0: + os._exit(0) + return RESPONSE + + +class GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == FORK_EXIT: + return grpc.unary_unary_rpc_method_handler(fork_and_exit) + else: + return None + + +def run_server(port_queue): + server = test_common.test_server() + port = server.add_insecure_port('[::]:0') + port_queue.put(port) + server.add_generic_rpc_handlers((GenericHandler(),)) + server.start() + # threading.Event.wait() does not exhibit the bug identified in + # https://github.com/grpc/grpc/issues/17093, sleep instead + time.sleep(WAIT_TIME) + + +def run_test(args): + if args.scenario == SERVER_RAISES_EXCEPTION: + server = test_common.test_server() + server.start() + raise Exception() + elif args.scenario == SERVER_DEALLOCATED: + server = test_common.test_server() + server.start() + server.__del__() + while server._state.stage != grpc._server._ServerStage.STOPPED: + pass + elif args.scenario == SERVER_FORK_CAN_EXIT: + port_queue = queue.Queue() + thread = threading.Thread(target=run_server, args=(port_queue,)) + thread.daemon = True + thread.start() + port = port_queue.get() + channel = grpc.insecure_channel('localhost:%d' % port) + multi_callable = channel.unary_unary(FORK_EXIT) + result, call = multi_callable.with_call(REQUEST, wait_for_ready=True) + os.wait() + else: + raise ValueError('unknown test scenario') + + +if __name__ == '__main__': + logging.basicConfig() + parser = argparse.ArgumentParser() + parser.add_argument('scenario', type=str) + args = parser.parse_args() + run_test(args) diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py new file mode 100644 index 0000000000..47446d65a5 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py @@ -0,0 +1,90 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests clean shutdown of server on various interpreter exit conditions. + +The tests in this module spawn a subprocess for each test case, the +test is considered successful if it doesn't hang/timeout. +""" + +import atexit +import os +import subprocess +import sys +import threading +import unittest +import logging + +from tests.unit import _server_shutdown_scenarios + +SCENARIO_FILE = os.path.abspath( + os.path.join( + os.path.dirname(os.path.realpath(__file__)), + '_server_shutdown_scenarios.py')) +INTERPRETER = sys.executable +BASE_COMMAND = [INTERPRETER, SCENARIO_FILE] + +processes = [] +process_lock = threading.Lock() + + +# Make sure we attempt to clean up any +# processes we may have left running +def cleanup_processes(): + with process_lock: + for process in processes: + try: + process.kill() + except Exception: # pylint: disable=broad-except + pass + + +atexit.register(cleanup_processes) + + +def wait(process): + with process_lock: + processes.append(process) + process.wait() + + +class ServerShutdown(unittest.TestCase): + + # Currently we shut down a server (if possible) after the Python server + # instance is garbage collected. This behavior may change in the future. + def test_deallocated_server_stops(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_DEALLOCATED], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + def test_server_exception_exits(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_RAISES_EXCEPTION], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + @unittest.skipIf(os.name == 'nt', 'fork not supported on windows') + def test_server_fork_can_exit(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_FORK_CAN_EXIT], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + +if __name__ == '__main__': + logging.basicConfig() + unittest.main(verbosity=2) |