aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_tests')
-rw-r--r--src/python/grpcio_tests/commands.py25
-rw-r--r--src/python/grpcio_tests/grpc_version.py2
-rw-r--r--src/python/grpcio_tests/setup.py1
-rw-r--r--src/python/grpcio_tests/tests/fork/__init__.py13
-rw-r--r--src/python/grpcio_tests/tests/fork/client.py76
-rw-r--r--src/python/grpcio_tests/tests/fork/methods.py445
-rw-r--r--src/python/grpcio_tests/tests/stress/test_runner.py2
-rw-r--r--src/python/grpcio_tests/tests/tests.json2
-rw-r--r--src/python/grpcio_tests/tests/unit/_channel_args_test.py5
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_fork_test.py68
10 files changed, 636 insertions, 3 deletions
diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py
index a23c980017..0dfbf3180b 100644
--- a/src/python/grpcio_tests/commands.py
+++ b/src/python/grpcio_tests/commands.py
@@ -202,3 +202,28 @@ class RunInterop(test.test):
from tests.interop import client
sys.argv[1:] = self.args.split()
client.test_interoperability()
+
+
+class RunFork(test.test):
+
+ description = 'run fork test client'
+ user_options = [('args=', 'a', 'pass-thru arguments for the client')]
+
+ def initialize_options(self):
+ self.args = ''
+
+ def finalize_options(self):
+ # distutils requires this override.
+ pass
+
+ def run(self):
+ if self.distribution.install_requires:
+ self.distribution.fetch_build_eggs(
+ self.distribution.install_requires)
+ if self.distribution.tests_require:
+ self.distribution.fetch_build_eggs(self.distribution.tests_require)
+ # We import here to ensure that our setuptools parent has had a chance to
+ # edit the Python system path.
+ from tests.fork import client
+ sys.argv[1:] = self.args.split()
+ client.test_fork()
diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py
index b2cf129e4f..f4b8a34a46 100644
--- a/src/python/grpcio_tests/grpc_version.py
+++ b/src/python/grpcio_tests/grpc_version.py
@@ -14,4 +14,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!!
-VERSION = '1.15.0.dev0'
+VERSION = '1.16.0.dev0'
diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py
index a94c0963ec..61c98fa038 100644
--- a/src/python/grpcio_tests/setup.py
+++ b/src/python/grpcio_tests/setup.py
@@ -52,6 +52,7 @@ COMMAND_CLASS = {
'preprocess': commands.GatherProto,
'build_package_protos': grpc_tools.command.BuildPackageProtos,
'build_py': commands.BuildPy,
+ 'run_fork': commands.RunFork,
'run_interop': commands.RunInterop,
'test_lite': commands.TestLite,
'test_gevent': commands.TestGevent,
diff --git a/src/python/grpcio_tests/tests/fork/__init__.py b/src/python/grpcio_tests/tests/fork/__init__.py
new file mode 100644
index 0000000000..9a26bac010
--- /dev/null
+++ b/src/python/grpcio_tests/tests/fork/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/python/grpcio_tests/tests/fork/client.py b/src/python/grpcio_tests/tests/fork/client.py
new file mode 100644
index 0000000000..9a32629ed5
--- /dev/null
+++ b/src/python/grpcio_tests/tests/fork/client.py
@@ -0,0 +1,76 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""The Python implementation of the GRPC interoperability test client."""
+
+import argparse
+import logging
+import sys
+
+from tests.fork import methods
+
+
+def _args():
+
+ def parse_bool(value):
+ if value == 'true':
+ return True
+ if value == 'false':
+ return False
+ raise argparse.ArgumentTypeError('Only true/false allowed')
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ '--server_host',
+ default="localhost",
+ type=str,
+ help='the host to which to connect')
+ parser.add_argument(
+ '--server_port',
+ type=int,
+ required=True,
+ help='the port to which to connect')
+ parser.add_argument(
+ '--test_case',
+ default='large_unary',
+ type=str,
+ help='the test case to execute')
+ parser.add_argument(
+ '--use_tls',
+ default=False,
+ type=parse_bool,
+ help='require a secure connection')
+ return parser.parse_args()
+
+
+def _test_case_from_arg(test_case_arg):
+ for test_case in methods.TestCase:
+ if test_case_arg == test_case.value:
+ return test_case
+ else:
+ raise ValueError('No test case "%s"!' % test_case_arg)
+
+
+def test_fork():
+ logging.basicConfig(level=logging.INFO)
+ args = _args()
+ if args.test_case == "all":
+ for test_case in methods.TestCase:
+ test_case.run_test(args)
+ else:
+ test_case = _test_case_from_arg(args.test_case)
+ test_case.run_test(args)
+
+
+if __name__ == '__main__':
+ test_fork()
diff --git a/src/python/grpcio_tests/tests/fork/methods.py b/src/python/grpcio_tests/tests/fork/methods.py
new file mode 100644
index 0000000000..889ef13cb2
--- /dev/null
+++ b/src/python/grpcio_tests/tests/fork/methods.py
@@ -0,0 +1,445 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Implementations of fork support test methods."""
+
+import enum
+import json
+import logging
+import multiprocessing
+import os
+import threading
+import time
+
+import grpc
+
+from six.moves import queue
+
+from src.proto.grpc.testing import empty_pb2
+from src.proto.grpc.testing import messages_pb2
+from src.proto.grpc.testing import test_pb2_grpc
+
+_LOGGER = logging.getLogger(__name__)
+
+
+def _channel(args):
+ target = '{}:{}'.format(args.server_host, args.server_port)
+ if args.use_tls:
+ channel_credentials = grpc.ssl_channel_credentials()
+ channel = grpc.secure_channel(target, channel_credentials)
+ else:
+ channel = grpc.insecure_channel(target)
+ return channel
+
+
+def _validate_payload_type_and_length(response, expected_type, expected_length):
+ if response.payload.type is not expected_type:
+ raise ValueError('expected payload type %s, got %s' %
+ (expected_type, type(response.payload.type)))
+ elif len(response.payload.body) != expected_length:
+ raise ValueError('expected payload body size %d, got %d' %
+ (expected_length, len(response.payload.body)))
+
+
+def _async_unary(stub):
+ size = 314159
+ request = messages_pb2.SimpleRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_size=size,
+ payload=messages_pb2.Payload(body=b'\x00' * 271828))
+ response_future = stub.UnaryCall.future(request)
+ response = response_future.result()
+ _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
+
+
+def _blocking_unary(stub):
+ size = 314159
+ request = messages_pb2.SimpleRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_size=size,
+ payload=messages_pb2.Payload(body=b'\x00' * 271828))
+ response = stub.UnaryCall(request)
+ _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
+
+
+class _Pipe(object):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._values = []
+ self._open = True
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return self.next()
+
+ def next(self):
+ with self._condition:
+ while not self._values and self._open:
+ self._condition.wait()
+ if self._values:
+ return self._values.pop(0)
+ else:
+ raise StopIteration()
+
+ def add(self, value):
+ with self._condition:
+ self._values.append(value)
+ self._condition.notify()
+
+ def close(self):
+ with self._condition:
+ self._open = False
+ self._condition.notify()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ self.close()
+
+
+class _ChildProcess(object):
+
+ def __init__(self, task, args=None):
+ if args is None:
+ args = ()
+ self._exceptions = multiprocessing.Queue()
+
+ def record_exceptions():
+ try:
+ task(*args)
+ except Exception as e: # pylint: disable=broad-except
+ self._exceptions.put(e)
+
+ self._process = multiprocessing.Process(target=record_exceptions)
+
+ def start(self):
+ self._process.start()
+
+ def finish(self):
+ self._process.join()
+ if self._process.exitcode != 0:
+ raise ValueError('Child process failed with exitcode %d' %
+ self._process.exitcode)
+ try:
+ exception = self._exceptions.get(block=False)
+ raise ValueError('Child process failed: %s' % exception)
+ except queue.Empty:
+ pass
+
+
+def _async_unary_same_channel(channel):
+
+ def child_target():
+ try:
+ _async_unary(stub)
+ raise Exception(
+ 'Child should not be able to re-use channel after fork')
+ except ValueError as expected_value_error:
+ pass
+
+ stub = test_pb2_grpc.TestServiceStub(channel)
+ _async_unary(stub)
+ child_process = _ChildProcess(child_target)
+ child_process.start()
+ _async_unary(stub)
+ child_process.finish()
+
+
+def _async_unary_new_channel(channel, args):
+
+ def child_target():
+ child_channel = _channel(args)
+ child_stub = test_pb2_grpc.TestServiceStub(child_channel)
+ _async_unary(child_stub)
+ child_channel.close()
+
+ stub = test_pb2_grpc.TestServiceStub(channel)
+ _async_unary(stub)
+ child_process = _ChildProcess(child_target)
+ child_process.start()
+ _async_unary(stub)
+ child_process.finish()
+
+
+def _blocking_unary_same_channel(channel):
+
+ def child_target():
+ try:
+ _blocking_unary(stub)
+ raise Exception(
+ 'Child should not be able to re-use channel after fork')
+ except ValueError as expected_value_error:
+ pass
+
+ stub = test_pb2_grpc.TestServiceStub(channel)
+ _blocking_unary(stub)
+ child_process = _ChildProcess(child_target)
+ child_process.start()
+ child_process.finish()
+
+
+def _blocking_unary_new_channel(channel, args):
+
+ def child_target():
+ child_channel = _channel(args)
+ child_stub = test_pb2_grpc.TestServiceStub(child_channel)
+ _blocking_unary(child_stub)
+ child_channel.close()
+
+ stub = test_pb2_grpc.TestServiceStub(channel)
+ _blocking_unary(stub)
+ child_process = _ChildProcess(child_target)
+ child_process.start()
+ _blocking_unary(stub)
+ child_process.finish()
+
+
+# Verify that the fork channel registry can handle already closed channels
+def _close_channel_before_fork(channel, args):
+
+ def child_target():
+ new_channel.close()
+ child_channel = _channel(args)
+ child_stub = test_pb2_grpc.TestServiceStub(child_channel)
+ _blocking_unary(child_stub)
+ child_channel.close()
+
+ stub = test_pb2_grpc.TestServiceStub(channel)
+ _blocking_unary(stub)
+ channel.close()
+
+ new_channel = _channel(args)
+ new_stub = test_pb2_grpc.TestServiceStub(new_channel)
+ child_process = _ChildProcess(child_target)
+ child_process.start()
+ _blocking_unary(new_stub)
+ child_process.finish()
+
+
+def _connectivity_watch(channel, args):
+
+ def child_target():
+
+ def child_connectivity_callback(state):
+ child_states.append(state)
+
+ child_states = []
+ child_channel = _channel(args)
+ child_stub = test_pb2_grpc.TestServiceStub(child_channel)
+ child_channel.subscribe(child_connectivity_callback)
+ _async_unary(child_stub)
+ if len(child_states
+ ) < 2 or child_states[-1] != grpc.ChannelConnectivity.READY:
+ raise ValueError('Channel did not move to READY')
+ if len(parent_states) > 1:
+ raise ValueError('Received connectivity updates on parent callback')
+ child_channel.unsubscribe(child_connectivity_callback)
+ child_channel.close()
+
+ def parent_connectivity_callback(state):
+ parent_states.append(state)
+
+ parent_states = []
+ channel.subscribe(parent_connectivity_callback)
+ stub = test_pb2_grpc.TestServiceStub(channel)
+ child_process = _ChildProcess(child_target)
+ child_process.start()
+ _async_unary(stub)
+ if len(parent_states
+ ) < 2 or parent_states[-1] != grpc.ChannelConnectivity.READY:
+ raise ValueError('Channel did not move to READY')
+ channel.unsubscribe(parent_connectivity_callback)
+ child_process.finish()
+
+ # Need to unsubscribe or _channel.py in _poll_connectivity triggers a
+ # "Cannot invoke RPC on closed channel!" error.
+ # TODO(ericgribkoff) Fix issue with channel.close() and connectivity polling
+ channel.unsubscribe(parent_connectivity_callback)
+
+
+def _ping_pong_with_child_processes_after_first_response(
+ channel, args, child_target, run_after_close=True):
+ request_response_sizes = (
+ 31415,
+ 9,
+ 2653,
+ 58979,
+ )
+ request_payload_sizes = (
+ 27182,
+ 8,
+ 1828,
+ 45904,
+ )
+ stub = test_pb2_grpc.TestServiceStub(channel)
+ pipe = _Pipe()
+ parent_bidi_call = stub.FullDuplexCall(pipe)
+ child_processes = []
+ first_message_received = False
+ for response_size, payload_size in zip(request_response_sizes,
+ request_payload_sizes):
+ request = messages_pb2.StreamingOutputCallRequest(
+ response_type=messages_pb2.COMPRESSABLE,
+ response_parameters=(
+ messages_pb2.ResponseParameters(size=response_size),),
+ payload=messages_pb2.Payload(body=b'\x00' * payload_size))
+ pipe.add(request)
+ if first_message_received:
+ child_process = _ChildProcess(child_target,
+ (parent_bidi_call, channel, args))
+ child_process.start()
+ child_processes.append(child_process)
+ response = next(parent_bidi_call)
+ first_message_received = True
+ child_process = _ChildProcess(child_target,
+ (parent_bidi_call, channel, args))
+ child_process.start()
+ child_processes.append(child_process)
+ _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
+ response_size)
+ pipe.close()
+ if run_after_close:
+ child_process = _ChildProcess(child_target,
+ (parent_bidi_call, channel, args))
+ child_process.start()
+ child_processes.append(child_process)
+ for child_process in child_processes:
+ child_process.finish()
+
+
+def _in_progress_bidi_continue_call(channel):
+
+ def child_target(parent_bidi_call, parent_channel, args):
+ stub = test_pb2_grpc.TestServiceStub(parent_channel)
+ try:
+ _async_unary(stub)
+ raise Exception(
+ 'Child should not be able to re-use channel after fork')
+ except ValueError as expected_value_error:
+ pass
+ inherited_code = parent_bidi_call.code()
+ inherited_details = parent_bidi_call.details()
+ if inherited_code != grpc.StatusCode.CANCELLED:
+ raise ValueError(
+ 'Expected inherited code CANCELLED, got %s' % inherited_code)
+ if inherited_details != 'Channel closed due to fork':
+ raise ValueError(
+ 'Expected inherited details Channel closed due to fork, got %s'
+ % inherited_details)
+
+ # Don't run child_target after closing the parent call, as the call may have
+ # received a status from the server before fork occurs.
+ _ping_pong_with_child_processes_after_first_response(
+ channel, None, child_target, run_after_close=False)
+
+
+def _in_progress_bidi_same_channel_async_call(channel):
+
+ def child_target(parent_bidi_call, parent_channel, args):
+ stub = test_pb2_grpc.TestServiceStub(parent_channel)
+ try:
+ _async_unary(stub)
+ raise Exception(
+ 'Child should not be able to re-use channel after fork')
+ except ValueError as expected_value_error:
+ pass
+
+ _ping_pong_with_child_processes_after_first_response(
+ channel, None, child_target)
+
+
+def _in_progress_bidi_same_channel_blocking_call(channel):
+
+ def child_target(parent_bidi_call, parent_channel, args):
+ stub = test_pb2_grpc.TestServiceStub(parent_channel)
+ try:
+ _blocking_unary(stub)
+ raise Exception(
+ 'Child should not be able to re-use channel after fork')
+ except ValueError as expected_value_error:
+ pass
+
+ _ping_pong_with_child_processes_after_first_response(
+ channel, None, child_target)
+
+
+def _in_progress_bidi_new_channel_async_call(channel, args):
+
+ def child_target(parent_bidi_call, parent_channel, args):
+ channel = _channel(args)
+ stub = test_pb2_grpc.TestServiceStub(channel)
+ _async_unary(stub)
+
+ _ping_pong_with_child_processes_after_first_response(
+ channel, args, child_target)
+
+
+def _in_progress_bidi_new_channel_blocking_call(channel, args):
+
+ def child_target(parent_bidi_call, parent_channel, args):
+ channel = _channel(args)
+ stub = test_pb2_grpc.TestServiceStub(channel)
+ _blocking_unary(stub)
+
+ _ping_pong_with_child_processes_after_first_response(
+ channel, args, child_target)
+
+
+@enum.unique
+class TestCase(enum.Enum):
+
+ CONNECTIVITY_WATCH = 'connectivity_watch'
+ CLOSE_CHANNEL_BEFORE_FORK = 'close_channel_before_fork'
+ ASYNC_UNARY_SAME_CHANNEL = 'async_unary_same_channel'
+ ASYNC_UNARY_NEW_CHANNEL = 'async_unary_new_channel'
+ BLOCKING_UNARY_SAME_CHANNEL = 'blocking_unary_same_channel'
+ BLOCKING_UNARY_NEW_CHANNEL = 'blocking_unary_new_channel'
+ IN_PROGRESS_BIDI_CONTINUE_CALL = 'in_progress_bidi_continue_call'
+ IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL = 'in_progress_bidi_same_channel_async_call'
+ IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_same_channel_blocking_call'
+ IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL = 'in_progress_bidi_new_channel_async_call'
+ IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_new_channel_blocking_call'
+
+ def run_test(self, args):
+ _LOGGER.info("Running %s", self)
+ channel = _channel(args)
+ if self is TestCase.ASYNC_UNARY_SAME_CHANNEL:
+ _async_unary_same_channel(channel)
+ elif self is TestCase.ASYNC_UNARY_NEW_CHANNEL:
+ _async_unary_new_channel(channel, args)
+ elif self is TestCase.BLOCKING_UNARY_SAME_CHANNEL:
+ _blocking_unary_same_channel(channel)
+ elif self is TestCase.BLOCKING_UNARY_NEW_CHANNEL:
+ _blocking_unary_new_channel(channel, args)
+ elif self is TestCase.CLOSE_CHANNEL_BEFORE_FORK:
+ _close_channel_before_fork(channel, args)
+ elif self is TestCase.CONNECTIVITY_WATCH:
+ _connectivity_watch(channel, args)
+ elif self is TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL:
+ _in_progress_bidi_continue_call(channel)
+ elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL:
+ _in_progress_bidi_same_channel_async_call(channel)
+ elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL:
+ _in_progress_bidi_same_channel_blocking_call(channel)
+ elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL:
+ _in_progress_bidi_new_channel_async_call(channel, args)
+ elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL:
+ _in_progress_bidi_new_channel_blocking_call(channel, args)
+ else:
+ raise NotImplementedError(
+ 'Test case "%s" not implemented!' % self.name)
+ channel.close()
diff --git a/src/python/grpcio_tests/tests/stress/test_runner.py b/src/python/grpcio_tests/tests/stress/test_runner.py
index 764cda17fb..e66eda64a8 100644
--- a/src/python/grpcio_tests/tests/stress/test_runner.py
+++ b/src/python/grpcio_tests/tests/stress/test_runner.py
@@ -53,5 +53,5 @@ class TestRunner(threading.Thread):
except Exception as e: # pylint: disable=broad-except
traceback.print_exc()
self._exception_queue.put(
- Exception("An exception occured during test {}"
+ Exception("An exception occurred during test {}"
.format(test_case), e))
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index ebc41c63f0..76d5d22d57 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -32,6 +32,8 @@
"unit._credentials_test.CredentialsTest",
"unit._cython._cancel_many_calls_test.CancelManyCallsTest",
"unit._cython._channel_test.ChannelTest",
+ "unit._cython._fork_test.ForkPosixTester",
+ "unit._cython._fork_test.ForkWindowsTester",
"unit._cython._no_messages_server_completion_queue_per_call_test.Test",
"unit._cython._no_messages_single_server_completion_queue_test.Test",
"unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest",
diff --git a/src/python/grpcio_tests/tests/unit/_channel_args_test.py b/src/python/grpcio_tests/tests/unit/_channel_args_test.py
index 1a2d2c0117..869c2f4d2f 100644
--- a/src/python/grpcio_tests/tests/unit/_channel_args_test.py
+++ b/src/python/grpcio_tests/tests/unit/_channel_args_test.py
@@ -13,6 +13,7 @@
# limitations under the License.
"""Tests of Channel Args on client/server side."""
+from concurrent import futures
import unittest
import grpc
@@ -39,7 +40,9 @@ class ChannelArgsTest(unittest.TestCase):
grpc.insecure_channel('localhost:8080', options=TEST_CHANNEL_ARGS)
def test_server(self):
- grpc.server(None, options=TEST_CHANNEL_ARGS)
+ grpc.server(
+ futures.ThreadPoolExecutor(max_workers=1),
+ options=TEST_CHANNEL_ARGS)
if __name__ == '__main__':
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_fork_test.py b/src/python/grpcio_tests/tests/unit/_cython/_fork_test.py
new file mode 100644
index 0000000000..aeb02458a7
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/_fork_test.py
@@ -0,0 +1,68 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import threading
+import unittest
+
+from grpc._cython import cygrpc
+
+
+def _get_number_active_threads():
+ return cygrpc._fork_state.active_thread_count._num_active_threads
+
+
+@unittest.skipIf(os.name == 'nt', 'Posix-specific tests')
+class ForkPosixTester(unittest.TestCase):
+
+ def setUp(self):
+ cygrpc._GRPC_ENABLE_FORK_SUPPORT = True
+
+ def testForkManagedThread(self):
+
+ def cb():
+ self.assertEqual(1, _get_number_active_threads())
+
+ thread = cygrpc.ForkManagedThread(cb)
+ thread.start()
+ thread.join()
+ self.assertEqual(0, _get_number_active_threads())
+
+ def testForkManagedThreadThrowsException(self):
+
+ def cb():
+ self.assertEqual(1, _get_number_active_threads())
+ raise Exception("expected exception")
+
+ thread = cygrpc.ForkManagedThread(cb)
+ thread.start()
+ thread.join()
+ self.assertEqual(0, _get_number_active_threads())
+
+
+@unittest.skipUnless(os.name == 'nt', 'Windows-specific tests')
+class ForkWindowsTester(unittest.TestCase):
+
+ def testForkManagedThreadIsNoOp(self):
+
+ def cb():
+ pass
+
+ thread = cygrpc.ForkManagedThread(cb)
+ thread.start()
+ thread.join()
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)