aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/_cython
diff options
context:
space:
mode:
authorGravatar Eric Gribkoff <ericgribkoff@google.com>2018-06-25 10:25:28 -0700
committerGravatar Eric Gribkoff <ericgribkoff@google.com>2018-08-22 10:34:54 -0700
commitf8cf7ee56d4150ae870f44298a406f7b2ca038c0 (patch)
tree394c613d5b08d05a9e2f4494bcd59817e538ff10 /src/python/grpcio/grpc/_cython
parentf10596f0f3d6ab2dfe67b86d259a8d3effb73a99 (diff)
Support gRPC Python client-side fork with epoll1
A process may fork after invoking grpc_init() and use gRPC in the child if and only if the child process first destroys all gRPC resources inherited from the parent process and invokes grpc_shutdown(). Subsequent to this, the child will be able to re-initialize and use gRPC. After fork, the parent process will be able to continue to use existing gRPC resources such as channels and calls without interference from the child process. To facilitate gRPC Python applications meeting the above constraints, gRPC Python will automatically destroy and shutdown all gRPC Core resources in the child's post-fork handler, including cancelling in-flight calls (see detailed design below). From the client's perspective, the child process is now free to create new channels and use gRPC.
Diffstat (limited to 'src/python/grpcio/grpc/_cython')
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi1
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi63
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi8
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pxd.pxi29
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi203
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/fork_windows.pyx.pxi63
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pxd3
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pyx5
12 files changed, 355 insertions, 28 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
index a0de862d94..24e85b08e7 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi
@@ -19,7 +19,7 @@ cdef class Call:
def __cinit__(self):
# Create an *empty* call
- grpc_init()
+ fork_handlers_and_grpc_init()
self.c_call = NULL
self.references = []
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
index f067d76fab..ced32abba1 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pxd.pxi
@@ -40,6 +40,7 @@ cdef class _ChannelState:
# field and just use the NULLness of c_channel as an indication that the
# channel is closed.
cdef object open
+ cdef object closed_reason
# A dict from _BatchOperationTag to _CallState
cdef dict integrated_call_states
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index aa187e88a6..a81ff4d823 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -15,6 +15,7 @@
cimport cpython
import threading
+import time
_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
'Internal gRPC call error %d. ' +
@@ -83,6 +84,7 @@ cdef class _ChannelState:
self.integrated_call_states = {}
self.segregated_call_states = set()
self.connectivity_due = set()
+ self.closed_reason = None
cdef tuple _operate(grpc_call *c_call, object operations, object user_tag):
@@ -142,10 +144,10 @@ cdef _cancel(
_check_and_raise_call_error_no_metadata(c_call_error)
-cdef BatchOperationEvent _next_call_event(
+cdef _next_call_event(
_ChannelState channel_state, grpc_completion_queue *c_completion_queue,
- on_success):
- tag, event = _latent_event(c_completion_queue, None)
+ on_success, deadline):
+ tag, event = _latent_event(c_completion_queue, deadline)
with channel_state.condition:
on_success(tag)
channel_state.condition.notify_all()
@@ -229,8 +231,7 @@ cdef void _call(
call_state.due.update(started_tags)
on_success(started_tags)
else:
- raise ValueError('Cannot invoke RPC on closed channel!')
-
+ raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason)
cdef void _process_integrated_call_tag(
_ChannelState state, _BatchOperationTag tag) except *:
cdef _CallState call_state = state.integrated_call_states.pop(tag)
@@ -302,7 +303,7 @@ cdef class SegregatedCall:
_process_segregated_call_tag(
self._channel_state, self._call_state, self._c_completion_queue, tag)
return _next_call_event(
- self._channel_state, self._c_completion_queue, on_success)
+ self._channel_state, self._c_completion_queue, on_success, None)
cdef SegregatedCall _segregated_call(
@@ -346,7 +347,7 @@ cdef object _watch_connectivity_state(
state.c_connectivity_completion_queue, <cpython.PyObject *>tag)
state.connectivity_due.add(tag)
else:
- raise ValueError('Cannot invoke RPC on closed channel!')
+ raise ValueError('Cannot invoke RPC: %s' % state.closed_reason)
completed_tag, event = _latent_event(
state.c_connectivity_completion_queue, None)
with state.condition:
@@ -355,12 +356,15 @@ cdef object _watch_connectivity_state(
return event
-cdef _close(_ChannelState state, grpc_status_code code, object details):
+cdef _close(Channel channel, grpc_status_code code, object details,
+ drain_calls):
+ cdef _ChannelState state = channel._state
cdef _CallState call_state
encoded_details = _encode(details)
with state.condition:
if state.open:
state.open = False
+ state.closed_reason = details
for call_state in set(state.integrated_call_states.values()):
grpc_call_cancel_with_status(
call_state.c_call, code, encoded_details, NULL)
@@ -370,12 +374,19 @@ cdef _close(_ChannelState state, grpc_status_code code, object details):
# TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity
# watching.
- while state.integrated_call_states:
- state.condition.wait()
- while state.segregated_call_states:
- state.condition.wait()
- while state.connectivity_due:
- state.condition.wait()
+ if drain_calls:
+ while not _calls_drained(state):
+ event = channel.next_call_event()
+ if event.completion_type == CompletionType.queue_timeout:
+ continue
+ event.tag(event)
+ else:
+ while state.integrated_call_states:
+ state.condition.wait()
+ while state.segregated_call_states:
+ state.condition.wait()
+ while state.connectivity_due:
+ state.condition.wait()
_destroy_c_completion_queue(state.c_call_completion_queue)
_destroy_c_completion_queue(state.c_connectivity_completion_queue)
@@ -390,13 +401,17 @@ cdef _close(_ChannelState state, grpc_status_code code, object details):
state.condition.wait()
+cdef _calls_drained(_ChannelState state):
+ return not (state.integrated_call_states or state.segregated_call_states or
+ state.connectivity_due)
+
cdef class Channel:
def __cinit__(
self, bytes target, object arguments,
ChannelCredentials channel_credentials):
arguments = () if arguments is None else tuple(arguments)
- grpc_init()
+ fork_handlers_and_grpc_init()
self._state = _ChannelState()
self._vtable.copy = &_copy_pointer
self._vtable.destroy = &_destroy_pointer
@@ -435,9 +450,14 @@ cdef class Channel:
def next_call_event(self):
def on_success(tag):
- _process_integrated_call_tag(self._state, tag)
- return _next_call_event(
- self._state, self._state.c_call_completion_queue, on_success)
+ if tag is not None:
+ _process_integrated_call_tag(self._state, tag)
+ if is_fork_support_enabled():
+ queue_deadline = time.time() + 1.0
+ else:
+ queue_deadline = None
+ return _next_call_event(self._state, self._state.c_call_completion_queue,
+ on_success, queue_deadline)
def segregated_call(
self, int flags, method, host, object deadline, object metadata,
@@ -452,11 +472,14 @@ cdef class Channel:
return grpc_channel_check_connectivity_state(
self._state.c_channel, try_to_connect)
else:
- raise ValueError('Cannot invoke RPC on closed channel!')
+ raise ValueError('Cannot invoke RPC: %s' % self._state.closed_reason)
def watch_connectivity_state(
self, grpc_connectivity_state last_observed_state, object deadline):
return _watch_connectivity_state(self._state, last_observed_state, deadline)
def close(self, code, details):
- _close(self._state, code, details)
+ _close(self, code, details, False)
+
+ def close_on_fork(self, code, details):
+ _close(self, code, details, True)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index a2d765546a..141116df5d 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -71,7 +71,7 @@ cdef class CompletionQueue:
def __cinit__(self, shutdown_cq=False):
cdef grpc_completion_queue_attributes c_attrs
- grpc_init()
+ fork_handlers_and_grpc_init()
if shutdown_cq:
c_attrs.version = 1
c_attrs.cq_completion_type = GRPC_CQ_NEXT
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
index 0a25218e19..e3c1c8215c 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
@@ -21,7 +21,7 @@ from libc.stdint cimport uintptr_t
def _spawn_callback_in_thread(cb_func, args):
- threading.Thread(target=cb_func, args=args).start()
+ ForkManagedThread(target=cb_func, args=args).start()
async_callback_func = _spawn_callback_in_thread
@@ -114,7 +114,7 @@ cdef class ChannelCredentials:
cdef class SSLSessionCacheLRU:
def __cinit__(self, capacity):
- grpc_init()
+ fork_handlers_and_grpc_init()
self._cache = grpc_ssl_session_cache_create_lru(capacity)
def __int__(self):
@@ -172,7 +172,7 @@ cdef class CompositeChannelCredentials(ChannelCredentials):
cdef class ServerCertificateConfig:
def __cinit__(self):
- grpc_init()
+ fork_handlers_and_grpc_init()
self.c_cert_config = NULL
self.c_pem_root_certs = NULL
self.c_ssl_pem_key_cert_pairs = NULL
@@ -187,7 +187,7 @@ cdef class ServerCertificateConfig:
cdef class ServerCredentials:
def __cinit__(self):
- grpc_init()
+ fork_handlers_and_grpc_init()
self.c_credentials = NULL
self.references = []
self.initial_cert_config = None
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pxd.pxi
new file mode 100644
index 0000000000..a925bdd2e6
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pxd.pxi
@@ -0,0 +1,29 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef extern from "pthread.h" nogil:
+ int pthread_atfork(
+ void (*prepare)() nogil,
+ void (*parent)() nogil,
+ void (*child)() nogil)
+
+
+cdef void __prefork() nogil
+
+
+cdef void __postfork_parent() nogil
+
+
+cdef void __postfork_child() nogil \ No newline at end of file
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
new file mode 100644
index 0000000000..1176258da8
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi
@@ -0,0 +1,203 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import os
+import threading
+
+_LOGGER = logging.getLogger(__name__)
+
+_AWAIT_THREADS_TIMEOUT_SECONDS = 5
+
+_TRUE_VALUES = ['yes', 'Yes', 'YES', 'true', 'True', 'TRUE', '1']
+
+# This flag enables experimental support within gRPC Python for applications
+# that will fork() without exec(). When enabled, gRPC Python will attempt to
+# pause all of its internally created threads before the fork syscall proceeds.
+#
+# For this to be successful, the application must not have multiple threads of
+# its own calling into gRPC when fork is invoked. Any callbacks from gRPC
+# Python-spawned threads into user code (e.g., callbacks for asynchronous RPCs)
+# must not block and should execute quickly.
+#
+# This flag is not supported on Windows.
+_GRPC_ENABLE_FORK_SUPPORT = (
+ os.environ.get('GRPC_ENABLE_FORK_SUPPORT', '0')
+ .lower() in _TRUE_VALUES)
+
+_GRPC_POLL_STRATEGY = os.environ.get('GRPC_POLL_STRATEGY')
+
+cdef void __prefork() nogil:
+ with gil:
+ with _fork_state.fork_in_progress_condition:
+ _fork_state.fork_in_progress = True
+ if not _fork_state.active_thread_count.await_zero_threads(
+ _AWAIT_THREADS_TIMEOUT_SECONDS):
+ _LOGGER.error(
+ 'Failed to shutdown gRPC Python threads prior to fork. '
+ 'Behavior after fork will be undefined.')
+
+
+cdef void __postfork_parent() nogil:
+ with gil:
+ with _fork_state.fork_in_progress_condition:
+ _fork_state.fork_in_progress = False
+ _fork_state.fork_in_progress_condition.notify_all()
+
+
+cdef void __postfork_child() nogil:
+ with gil:
+ # Thread could be holding the fork_in_progress_condition inside of
+ # block_if_fork_in_progress() when fork occurs. Reset the lock here.
+ _fork_state.fork_in_progress_condition = threading.Condition()
+ # A thread in return_from_user_request_generator() may hold this lock
+ # when fork occurs.
+ _fork_state.active_thread_count = _ActiveThreadCount()
+ for state_to_reset in _fork_state.postfork_states_to_reset:
+ state_to_reset.reset_postfork_child()
+ _fork_state.fork_epoch += 1
+ for channel in _fork_state.channels:
+ channel._close_on_fork()
+ # TODO(ericgribkoff) Check and abort if core is not shutdown
+ with _fork_state.fork_in_progress_condition:
+ _fork_state.fork_in_progress = False
+
+
+def fork_handlers_and_grpc_init():
+ grpc_init()
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ # TODO(ericgribkoff) epoll1 is default for grpcio distribution. Decide whether to expose
+ # grpc_get_poll_strategy_name() from ev_posix.cc to get actual polling choice.
+ if _GRPC_POLL_STRATEGY is not None and _GRPC_POLL_STRATEGY != "epoll1":
+ _LOGGER.error(
+ 'gRPC Python fork support is only compatible with the epoll1 '
+ 'polling engine')
+ return
+ with _fork_state.fork_handler_registered_lock:
+ if not _fork_state.fork_handler_registered:
+ pthread_atfork(&__prefork, &__postfork_parent, &__postfork_child)
+ _fork_state.fork_handler_registered = True
+
+
+class ForkManagedThread(object):
+ def __init__(self, target, args=()):
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ def managed_target(*args):
+ try:
+ target(*args)
+ finally:
+ _fork_state.active_thread_count.decrement()
+ self._thread = threading.Thread(target=managed_target, args=args)
+ else:
+ self._thread = threading.Thread(target=target, args=args)
+
+ def setDaemon(self, daemonic):
+ self._thread.daemon = daemonic
+
+ def start(self):
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ _fork_state.active_thread_count.increment()
+ self._thread.start()
+
+ def join(self):
+ self._thread.join()
+
+
+def block_if_fork_in_progress(postfork_state_to_reset=None):
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ with _fork_state.fork_in_progress_condition:
+ if not _fork_state.fork_in_progress:
+ return
+ if postfork_state_to_reset is not None:
+ _fork_state.postfork_states_to_reset.append(postfork_state_to_reset)
+ _fork_state.active_thread_count.decrement()
+ _fork_state.fork_in_progress_condition.wait()
+ _fork_state.active_thread_count.increment()
+
+
+def enter_user_request_generator():
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ _fork_state.active_thread_count.decrement()
+
+
+def return_from_user_request_generator():
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ _fork_state.active_thread_count.increment()
+ block_if_fork_in_progress()
+
+
+def get_fork_epoch():
+ return _fork_state.fork_epoch
+
+
+def is_fork_support_enabled():
+ return _GRPC_ENABLE_FORK_SUPPORT
+
+
+def fork_register_channel(channel):
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ _fork_state.channels.add(channel)
+
+
+def fork_unregister_channel(channel):
+ if _GRPC_ENABLE_FORK_SUPPORT:
+ _fork_state.channels.remove(channel)
+
+
+class _ActiveThreadCount(object):
+ def __init__(self):
+ self._num_active_threads = 0
+ self._condition = threading.Condition()
+
+ def increment(self):
+ with self._condition:
+ self._num_active_threads += 1
+
+ def decrement(self):
+ with self._condition:
+ self._num_active_threads -= 1
+ if self._num_active_threads == 0:
+ self._condition.notify_all()
+
+ def await_zero_threads(self, timeout_secs):
+ end_time = time.time() + timeout_secs
+ wait_time = timeout_secs
+ with self._condition:
+ while True:
+ if self._num_active_threads > 0:
+ self._condition.wait(wait_time)
+ if self._num_active_threads == 0:
+ return True
+ # Thread count may have increased before this re-obtains the
+ # lock after a notify(). Wait again until timeout_secs has
+ # elapsed.
+ wait_time = end_time - time.time()
+ if wait_time <= 0:
+ return False
+
+
+class _ForkState(object):
+ def __init__(self):
+ self.fork_in_progress_condition = threading.Condition()
+ self.fork_in_progress = False
+ self.postfork_states_to_reset = []
+ self.fork_handler_registered_lock = threading.Lock()
+ self.fork_handler_registered = False
+ self.active_thread_count = _ActiveThreadCount()
+ self.fork_epoch = 0
+ self.channels = set()
+
+
+_fork_state = _ForkState()
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/fork_windows.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/fork_windows.pyx.pxi
new file mode 100644
index 0000000000..8dc1ef3b1a
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/fork_windows.pyx.pxi
@@ -0,0 +1,63 @@
+# Copyright 2018 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import threading
+
+# No-op implementations for Windows.
+
+def fork_handlers_and_grpc_init():
+ grpc_init()
+
+
+class ForkManagedThread(object):
+ def __init__(self, target, args=()):
+ self._thread = threading.Thread(target=target, args=args)
+
+ def setDaemon(self, daemonic):
+ self._thread.daemon = daemonic
+
+ def start(self):
+ self._thread.start()
+
+ def join(self):
+ self._thread.join()
+
+
+def block_if_fork_in_progress(postfork_state_to_reset=None):
+ pass
+
+
+def enter_user_request_generator():
+ pass
+
+
+def return_from_user_request_generator():
+ pass
+
+
+def get_fork_epoch():
+ return 0
+
+
+def is_fork_support_enabled():
+ return False
+
+
+def fork_register_channel(channel):
+ pass
+
+
+def fork_unregister_channel(channel):
+ pass
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 37b98ebbdb..fe98d559f3 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -127,7 +127,7 @@ class CompressionLevel:
cdef class CallDetails:
def __cinit__(self):
- grpc_init()
+ fork_handlers_and_grpc_init()
with nogil:
grpc_call_details_init(&self.c_details)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index da3dd21244..db59d468dc 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -60,7 +60,7 @@ cdef grpc_ssl_certificate_config_reload_status _server_cert_config_fetcher_wrapp
cdef class Server:
def __cinit__(self, object arguments):
- grpc_init()
+ fork_handlers_and_grpc_init()
self.references = []
self.registered_completion_queues = []
self._vtable.copy = &_copy_pointer
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd
index 0cc26bc0d0..8258b857bc 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pxd
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd
@@ -31,3 +31,6 @@ include "_cygrpc/time.pxd.pxi"
include "_cygrpc/_hooks.pxd.pxi"
include "_cygrpc/grpc_gevent.pxd.pxi"
+
+IF UNAME_SYSNAME != "Windows":
+ include "_cygrpc/fork_posix.pxd.pxi"
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index 3cac406687..026f7ba2e3 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -39,6 +39,11 @@ include "_cygrpc/_hooks.pyx.pxi"
include "_cygrpc/grpc_gevent.pyx.pxi"
+IF UNAME_SYSNAME == "Windows":
+ include "_cygrpc/fork_windows.pyx.pxi"
+ELSE:
+ include "_cygrpc/fork_posix.pyx.pxi"
+
#
# initialize gRPC
#