aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
diff options
context:
space:
mode:
authorGravatar Eric Gribkoff <ericgribkoff@google.com>2018-06-25 10:25:28 -0700
committerGravatar Eric Gribkoff <ericgribkoff@google.com>2018-08-22 10:34:54 -0700
commitf8cf7ee56d4150ae870f44298a406f7b2ca038c0 (patch)
tree394c613d5b08d05a9e2f4494bcd59817e538ff10 /src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
parentf10596f0f3d6ab2dfe67b86d259a8d3effb73a99 (diff)
Support gRPC Python client-side fork with epoll1
A process may fork after invoking grpc_init() and use gRPC in the child if and only if the child process first destroys all gRPC resources inherited from the parent process and invokes grpc_shutdown(). Subsequent to this, the child will be able to re-initialize and use gRPC. After fork, the parent process will be able to continue to use existing gRPC resources such as channels and calls without interference from the child process. To facilitate gRPC Python applications meeting the above constraints, gRPC Python will automatically destroy and shutdown all gRPC Core resources in the child's post-fork handler, including cancelling in-flight calls (see detailed design below). From the client's perspective, the child process is now free to create new channels and use gRPC.
Diffstat (limited to 'src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi')
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi63
1 files changed, 43 insertions, 20 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index aa187e88a6..a81ff4d823 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -15,6 +15,7 @@
cimport cpython
import threading
+import time
_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
'Internal gRPC call error %d. ' +
@@ -83,6 +84,7 @@ cdef class _ChannelState:
self.integrated_call_states = {}
self.segregated_call_states = set()
self.connectivity_due = set()
+ self.closed_reason = None
cdef tuple _operate(grpc_call *c_call, object operations, object user_tag):
@@ -142,10 +144,10 @@ cdef _cancel(
_check_and_raise_call_error_no_metadata(c_call_error)
-cdef BatchOperationEvent _next_call_event(
+cdef _next_call_event(
_ChannelState channel_state, grpc_completion_queue *c_completion_queue,
- on_success):
- tag, event = _latent_event(c_completion_queue, None)
+ on_success, deadline):
+ tag, event = _latent_event(c_completion_queue, deadline)
with channel_state.condition:
on_success(tag)
channel_state.condition.notify_all()
@@ -229,8 +231,7 @@ cdef void _call(
call_state.due.update(started_tags)
on_success(started_tags)
else:
- raise ValueError('Cannot invoke RPC on closed channel!')
-
+ raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason)
cdef void _process_integrated_call_tag(
_ChannelState state, _BatchOperationTag tag) except *:
cdef _CallState call_state = state.integrated_call_states.pop(tag)
@@ -302,7 +303,7 @@ cdef class SegregatedCall:
_process_segregated_call_tag(
self._channel_state, self._call_state, self._c_completion_queue, tag)
return _next_call_event(
- self._channel_state, self._c_completion_queue, on_success)
+ self._channel_state, self._c_completion_queue, on_success, None)
cdef SegregatedCall _segregated_call(
@@ -346,7 +347,7 @@ cdef object _watch_connectivity_state(
state.c_connectivity_completion_queue, <cpython.PyObject *>tag)
state.connectivity_due.add(tag)
else:
- raise ValueError('Cannot invoke RPC on closed channel!')
+ raise ValueError('Cannot invoke RPC: %s' % state.closed_reason)
completed_tag, event = _latent_event(
state.c_connectivity_completion_queue, None)
with state.condition:
@@ -355,12 +356,15 @@ cdef object _watch_connectivity_state(
return event
-cdef _close(_ChannelState state, grpc_status_code code, object details):
+cdef _close(Channel channel, grpc_status_code code, object details,
+ drain_calls):
+ cdef _ChannelState state = channel._state
cdef _CallState call_state
encoded_details = _encode(details)
with state.condition:
if state.open:
state.open = False
+ state.closed_reason = details
for call_state in set(state.integrated_call_states.values()):
grpc_call_cancel_with_status(
call_state.c_call, code, encoded_details, NULL)
@@ -370,12 +374,19 @@ cdef _close(_ChannelState state, grpc_status_code code, object details):
# TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity
# watching.
- while state.integrated_call_states:
- state.condition.wait()
- while state.segregated_call_states:
- state.condition.wait()
- while state.connectivity_due:
- state.condition.wait()
+ if drain_calls:
+ while not _calls_drained(state):
+ event = channel.next_call_event()
+ if event.completion_type == CompletionType.queue_timeout:
+ continue
+ event.tag(event)
+ else:
+ while state.integrated_call_states:
+ state.condition.wait()
+ while state.segregated_call_states:
+ state.condition.wait()
+ while state.connectivity_due:
+ state.condition.wait()
_destroy_c_completion_queue(state.c_call_completion_queue)
_destroy_c_completion_queue(state.c_connectivity_completion_queue)
@@ -390,13 +401,17 @@ cdef _close(_ChannelState state, grpc_status_code code, object details):
state.condition.wait()
+cdef _calls_drained(_ChannelState state):
+ return not (state.integrated_call_states or state.segregated_call_states or
+ state.connectivity_due)
+
cdef class Channel:
def __cinit__(
self, bytes target, object arguments,
ChannelCredentials channel_credentials):
arguments = () if arguments is None else tuple(arguments)
- grpc_init()
+ fork_handlers_and_grpc_init()
self._state = _ChannelState()
self._vtable.copy = &_copy_pointer
self._vtable.destroy = &_destroy_pointer
@@ -435,9 +450,14 @@ cdef class Channel:
def next_call_event(self):
def on_success(tag):
- _process_integrated_call_tag(self._state, tag)
- return _next_call_event(
- self._state, self._state.c_call_completion_queue, on_success)
+ if tag is not None:
+ _process_integrated_call_tag(self._state, tag)
+ if is_fork_support_enabled():
+ queue_deadline = time.time() + 1.0
+ else:
+ queue_deadline = None
+ return _next_call_event(self._state, self._state.c_call_completion_queue,
+ on_success, queue_deadline)
def segregated_call(
self, int flags, method, host, object deadline, object metadata,
@@ -452,11 +472,14 @@ cdef class Channel:
return grpc_channel_check_connectivity_state(
self._state.c_channel, try_to_connect)
else:
- raise ValueError('Cannot invoke RPC on closed channel!')
+ raise ValueError('Cannot invoke RPC: %s' % self._state.closed_reason)
def watch_connectivity_state(
self, grpc_connectivity_state last_observed_state, object deadline):
return _watch_connectivity_state(self._state, last_observed_state, deadline)
def close(self, code, details):
- _close(self._state, code, details)
+ _close(self, code, details, False)
+
+ def close_on_fork(self, code, details):
+ _close(self, code, details, True)