diff options
Diffstat (limited to 'src/python/grpcio_tests')
-rw-r--r-- | src/python/grpcio_tests/commands.py | 25 | ||||
-rw-r--r-- | src/python/grpcio_tests/grpc_version.py | 2 | ||||
-rw-r--r-- | src/python/grpcio_tests/setup.py | 1 | ||||
-rw-r--r-- | src/python/grpcio_tests/tests/fork/__init__.py | 13 | ||||
-rw-r--r-- | src/python/grpcio_tests/tests/fork/client.py | 76 | ||||
-rw-r--r-- | src/python/grpcio_tests/tests/fork/methods.py | 445 | ||||
-rw-r--r-- | src/python/grpcio_tests/tests/stress/test_runner.py | 2 | ||||
-rw-r--r-- | src/python/grpcio_tests/tests/tests.json | 2 | ||||
-rw-r--r-- | src/python/grpcio_tests/tests/unit/_channel_args_test.py | 5 | ||||
-rw-r--r-- | src/python/grpcio_tests/tests/unit/_cython/_fork_test.py | 68 |
10 files changed, 636 insertions, 3 deletions
diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index a23c980017..0dfbf3180b 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -202,3 +202,28 @@ class RunInterop(test.test): from tests.interop import client sys.argv[1:] = self.args.split() client.test_interoperability() + + +class RunFork(test.test): + + description = 'run fork test client' + user_options = [('args=', 'a', 'pass-thru arguments for the client')] + + def initialize_options(self): + self.args = '' + + def finalize_options(self): + # distutils requires this override. + pass + + def run(self): + if self.distribution.install_requires: + self.distribution.fetch_build_eggs( + self.distribution.install_requires) + if self.distribution.tests_require: + self.distribution.fetch_build_eggs(self.distribution.tests_require) + # We import here to ensure that our setuptools parent has had a chance to + # edit the Python system path. + from tests.fork import client + sys.argv[1:] = self.args.split() + client.test_fork() diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py index b2cf129e4f..f4b8a34a46 100644 --- a/src/python/grpcio_tests/grpc_version.py +++ b/src/python/grpcio_tests/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!! -VERSION = '1.15.0.dev0' +VERSION = '1.16.0.dev0' diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index a94c0963ec..61c98fa038 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -52,6 +52,7 @@ COMMAND_CLASS = { 'preprocess': commands.GatherProto, 'build_package_protos': grpc_tools.command.BuildPackageProtos, 'build_py': commands.BuildPy, + 'run_fork': commands.RunFork, 'run_interop': commands.RunInterop, 'test_lite': commands.TestLite, 'test_gevent': commands.TestGevent, diff --git a/src/python/grpcio_tests/tests/fork/__init__.py b/src/python/grpcio_tests/tests/fork/__init__.py new file mode 100644 index 0000000000..9a26bac010 --- /dev/null +++ b/src/python/grpcio_tests/tests/fork/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests/fork/client.py b/src/python/grpcio_tests/tests/fork/client.py new file mode 100644 index 0000000000..9a32629ed5 --- /dev/null +++ b/src/python/grpcio_tests/tests/fork/client.py @@ -0,0 +1,76 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The Python implementation of the GRPC interoperability test client.""" + +import argparse +import logging +import sys + +from tests.fork import methods + + +def _args(): + + def parse_bool(value): + if value == 'true': + return True + if value == 'false': + return False + raise argparse.ArgumentTypeError('Only true/false allowed') + + parser = argparse.ArgumentParser() + parser.add_argument( + '--server_host', + default="localhost", + type=str, + help='the host to which to connect') + parser.add_argument( + '--server_port', + type=int, + required=True, + help='the port to which to connect') + parser.add_argument( + '--test_case', + default='large_unary', + type=str, + help='the test case to execute') + parser.add_argument( + '--use_tls', + default=False, + type=parse_bool, + help='require a secure connection') + return parser.parse_args() + + +def _test_case_from_arg(test_case_arg): + for test_case in methods.TestCase: + if test_case_arg == test_case.value: + return test_case + else: + raise ValueError('No test case "%s"!' % test_case_arg) + + +def test_fork(): + logging.basicConfig(level=logging.INFO) + args = _args() + if args.test_case == "all": + for test_case in methods.TestCase: + test_case.run_test(args) + else: + test_case = _test_case_from_arg(args.test_case) + test_case.run_test(args) + + +if __name__ == '__main__': + test_fork() diff --git a/src/python/grpcio_tests/tests/fork/methods.py b/src/python/grpcio_tests/tests/fork/methods.py new file mode 100644 index 0000000000..889ef13cb2 --- /dev/null +++ b/src/python/grpcio_tests/tests/fork/methods.py @@ -0,0 +1,445 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Implementations of fork support test methods.""" + +import enum +import json +import logging +import multiprocessing +import os +import threading +import time + +import grpc + +from six.moves import queue + +from src.proto.grpc.testing import empty_pb2 +from src.proto.grpc.testing import messages_pb2 +from src.proto.grpc.testing import test_pb2_grpc + +_LOGGER = logging.getLogger(__name__) + + +def _channel(args): + target = '{}:{}'.format(args.server_host, args.server_port) + if args.use_tls: + channel_credentials = grpc.ssl_channel_credentials() + channel = grpc.secure_channel(target, channel_credentials) + else: + channel = grpc.insecure_channel(target) + return channel + + +def _validate_payload_type_and_length(response, expected_type, expected_length): + if response.payload.type is not expected_type: + raise ValueError('expected payload type %s, got %s' % + (expected_type, type(response.payload.type))) + elif len(response.payload.body) != expected_length: + raise ValueError('expected payload body size %d, got %d' % + (expected_length, len(response.payload.body))) + + +def _async_unary(stub): + size = 314159 + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, + response_size=size, + payload=messages_pb2.Payload(body=b'\x00' * 271828)) + response_future = stub.UnaryCall.future(request) + response = response_future.result() + _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size) + + +def _blocking_unary(stub): + size = 314159 + request = messages_pb2.SimpleRequest( + response_type=messages_pb2.COMPRESSABLE, + response_size=size, + payload=messages_pb2.Payload(body=b'\x00' * 271828)) + response = stub.UnaryCall(request) + _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size) + + +class _Pipe(object): + + def __init__(self): + self._condition = threading.Condition() + self._values = [] + self._open = True + + def __iter__(self): + return self + + def __next__(self): + return self.next() + + def next(self): + with self._condition: + while not self._values and self._open: + self._condition.wait() + if self._values: + return self._values.pop(0) + else: + raise StopIteration() + + def add(self, value): + with self._condition: + self._values.append(value) + self._condition.notify() + + def close(self): + with self._condition: + self._open = False + self._condition.notify() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + + +class _ChildProcess(object): + + def __init__(self, task, args=None): + if args is None: + args = () + self._exceptions = multiprocessing.Queue() + + def record_exceptions(): + try: + task(*args) + except Exception as e: # pylint: disable=broad-except + self._exceptions.put(e) + + self._process = multiprocessing.Process(target=record_exceptions) + + def start(self): + self._process.start() + + def finish(self): + self._process.join() + if self._process.exitcode != 0: + raise ValueError('Child process failed with exitcode %d' % + self._process.exitcode) + try: + exception = self._exceptions.get(block=False) + raise ValueError('Child process failed: %s' % exception) + except queue.Empty: + pass + + +def _async_unary_same_channel(channel): + + def child_target(): + try: + _async_unary(stub) + raise Exception( + 'Child should not be able to re-use channel after fork') + except ValueError as expected_value_error: + pass + + stub = test_pb2_grpc.TestServiceStub(channel) + _async_unary(stub) + child_process = _ChildProcess(child_target) + child_process.start() + _async_unary(stub) + child_process.finish() + + +def _async_unary_new_channel(channel, args): + + def child_target(): + child_channel = _channel(args) + child_stub = test_pb2_grpc.TestServiceStub(child_channel) + _async_unary(child_stub) + child_channel.close() + + stub = test_pb2_grpc.TestServiceStub(channel) + _async_unary(stub) + child_process = _ChildProcess(child_target) + child_process.start() + _async_unary(stub) + child_process.finish() + + +def _blocking_unary_same_channel(channel): + + def child_target(): + try: + _blocking_unary(stub) + raise Exception( + 'Child should not be able to re-use channel after fork') + except ValueError as expected_value_error: + pass + + stub = test_pb2_grpc.TestServiceStub(channel) + _blocking_unary(stub) + child_process = _ChildProcess(child_target) + child_process.start() + child_process.finish() + + +def _blocking_unary_new_channel(channel, args): + + def child_target(): + child_channel = _channel(args) + child_stub = test_pb2_grpc.TestServiceStub(child_channel) + _blocking_unary(child_stub) + child_channel.close() + + stub = test_pb2_grpc.TestServiceStub(channel) + _blocking_unary(stub) + child_process = _ChildProcess(child_target) + child_process.start() + _blocking_unary(stub) + child_process.finish() + + +# Verify that the fork channel registry can handle already closed channels +def _close_channel_before_fork(channel, args): + + def child_target(): + new_channel.close() + child_channel = _channel(args) + child_stub = test_pb2_grpc.TestServiceStub(child_channel) + _blocking_unary(child_stub) + child_channel.close() + + stub = test_pb2_grpc.TestServiceStub(channel) + _blocking_unary(stub) + channel.close() + + new_channel = _channel(args) + new_stub = test_pb2_grpc.TestServiceStub(new_channel) + child_process = _ChildProcess(child_target) + child_process.start() + _blocking_unary(new_stub) + child_process.finish() + + +def _connectivity_watch(channel, args): + + def child_target(): + + def child_connectivity_callback(state): + child_states.append(state) + + child_states = [] + child_channel = _channel(args) + child_stub = test_pb2_grpc.TestServiceStub(child_channel) + child_channel.subscribe(child_connectivity_callback) + _async_unary(child_stub) + if len(child_states + ) < 2 or child_states[-1] != grpc.ChannelConnectivity.READY: + raise ValueError('Channel did not move to READY') + if len(parent_states) > 1: + raise ValueError('Received connectivity updates on parent callback') + child_channel.unsubscribe(child_connectivity_callback) + child_channel.close() + + def parent_connectivity_callback(state): + parent_states.append(state) + + parent_states = [] + channel.subscribe(parent_connectivity_callback) + stub = test_pb2_grpc.TestServiceStub(channel) + child_process = _ChildProcess(child_target) + child_process.start() + _async_unary(stub) + if len(parent_states + ) < 2 or parent_states[-1] != grpc.ChannelConnectivity.READY: + raise ValueError('Channel did not move to READY') + channel.unsubscribe(parent_connectivity_callback) + child_process.finish() + + # Need to unsubscribe or _channel.py in _poll_connectivity triggers a + # "Cannot invoke RPC on closed channel!" error. + # TODO(ericgribkoff) Fix issue with channel.close() and connectivity polling + channel.unsubscribe(parent_connectivity_callback) + + +def _ping_pong_with_child_processes_after_first_response( + channel, args, child_target, run_after_close=True): + request_response_sizes = ( + 31415, + 9, + 2653, + 58979, + ) + request_payload_sizes = ( + 27182, + 8, + 1828, + 45904, + ) + stub = test_pb2_grpc.TestServiceStub(channel) + pipe = _Pipe() + parent_bidi_call = stub.FullDuplexCall(pipe) + child_processes = [] + first_message_received = False + for response_size, payload_size in zip(request_response_sizes, + request_payload_sizes): + request = messages_pb2.StreamingOutputCallRequest( + response_type=messages_pb2.COMPRESSABLE, + response_parameters=( + messages_pb2.ResponseParameters(size=response_size),), + payload=messages_pb2.Payload(body=b'\x00' * payload_size)) + pipe.add(request) + if first_message_received: + child_process = _ChildProcess(child_target, + (parent_bidi_call, channel, args)) + child_process.start() + child_processes.append(child_process) + response = next(parent_bidi_call) + first_message_received = True + child_process = _ChildProcess(child_target, + (parent_bidi_call, channel, args)) + child_process.start() + child_processes.append(child_process) + _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, + response_size) + pipe.close() + if run_after_close: + child_process = _ChildProcess(child_target, + (parent_bidi_call, channel, args)) + child_process.start() + child_processes.append(child_process) + for child_process in child_processes: + child_process.finish() + + +def _in_progress_bidi_continue_call(channel): + + def child_target(parent_bidi_call, parent_channel, args): + stub = test_pb2_grpc.TestServiceStub(parent_channel) + try: + _async_unary(stub) + raise Exception( + 'Child should not be able to re-use channel after fork') + except ValueError as expected_value_error: + pass + inherited_code = parent_bidi_call.code() + inherited_details = parent_bidi_call.details() + if inherited_code != grpc.StatusCode.CANCELLED: + raise ValueError( + 'Expected inherited code CANCELLED, got %s' % inherited_code) + if inherited_details != 'Channel closed due to fork': + raise ValueError( + 'Expected inherited details Channel closed due to fork, got %s' + % inherited_details) + + # Don't run child_target after closing the parent call, as the call may have + # received a status from the server before fork occurs. + _ping_pong_with_child_processes_after_first_response( + channel, None, child_target, run_after_close=False) + + +def _in_progress_bidi_same_channel_async_call(channel): + + def child_target(parent_bidi_call, parent_channel, args): + stub = test_pb2_grpc.TestServiceStub(parent_channel) + try: + _async_unary(stub) + raise Exception( + 'Child should not be able to re-use channel after fork') + except ValueError as expected_value_error: + pass + + _ping_pong_with_child_processes_after_first_response( + channel, None, child_target) + + +def _in_progress_bidi_same_channel_blocking_call(channel): + + def child_target(parent_bidi_call, parent_channel, args): + stub = test_pb2_grpc.TestServiceStub(parent_channel) + try: + _blocking_unary(stub) + raise Exception( + 'Child should not be able to re-use channel after fork') + except ValueError as expected_value_error: + pass + + _ping_pong_with_child_processes_after_first_response( + channel, None, child_target) + + +def _in_progress_bidi_new_channel_async_call(channel, args): + + def child_target(parent_bidi_call, parent_channel, args): + channel = _channel(args) + stub = test_pb2_grpc.TestServiceStub(channel) + _async_unary(stub) + + _ping_pong_with_child_processes_after_first_response( + channel, args, child_target) + + +def _in_progress_bidi_new_channel_blocking_call(channel, args): + + def child_target(parent_bidi_call, parent_channel, args): + channel = _channel(args) + stub = test_pb2_grpc.TestServiceStub(channel) + _blocking_unary(stub) + + _ping_pong_with_child_processes_after_first_response( + channel, args, child_target) + + +@enum.unique +class TestCase(enum.Enum): + + CONNECTIVITY_WATCH = 'connectivity_watch' + CLOSE_CHANNEL_BEFORE_FORK = 'close_channel_before_fork' + ASYNC_UNARY_SAME_CHANNEL = 'async_unary_same_channel' + ASYNC_UNARY_NEW_CHANNEL = 'async_unary_new_channel' + BLOCKING_UNARY_SAME_CHANNEL = 'blocking_unary_same_channel' + BLOCKING_UNARY_NEW_CHANNEL = 'blocking_unary_new_channel' + IN_PROGRESS_BIDI_CONTINUE_CALL = 'in_progress_bidi_continue_call' + IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL = 'in_progress_bidi_same_channel_async_call' + IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_same_channel_blocking_call' + IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL = 'in_progress_bidi_new_channel_async_call' + IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_new_channel_blocking_call' + + def run_test(self, args): + _LOGGER.info("Running %s", self) + channel = _channel(args) + if self is TestCase.ASYNC_UNARY_SAME_CHANNEL: + _async_unary_same_channel(channel) + elif self is TestCase.ASYNC_UNARY_NEW_CHANNEL: + _async_unary_new_channel(channel, args) + elif self is TestCase.BLOCKING_UNARY_SAME_CHANNEL: + _blocking_unary_same_channel(channel) + elif self is TestCase.BLOCKING_UNARY_NEW_CHANNEL: + _blocking_unary_new_channel(channel, args) + elif self is TestCase.CLOSE_CHANNEL_BEFORE_FORK: + _close_channel_before_fork(channel, args) + elif self is TestCase.CONNECTIVITY_WATCH: + _connectivity_watch(channel, args) + elif self is TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL: + _in_progress_bidi_continue_call(channel) + elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL: + _in_progress_bidi_same_channel_async_call(channel) + elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL: + _in_progress_bidi_same_channel_blocking_call(channel) + elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL: + _in_progress_bidi_new_channel_async_call(channel, args) + elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL: + _in_progress_bidi_new_channel_blocking_call(channel, args) + else: + raise NotImplementedError( + 'Test case "%s" not implemented!' % self.name) + channel.close() diff --git a/src/python/grpcio_tests/tests/stress/test_runner.py b/src/python/grpcio_tests/tests/stress/test_runner.py index 764cda17fb..e66eda64a8 100644 --- a/src/python/grpcio_tests/tests/stress/test_runner.py +++ b/src/python/grpcio_tests/tests/stress/test_runner.py @@ -53,5 +53,5 @@ class TestRunner(threading.Thread): except Exception as e: # pylint: disable=broad-except traceback.print_exc() self._exception_queue.put( - Exception("An exception occured during test {}" + Exception("An exception occurred during test {}" .format(test_case), e)) diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index ebc41c63f0..76d5d22d57 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -32,6 +32,8 @@ "unit._credentials_test.CredentialsTest", "unit._cython._cancel_many_calls_test.CancelManyCallsTest", "unit._cython._channel_test.ChannelTest", + "unit._cython._fork_test.ForkPosixTester", + "unit._cython._fork_test.ForkWindowsTester", "unit._cython._no_messages_server_completion_queue_per_call_test.Test", "unit._cython._no_messages_single_server_completion_queue_test.Test", "unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest", diff --git a/src/python/grpcio_tests/tests/unit/_channel_args_test.py b/src/python/grpcio_tests/tests/unit/_channel_args_test.py index 1a2d2c0117..869c2f4d2f 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_args_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_args_test.py @@ -13,6 +13,7 @@ # limitations under the License. """Tests of Channel Args on client/server side.""" +from concurrent import futures import unittest import grpc @@ -39,7 +40,9 @@ class ChannelArgsTest(unittest.TestCase): grpc.insecure_channel('localhost:8080', options=TEST_CHANNEL_ARGS) def test_server(self): - grpc.server(None, options=TEST_CHANNEL_ARGS) + grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=TEST_CHANNEL_ARGS) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_cython/_fork_test.py b/src/python/grpcio_tests/tests/unit/_cython/_fork_test.py new file mode 100644 index 0000000000..aeb02458a7 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_cython/_fork_test.py @@ -0,0 +1,68 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import threading +import unittest + +from grpc._cython import cygrpc + + +def _get_number_active_threads(): + return cygrpc._fork_state.active_thread_count._num_active_threads + + +@unittest.skipIf(os.name == 'nt', 'Posix-specific tests') +class ForkPosixTester(unittest.TestCase): + + def setUp(self): + cygrpc._GRPC_ENABLE_FORK_SUPPORT = True + + def testForkManagedThread(self): + + def cb(): + self.assertEqual(1, _get_number_active_threads()) + + thread = cygrpc.ForkManagedThread(cb) + thread.start() + thread.join() + self.assertEqual(0, _get_number_active_threads()) + + def testForkManagedThreadThrowsException(self): + + def cb(): + self.assertEqual(1, _get_number_active_threads()) + raise Exception("expected exception") + + thread = cygrpc.ForkManagedThread(cb) + thread.start() + thread.join() + self.assertEqual(0, _get_number_active_threads()) + + +@unittest.skipUnless(os.name == 'nt', 'Windows-specific tests') +class ForkWindowsTester(unittest.TestCase): + + def testForkManagedThreadIsNoOp(self): + + def cb(): + pass + + thread = cygrpc.ForkManagedThread(cb) + thread.start() + thread.join() + + +if __name__ == '__main__': + unittest.main(verbosity=2) |