diff options
Diffstat (limited to 'src/python/grpcio/grpc/_channel.py')
-rw-r--r-- | src/python/grpcio/grpc/_channel.py | 129 |
1 files changed, 101 insertions, 28 deletions
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 6876601785..ab154d8512 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -466,10 +466,11 @@ def _end_unary_response_blocking(state, call, with_call, deadline): raise _Rendezvous(state, None, None, deadline) -def _stream_unary_invocation_operationses(metadata): +def _stream_unary_invocation_operationses(metadata, initial_metadata_flags): return ( ( - cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(metadata, + initial_metadata_flags), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ), @@ -477,15 +478,19 @@ def _stream_unary_invocation_operationses(metadata): ) -def _stream_unary_invocation_operationses_and_tags(metadata): +def _stream_unary_invocation_operationses_and_tags(metadata, + initial_metadata_flags): return tuple(( operations, None, - ) for operations in _stream_unary_invocation_operationses(metadata)) + ) + for operations in _stream_unary_invocation_operationses( + metadata, initial_metadata_flags)) class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): + # pylint: disable=too-many-arguments def __init__(self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel @@ -494,15 +499,18 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): self._request_serializer = request_serializer self._response_deserializer = response_deserializer - def _prepare(self, request, timeout, metadata): + def _prepare(self, request, timeout, metadata, wait_for_ready): deadline, serialized_request, rendezvous = _start_unary_request( request, timeout, self._request_serializer) + initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( + wait_for_ready) if serialized_request is None: return None, None, None, rendezvous else: state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) operations = ( - cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(metadata, + initial_metadata_flags), cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), @@ -511,9 +519,10 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): ) return state, operations, deadline, None - def _blocking(self, request, timeout, metadata, credentials): + def _blocking(self, request, timeout, metadata, credentials, + wait_for_ready): state, operations, deadline, rendezvous = self._prepare( - request, timeout, metadata) + request, timeout, metadata, wait_for_ready) if state is None: raise rendezvous else: @@ -527,17 +536,34 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): _handle_event(event, state, self._response_deserializer) return state, call, - def __call__(self, request, timeout=None, metadata=None, credentials=None): - state, call, = self._blocking(request, timeout, metadata, credentials) + def __call__(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): + state, call, = self._blocking(request, timeout, metadata, credentials, + wait_for_ready) return _end_unary_response_blocking(state, call, False, None) - def with_call(self, request, timeout=None, metadata=None, credentials=None): - state, call, = self._blocking(request, timeout, metadata, credentials) + def with_call(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): + state, call, = self._blocking(request, timeout, metadata, credentials, + wait_for_ready) return _end_unary_response_blocking(state, call, True, None) - def future(self, request, timeout=None, metadata=None, credentials=None): + def future(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): state, operations, deadline, rendezvous = self._prepare( - request, timeout, metadata) + request, timeout, metadata, wait_for_ready) if state is None: raise rendezvous else: @@ -552,6 +578,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): + # pylint: disable=too-many-arguments def __init__(self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel @@ -560,16 +587,24 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._request_serializer = request_serializer self._response_deserializer = response_deserializer - def __call__(self, request, timeout=None, metadata=None, credentials=None): + def __call__(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): deadline, serialized_request, rendezvous = _start_unary_request( request, timeout, self._request_serializer) + initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( + wait_for_ready) if serialized_request is None: raise rendezvous else: state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) operationses = ( ( - cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(metadata, + initial_metadata_flags), cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), @@ -588,6 +623,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): + # pylint: disable=too-many-arguments def __init__(self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel @@ -596,13 +632,17 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): self._request_serializer = request_serializer self._response_deserializer = response_deserializer - def _blocking(self, request_iterator, timeout, metadata, credentials): + def _blocking(self, request_iterator, timeout, metadata, credentials, + wait_for_ready): deadline = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) + initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( + wait_for_ready) call = self._channel.segregated_call( 0, self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, - _stream_unary_invocation_operationses_and_tags(metadata)) + _stream_unary_invocation_operationses_and_tags( + metadata, initial_metadata_flags)) _consume_request_iterator(request_iterator, state, call, self._request_serializer, None) while True: @@ -618,32 +658,38 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): state, call, = self._blocking(request_iterator, timeout, metadata, - credentials) + credentials, wait_for_ready) return _end_unary_response_blocking(state, call, False, None) def with_call(self, request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): state, call, = self._blocking(request_iterator, timeout, metadata, - credentials) + credentials, wait_for_ready) return _end_unary_response_blocking(state, call, True, None) def future(self, request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): deadline = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) event_handler = _event_handler(state, self._response_deserializer) + initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( + wait_for_ready) call = self._managed_call( 0, self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, - _stream_unary_invocation_operationses(metadata), event_handler) + _stream_unary_invocation_operationses( + metadata, initial_metadata_flags), event_handler) _consume_request_iterator(request_iterator, state, call, self._request_serializer, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -651,6 +697,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): + # pylint: disable=too-many-arguments def __init__(self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel @@ -663,12 +710,16 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): deadline = _deadline(timeout) state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) + initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( + wait_for_ready) operationses = ( ( - cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(metadata, + initial_metadata_flags), cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ), (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), @@ -683,6 +734,24 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): return _Rendezvous(state, call, self._response_deserializer, deadline) +class _InitialMetadataFlags(int): + """Stores immutable initial metadata flags""" + + def __new__(cls, value=_EMPTY_FLAGS): + value &= cygrpc.InitialMetadataFlags.used_mask + return super(_InitialMetadataFlags, cls).__new__(cls, value) + + def with_wait_for_ready(self, wait_for_ready): + if wait_for_ready is not None: + if wait_for_ready: + self = self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \ + cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) + elif not wait_for_ready: + self = self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \ + cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) + return self + + class _ChannelCallState(object): def __init__(self, channel): @@ -979,5 +1048,9 @@ class Channel(grpc.Channel): # for as long as they are in use and to close them after using them, # then deletion of this grpc._channel.Channel instance can be made to # effect closure of the underlying cygrpc.Channel instance. - cygrpc.fork_unregister_channel(self) - _moot(self._connectivity_state) + if cygrpc is not None: # Globals may have already been collected. + cygrpc.fork_unregister_channel(self) + # This prevent the failed-at-initializing object removal from failing. + # Though the __init__ failed, the removal will still trigger __del__. + if _moot is not None and hasattr(self, "_connectivity_state"): + _moot(self._connectivity_state) |