diff options
author | Yash Tibrewal <yashkt@google.com> | 2018-12-26 13:33:06 -0800 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2018-12-26 13:33:06 -0800 |
commit | 5a6183f1bdf3ff0c9e29fe454fde6e187e33fcc3 (patch) | |
tree | 3ccb35dfefa90df9f615ade0721a565ac579aed2 /src/python/grpcio_tests/tests/unit | |
parent | 31a775b425eac37bb43c301cfb25e1f6a4bde106 (diff) | |
parent | fc7d0911a3a44d7bc926d3db99b7300a0c0f33dc (diff) |
Merge branch 'master' into failhijackedsend
Diffstat (limited to 'src/python/grpcio_tests/tests/unit')
20 files changed, 251 insertions, 23 deletions
diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index 4f850220f8..1b462ec67a 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -28,6 +28,7 @@ GRPCIO_TESTS_UNIT = [ # TODO(ghostwriternr): To be added later. # "_server_ssl_cert_config_test.py", "_server_test.py", + "_server_shutdown_test.py", "_session_cache_test.py", ] @@ -50,6 +51,11 @@ py_library( ) py_library( + name = "_server_shutdown_scenarios", + srcs = ["_server_shutdown_scenarios.py"], +) + +py_library( name = "_thread_pool", srcs = ["_thread_pool.py"], ) @@ -70,6 +76,7 @@ py_library( ":resources", ":test_common", ":_exit_scenarios", + ":_server_shutdown_scenarios", ":_thread_pool", ":_from_grpc_import_star", "//src/python/grpcio_tests/tests/unit/framework/common", diff --git a/src/python/grpcio_tests/tests/unit/_api_test.py b/src/python/grpcio_tests/tests/unit/_api_test.py index 427894bfe9..0dc6a8718c 100644 --- a/src/python/grpcio_tests/tests/unit/_api_test.py +++ b/src/python/grpcio_tests/tests/unit/_api_test.py @@ -101,6 +101,7 @@ class ChannelTest(unittest.TestCase): def test_secure_channel(self): channel_credentials = grpc.ssl_channel_credentials() channel = grpc.secure_channel('google.com:443', channel_credentials) + channel.close() if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_auth_context_test.py b/src/python/grpcio_tests/tests/unit/_auth_context_test.py index b1b5bbdcab..96c4e9ec76 100644 --- a/src/python/grpcio_tests/tests/unit/_auth_context_test.py +++ b/src/python/grpcio_tests/tests/unit/_auth_context_test.py @@ -71,8 +71,8 @@ class AuthContextTest(unittest.TestCase): port = server.add_insecure_port('[::]:0') server.start() - channel = grpc.insecure_channel('localhost:%d' % port) - response = channel.unary_unary(_UNARY_UNARY)(_REQUEST) + with grpc.insecure_channel('localhost:%d' % port) as channel: + response = channel.unary_unary(_UNARY_UNARY)(_REQUEST) server.stop(None) auth_data = pickle.loads(response) @@ -98,6 +98,7 @@ class AuthContextTest(unittest.TestCase): channel_creds, options=_PROPERTY_OPTIONS) response = channel.unary_unary(_UNARY_UNARY)(_REQUEST) + channel.close() server.stop(None) auth_data = pickle.loads(response) @@ -132,6 +133,7 @@ class AuthContextTest(unittest.TestCase): options=_PROPERTY_OPTIONS) response = channel.unary_unary(_UNARY_UNARY)(_REQUEST) + channel.close() server.stop(None) auth_data = pickle.loads(response) diff --git a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py index 727fb7d65f..565bd39b3a 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py @@ -75,6 +75,8 @@ class ChannelConnectivityTest(unittest.TestCase): channel.unsubscribe(callback.update) fifth_connectivities = callback.connectivities() + channel.close() + self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,), first_connectivities) self.assertNotIn(grpc.ChannelConnectivity.READY, second_connectivities) @@ -108,7 +110,8 @@ class ChannelConnectivityTest(unittest.TestCase): _ready_in_connectivities) second_callback.block_until_connectivities_satisfy( _ready_in_connectivities) - del channel + channel.close() + server.stop(None) self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,), first_connectivities) @@ -139,6 +142,7 @@ class ChannelConnectivityTest(unittest.TestCase): callback.block_until_connectivities_satisfy( _last_connectivity_is_not_ready) channel.unsubscribe(callback.update) + channel.close() self.assertFalse(thread_pool.was_used()) diff --git a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py index 345460ef40..46a4eb9bb6 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py @@ -60,6 +60,8 @@ class ChannelReadyFutureTest(unittest.TestCase): self.assertTrue(ready_future.done()) self.assertFalse(ready_future.running()) + channel.close() + def test_immediately_connectable_channel_connectivity(self): thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) server = grpc.server(thread_pool, options=(('grpc.so_reuseport', 0),)) @@ -84,6 +86,9 @@ class ChannelReadyFutureTest(unittest.TestCase): self.assertFalse(ready_future.running()) self.assertFalse(thread_pool.was_used()) + channel.close() + server.stop(None) + if __name__ == '__main__': logging.basicConfig() diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index 876d8e827e..87884a19dc 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -77,6 +77,9 @@ class CompressionTest(unittest.TestCase): self._port = self._server.add_insecure_port('[::]:0') self._server.start() + def tearDown(self): + self._server.stop(None) + def testUnary(self): request = b'\x00' * 100 @@ -102,6 +105,7 @@ class CompressionTest(unittest.TestCase): response = multi_callable( request, metadata=[('grpc-internal-encoding-request', 'gzip')]) self.assertEqual(request, response) + compressed_channel.close() def testStreaming(self): request = b'\x00' * 100 @@ -115,6 +119,7 @@ class CompressionTest(unittest.TestCase): call = multi_callable(iter([request] * test_constants.STREAM_LENGTH)) for response in call: self.assertEqual(request, response) + compressed_channel.close() if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_empty_message_test.py b/src/python/grpcio_tests/tests/unit/_empty_message_test.py index 3e8393b53c..f27ea422d0 100644 --- a/src/python/grpcio_tests/tests/unit/_empty_message_test.py +++ b/src/python/grpcio_tests/tests/unit/_empty_message_test.py @@ -96,6 +96,7 @@ class EmptyMessageTest(unittest.TestCase): def tearDown(self): self._server.stop(0) + self._channel.close() def testUnaryUnary(self): response = self._channel.unary_unary(_UNARY_UNARY)(_REQUEST) diff --git a/src/python/grpcio_tests/tests/unit/_error_message_encoding_test.py b/src/python/grpcio_tests/tests/unit/_error_message_encoding_test.py index 6c551df3ec..81de1dae1d 100644 --- a/src/python/grpcio_tests/tests/unit/_error_message_encoding_test.py +++ b/src/python/grpcio_tests/tests/unit/_error_message_encoding_test.py @@ -71,6 +71,7 @@ class ErrorMessageEncodingTest(unittest.TestCase): def tearDown(self): self._server.stop(0) + self._channel.close() def testMessageEncoding(self): for message in _UNICODE_ERROR_MESSAGES: diff --git a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py index ad847ae03e..1ada25382d 100644 --- a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py +++ b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py @@ -14,7 +14,7 @@ _BEFORE_IMPORT = tuple(globals()) -from grpc import * # pylint: disable=wildcard-import +from grpc import * # pylint: disable=wildcard-import,unused-wildcard-import _AFTER_IMPORT = tuple(globals()) diff --git a/src/python/grpcio_tests/tests/unit/_interceptor_test.py b/src/python/grpcio_tests/tests/unit/_interceptor_test.py index 99db0ac58b..a647e5e720 100644 --- a/src/python/grpcio_tests/tests/unit/_interceptor_test.py +++ b/src/python/grpcio_tests/tests/unit/_interceptor_test.py @@ -337,6 +337,7 @@ class InterceptorTest(unittest.TestCase): def tearDown(self): self._server.stop(None) self._server_pool.shutdown(wait=True) + self._channel.close() def testTripleRequestMessagesClientInterceptor(self): diff --git a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py index 0ff49490d5..7ed7c83893 100644 --- a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py @@ -62,6 +62,9 @@ class InvalidMetadataTest(unittest.TestCase): self._stream_unary = _stream_unary_multi_callable(self._channel) self._stream_stream = _stream_stream_multi_callable(self._channel) + def tearDown(self): + self._channel.close() + def testUnaryRequestBlockingUnaryResponse(self): request = b'\x07\x08' metadata = (('InVaLiD', 'UnaryRequestBlockingUnaryResponse'),) diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py index 00949e2236..e89b521cc5 100644 --- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -215,6 +215,7 @@ class InvocationDefectsTest(unittest.TestCase): def tearDown(self): self._server.stop(0) + self._channel.close() def testIterableStreamRequestBlockingUnaryResponse(self): requests = [b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)] diff --git a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py index 0dafab827a..a63664ac5d 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_code_details_test.py @@ -198,8 +198,8 @@ class MetadataCodeDetailsTest(unittest.TestCase): port = self._server.add_insecure_port('[::]:0') self._server.start() - channel = grpc.insecure_channel('localhost:{}'.format(port)) - self._unary_unary = channel.unary_unary( + self._channel = grpc.insecure_channel('localhost:{}'.format(port)) + self._unary_unary = self._channel.unary_unary( '/'.join(( '', _SERVICE, @@ -208,17 +208,17 @@ class MetadataCodeDetailsTest(unittest.TestCase): request_serializer=_REQUEST_SERIALIZER, response_deserializer=_RESPONSE_DESERIALIZER, ) - self._unary_stream = channel.unary_stream('/'.join(( + self._unary_stream = self._channel.unary_stream('/'.join(( '', _SERVICE, _UNARY_STREAM, )),) - self._stream_unary = channel.stream_unary('/'.join(( + self._stream_unary = self._channel.stream_unary('/'.join(( '', _SERVICE, _STREAM_UNARY, )),) - self._stream_stream = channel.stream_stream( + self._stream_stream = self._channel.stream_stream( '/'.join(( '', _SERVICE, @@ -228,6 +228,10 @@ class MetadataCodeDetailsTest(unittest.TestCase): response_deserializer=_RESPONSE_DESERIALIZER, ) + def tearDown(self): + self._server.stop(None) + self._channel.close() + def testSuccessfulUnaryUnary(self): self._servicer.set_details(_DETAILS) diff --git a/src/python/grpcio_tests/tests/unit/_metadata_flags_test.py b/src/python/grpcio_tests/tests/unit/_metadata_flags_test.py index 2d352e99d4..7b32b5b5f3 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_flags_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_flags_test.py @@ -187,13 +187,14 @@ class MetadataFlagsTest(unittest.TestCase): def test_call_wait_for_ready_default(self): for perform_call in _ALL_CALL_CASES: - self.check_connection_does_failfast(perform_call, - create_dummy_channel()) + with create_dummy_channel() as channel: + self.check_connection_does_failfast(perform_call, channel) def test_call_wait_for_ready_disabled(self): for perform_call in _ALL_CALL_CASES: - self.check_connection_does_failfast( - perform_call, create_dummy_channel(), wait_for_ready=False) + with create_dummy_channel() as channel: + self.check_connection_does_failfast( + perform_call, channel, wait_for_ready=False) def test_call_wait_for_ready_enabled(self): # To test the wait mechanism, Python thread is required to make @@ -210,16 +211,16 @@ class MetadataFlagsTest(unittest.TestCase): wg.done() def test_call(perform_call): - try: - channel = grpc.insecure_channel(addr) - channel.subscribe(wait_for_transient_failure) - perform_call(channel, wait_for_ready=True) - except BaseException as e: # pylint: disable=broad-except - # If the call failed, the thread would be destroyed. The channel - # object can be collected before calling the callback, which - # will result in a deadlock. - wg.done() - unhandled_exceptions.put(e, True) + with grpc.insecure_channel(addr) as channel: + try: + channel.subscribe(wait_for_transient_failure) + perform_call(channel, wait_for_ready=True) + except BaseException as e: # pylint: disable=broad-except + # If the call failed, the thread would be destroyed. The + # channel object can be collected before calling the + # callback, which will result in a deadlock. + wg.done() + unhandled_exceptions.put(e, True) test_threads = [] for perform_call in _ALL_CALL_CASES: diff --git a/src/python/grpcio_tests/tests/unit/_metadata_test.py b/src/python/grpcio_tests/tests/unit/_metadata_test.py index 777ab683e3..892df3df08 100644 --- a/src/python/grpcio_tests/tests/unit/_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_metadata_test.py @@ -186,6 +186,7 @@ class MetadataTest(unittest.TestCase): def tearDown(self): self._server.stop(0) + self._channel.close() def testUnaryUnary(self): multi_callable = self._channel.unary_unary(_UNARY_UNARY) diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py index f6d4fcbd0a..d4ea126e2b 100644 --- a/src/python/grpcio_tests/tests/unit/_reconnect_test.py +++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py @@ -98,6 +98,8 @@ class ReconnectTest(unittest.TestCase): server.add_insecure_port('[::]:{}'.format(port)) server.start() self.assertEqual(_RESPONSE, multi_callable(_REQUEST)) + server.stop(None) + channel.close() if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py index 4fead8fcd5..517c2d2f97 100644 --- a/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py +++ b/src/python/grpcio_tests/tests/unit/_resource_exhausted_test.py @@ -148,6 +148,7 @@ class ResourceExhaustedTest(unittest.TestCase): def tearDown(self): self._server.stop(0) + self._channel.close() def testUnaryUnary(self): multi_callable = self._channel.unary_unary(_UNARY_UNARY) diff --git a/src/python/grpcio_tests/tests/unit/_rpc_test.py b/src/python/grpcio_tests/tests/unit/_rpc_test.py index a768d6c7c1..a99121cee5 100644 --- a/src/python/grpcio_tests/tests/unit/_rpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_rpc_test.py @@ -193,6 +193,7 @@ class RPCTest(unittest.TestCase): def tearDown(self): self._server.stop(None) + self._channel.close() def testUnrecognizedMethod(self): request = b'abc' diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py new file mode 100644 index 0000000000..1d1fdba11e --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_scenarios.py @@ -0,0 +1,97 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Defines a number of module-scope gRPC scenarios to test server shutdown.""" + +import argparse +import os +import threading +import time +import logging + +import grpc +from tests.unit import test_common + +from concurrent import futures +from six.moves import queue + +WAIT_TIME = 1000 + +REQUEST = b'request' +RESPONSE = b'response' + +SERVER_RAISES_EXCEPTION = 'server_raises_exception' +SERVER_DEALLOCATED = 'server_deallocated' +SERVER_FORK_CAN_EXIT = 'server_fork_can_exit' + +FORK_EXIT = '/test/ForkExit' + + +def fork_and_exit(request, servicer_context): + pid = os.fork() + if pid == 0: + os._exit(0) + return RESPONSE + + +class GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == FORK_EXIT: + return grpc.unary_unary_rpc_method_handler(fork_and_exit) + else: + return None + + +def run_server(port_queue): + server = test_common.test_server() + port = server.add_insecure_port('[::]:0') + port_queue.put(port) + server.add_generic_rpc_handlers((GenericHandler(),)) + server.start() + # threading.Event.wait() does not exhibit the bug identified in + # https://github.com/grpc/grpc/issues/17093, sleep instead + time.sleep(WAIT_TIME) + + +def run_test(args): + if args.scenario == SERVER_RAISES_EXCEPTION: + server = test_common.test_server() + server.start() + raise Exception() + elif args.scenario == SERVER_DEALLOCATED: + server = test_common.test_server() + server.start() + server.__del__() + while server._state.stage != grpc._server._ServerStage.STOPPED: + pass + elif args.scenario == SERVER_FORK_CAN_EXIT: + port_queue = queue.Queue() + thread = threading.Thread(target=run_server, args=(port_queue,)) + thread.daemon = True + thread.start() + port = port_queue.get() + channel = grpc.insecure_channel('localhost:%d' % port) + multi_callable = channel.unary_unary(FORK_EXIT) + result, call = multi_callable.with_call(REQUEST, wait_for_ready=True) + os.wait() + else: + raise ValueError('unknown test scenario') + + +if __name__ == '__main__': + logging.basicConfig() + parser = argparse.ArgumentParser() + parser.add_argument('scenario', type=str) + args = parser.parse_args() + run_test(args) diff --git a/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py new file mode 100644 index 0000000000..47446d65a5 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_server_shutdown_test.py @@ -0,0 +1,90 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests clean shutdown of server on various interpreter exit conditions. + +The tests in this module spawn a subprocess for each test case, the +test is considered successful if it doesn't hang/timeout. +""" + +import atexit +import os +import subprocess +import sys +import threading +import unittest +import logging + +from tests.unit import _server_shutdown_scenarios + +SCENARIO_FILE = os.path.abspath( + os.path.join( + os.path.dirname(os.path.realpath(__file__)), + '_server_shutdown_scenarios.py')) +INTERPRETER = sys.executable +BASE_COMMAND = [INTERPRETER, SCENARIO_FILE] + +processes = [] +process_lock = threading.Lock() + + +# Make sure we attempt to clean up any +# processes we may have left running +def cleanup_processes(): + with process_lock: + for process in processes: + try: + process.kill() + except Exception: # pylint: disable=broad-except + pass + + +atexit.register(cleanup_processes) + + +def wait(process): + with process_lock: + processes.append(process) + process.wait() + + +class ServerShutdown(unittest.TestCase): + + # Currently we shut down a server (if possible) after the Python server + # instance is garbage collected. This behavior may change in the future. + def test_deallocated_server_stops(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_DEALLOCATED], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + def test_server_exception_exits(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_RAISES_EXCEPTION], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + @unittest.skipIf(os.name == 'nt', 'fork not supported on windows') + def test_server_fork_can_exit(self): + process = subprocess.Popen( + BASE_COMMAND + [_server_shutdown_scenarios.SERVER_FORK_CAN_EXIT], + stdout=sys.stdout, + stderr=sys.stderr) + wait(process) + + +if __name__ == '__main__': + logging.basicConfig() + unittest.main(verbosity=2) |