aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi')
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi63
1 files changed, 43 insertions, 20 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
index aa187e88a6..a81ff4d823 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi
@@ -15,6 +15,7 @@
cimport cpython
import threading
+import time
_INTERNAL_CALL_ERROR_MESSAGE_FORMAT = (
'Internal gRPC call error %d. ' +
@@ -83,6 +84,7 @@ cdef class _ChannelState:
self.integrated_call_states = {}
self.segregated_call_states = set()
self.connectivity_due = set()
+ self.closed_reason = None
cdef tuple _operate(grpc_call *c_call, object operations, object user_tag):
@@ -142,10 +144,10 @@ cdef _cancel(
_check_and_raise_call_error_no_metadata(c_call_error)
-cdef BatchOperationEvent _next_call_event(
+cdef _next_call_event(
_ChannelState channel_state, grpc_completion_queue *c_completion_queue,
- on_success):
- tag, event = _latent_event(c_completion_queue, None)
+ on_success, deadline):
+ tag, event = _latent_event(c_completion_queue, deadline)
with channel_state.condition:
on_success(tag)
channel_state.condition.notify_all()
@@ -229,8 +231,7 @@ cdef void _call(
call_state.due.update(started_tags)
on_success(started_tags)
else:
- raise ValueError('Cannot invoke RPC on closed channel!')
-
+ raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason)
cdef void _process_integrated_call_tag(
_ChannelState state, _BatchOperationTag tag) except *:
cdef _CallState call_state = state.integrated_call_states.pop(tag)
@@ -302,7 +303,7 @@ cdef class SegregatedCall:
_process_segregated_call_tag(
self._channel_state, self._call_state, self._c_completion_queue, tag)
return _next_call_event(
- self._channel_state, self._c_completion_queue, on_success)
+ self._channel_state, self._c_completion_queue, on_success, None)
cdef SegregatedCall _segregated_call(
@@ -346,7 +347,7 @@ cdef object _watch_connectivity_state(
state.c_connectivity_completion_queue, <cpython.PyObject *>tag)
state.connectivity_due.add(tag)
else:
- raise ValueError('Cannot invoke RPC on closed channel!')
+ raise ValueError('Cannot invoke RPC: %s' % state.closed_reason)
completed_tag, event = _latent_event(
state.c_connectivity_completion_queue, None)
with state.condition:
@@ -355,12 +356,15 @@ cdef object _watch_connectivity_state(
return event
-cdef _close(_ChannelState state, grpc_status_code code, object details):
+cdef _close(Channel channel, grpc_status_code code, object details,
+ drain_calls):
+ cdef _ChannelState state = channel._state
cdef _CallState call_state
encoded_details = _encode(details)
with state.condition:
if state.open:
state.open = False
+ state.closed_reason = details
for call_state in set(state.integrated_call_states.values()):
grpc_call_cancel_with_status(
call_state.c_call, code, encoded_details, NULL)
@@ -370,12 +374,19 @@ cdef _close(_ChannelState state, grpc_status_code code, object details):
# TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity
# watching.
- while state.integrated_call_states:
- state.condition.wait()
- while state.segregated_call_states:
- state.condition.wait()
- while state.connectivity_due:
- state.condition.wait()
+ if drain_calls:
+ while not _calls_drained(state):
+ event = channel.next_call_event()
+ if event.completion_type == CompletionType.queue_timeout:
+ continue
+ event.tag(event)
+ else:
+ while state.integrated_call_states:
+ state.condition.wait()
+ while state.segregated_call_states:
+ state.condition.wait()
+ while state.connectivity_due:
+ state.condition.wait()
_destroy_c_completion_queue(state.c_call_completion_queue)
_destroy_c_completion_queue(state.c_connectivity_completion_queue)
@@ -390,13 +401,17 @@ cdef _close(_ChannelState state, grpc_status_code code, object details):
state.condition.wait()
+cdef _calls_drained(_ChannelState state):
+ return not (state.integrated_call_states or state.segregated_call_states or
+ state.connectivity_due)
+
cdef class Channel:
def __cinit__(
self, bytes target, object arguments,
ChannelCredentials channel_credentials):
arguments = () if arguments is None else tuple(arguments)
- grpc_init()
+ fork_handlers_and_grpc_init()
self._state = _ChannelState()
self._vtable.copy = &_copy_pointer
self._vtable.destroy = &_destroy_pointer
@@ -435,9 +450,14 @@ cdef class Channel:
def next_call_event(self):
def on_success(tag):
- _process_integrated_call_tag(self._state, tag)
- return _next_call_event(
- self._state, self._state.c_call_completion_queue, on_success)
+ if tag is not None:
+ _process_integrated_call_tag(self._state, tag)
+ if is_fork_support_enabled():
+ queue_deadline = time.time() + 1.0
+ else:
+ queue_deadline = None
+ return _next_call_event(self._state, self._state.c_call_completion_queue,
+ on_success, queue_deadline)
def segregated_call(
self, int flags, method, host, object deadline, object metadata,
@@ -452,11 +472,14 @@ cdef class Channel:
return grpc_channel_check_connectivity_state(
self._state.c_channel, try_to_connect)
else:
- raise ValueError('Cannot invoke RPC on closed channel!')
+ raise ValueError('Cannot invoke RPC: %s' % self._state.closed_reason)
def watch_connectivity_state(
self, grpc_connectivity_state last_observed_state, object deadline):
return _watch_connectivity_state(self._state, last_observed_state, deadline)
def close(self, code, details):
- _close(self._state, code, details)
+ _close(self, code, details, False)
+
+ def close_on_fork(self, code, details):
+ _close(self, code, details, True)