diff options
Diffstat (limited to 'src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi')
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi | 63 |
1 files changed, 43 insertions, 20 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index aa187e88a6..a81ff4d823 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -15,6 +15,7 @@ cimport cpython import threading +import time _INTERNAL_CALL_ERROR_MESSAGE_FORMAT = ( 'Internal gRPC call error %d. ' + @@ -83,6 +84,7 @@ cdef class _ChannelState: self.integrated_call_states = {} self.segregated_call_states = set() self.connectivity_due = set() + self.closed_reason = None cdef tuple _operate(grpc_call *c_call, object operations, object user_tag): @@ -142,10 +144,10 @@ cdef _cancel( _check_and_raise_call_error_no_metadata(c_call_error) -cdef BatchOperationEvent _next_call_event( +cdef _next_call_event( _ChannelState channel_state, grpc_completion_queue *c_completion_queue, - on_success): - tag, event = _latent_event(c_completion_queue, None) + on_success, deadline): + tag, event = _latent_event(c_completion_queue, deadline) with channel_state.condition: on_success(tag) channel_state.condition.notify_all() @@ -229,8 +231,7 @@ cdef void _call( call_state.due.update(started_tags) on_success(started_tags) else: - raise ValueError('Cannot invoke RPC on closed channel!') - + raise ValueError('Cannot invoke RPC: %s' % channel_state.closed_reason) cdef void _process_integrated_call_tag( _ChannelState state, _BatchOperationTag tag) except *: cdef _CallState call_state = state.integrated_call_states.pop(tag) @@ -302,7 +303,7 @@ cdef class SegregatedCall: _process_segregated_call_tag( self._channel_state, self._call_state, self._c_completion_queue, tag) return _next_call_event( - self._channel_state, self._c_completion_queue, on_success) + self._channel_state, self._c_completion_queue, on_success, None) cdef SegregatedCall _segregated_call( @@ -346,7 +347,7 @@ cdef object _watch_connectivity_state( state.c_connectivity_completion_queue, <cpython.PyObject *>tag) state.connectivity_due.add(tag) else: - raise ValueError('Cannot invoke RPC on closed channel!') + raise ValueError('Cannot invoke RPC: %s' % state.closed_reason) completed_tag, event = _latent_event( state.c_connectivity_completion_queue, None) with state.condition: @@ -355,12 +356,15 @@ cdef object _watch_connectivity_state( return event -cdef _close(_ChannelState state, grpc_status_code code, object details): +cdef _close(Channel channel, grpc_status_code code, object details, + drain_calls): + cdef _ChannelState state = channel._state cdef _CallState call_state encoded_details = _encode(details) with state.condition: if state.open: state.open = False + state.closed_reason = details for call_state in set(state.integrated_call_states.values()): grpc_call_cancel_with_status( call_state.c_call, code, encoded_details, NULL) @@ -370,12 +374,19 @@ cdef _close(_ChannelState state, grpc_status_code code, object details): # TODO(https://github.com/grpc/grpc/issues/3064): Cancel connectivity # watching. - while state.integrated_call_states: - state.condition.wait() - while state.segregated_call_states: - state.condition.wait() - while state.connectivity_due: - state.condition.wait() + if drain_calls: + while not _calls_drained(state): + event = channel.next_call_event() + if event.completion_type == CompletionType.queue_timeout: + continue + event.tag(event) + else: + while state.integrated_call_states: + state.condition.wait() + while state.segregated_call_states: + state.condition.wait() + while state.connectivity_due: + state.condition.wait() _destroy_c_completion_queue(state.c_call_completion_queue) _destroy_c_completion_queue(state.c_connectivity_completion_queue) @@ -390,13 +401,17 @@ cdef _close(_ChannelState state, grpc_status_code code, object details): state.condition.wait() +cdef _calls_drained(_ChannelState state): + return not (state.integrated_call_states or state.segregated_call_states or + state.connectivity_due) + cdef class Channel: def __cinit__( self, bytes target, object arguments, ChannelCredentials channel_credentials): arguments = () if arguments is None else tuple(arguments) - grpc_init() + fork_handlers_and_grpc_init() self._state = _ChannelState() self._vtable.copy = &_copy_pointer self._vtable.destroy = &_destroy_pointer @@ -435,9 +450,14 @@ cdef class Channel: def next_call_event(self): def on_success(tag): - _process_integrated_call_tag(self._state, tag) - return _next_call_event( - self._state, self._state.c_call_completion_queue, on_success) + if tag is not None: + _process_integrated_call_tag(self._state, tag) + if is_fork_support_enabled(): + queue_deadline = time.time() + 1.0 + else: + queue_deadline = None + return _next_call_event(self._state, self._state.c_call_completion_queue, + on_success, queue_deadline) def segregated_call( self, int flags, method, host, object deadline, object metadata, @@ -452,11 +472,14 @@ cdef class Channel: return grpc_channel_check_connectivity_state( self._state.c_channel, try_to_connect) else: - raise ValueError('Cannot invoke RPC on closed channel!') + raise ValueError('Cannot invoke RPC: %s' % self._state.closed_reason) def watch_connectivity_state( self, grpc_connectivity_state last_observed_state, object deadline): return _watch_connectivity_state(self._state, last_observed_state, deadline) def close(self, code, details): - _close(self._state, code, details) + _close(self, code, details, False) + + def close_on_fork(self, code, details): + _close(self, code, details, True) |