diff options
author | Lidi Zheng <scallopsky@gmail.com> | 2018-11-07 11:10:23 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-11-07 11:10:23 -0800 |
commit | 678ea0895051646c80e59bc7ca6fe16d942e073d (patch) | |
tree | a17a1cff3132fcd26af4ff56875cc5c4721a8448 /src/python/grpcio | |
parent | a7a3f5b42e21caa6be1617220a35afc5d67ad7f0 (diff) | |
parent | 4821221e3a430b7276408048d8f3fb4ee4c55fd6 (diff) |
Merge pull request #16919 from lidizheng/wait-for-ready
Add wait-for-ready semantics
Diffstat (limited to 'src/python/grpcio')
-rw-r--r-- | src/python/grpcio/grpc/__init__.py | 57 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_channel.py | 121 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi | 4 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi | 6 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_interceptor.py | 126 |
5 files changed, 239 insertions, 75 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 43e382b875..df98dd10ad 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -387,6 +387,7 @@ class ClientCallDetails(six.with_metaclass(abc.ABCMeta)): metadata: Optional :term:`metadata` to be transmitted to the service-side of the RPC. credentials: An optional CallCredentials for the RPC. + wait_for_ready: An optional flag to enable wait for ready mechanism. """ @@ -638,7 +639,12 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): """Affords invoking a unary-unary RPC from client-side.""" @abc.abstractmethod - def __call__(self, request, timeout=None, metadata=None, credentials=None): + def __call__(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): """Synchronously invokes the underlying RPC. Args: @@ -648,6 +654,8 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): metadata: Optional :term:`metadata` to be transmitted to the service-side of the RPC. credentials: An optional CallCredentials for the RPC. + wait_for_ready: An optional flag to enable wait for ready + mechanism Returns: The response value for the RPC. @@ -660,7 +668,12 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): raise NotImplementedError() @abc.abstractmethod - def with_call(self, request, timeout=None, metadata=None, credentials=None): + def with_call(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): """Synchronously invokes the underlying RPC. Args: @@ -670,6 +683,8 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): metadata: Optional :term:`metadata` to be transmitted to the service-side of the RPC. credentials: An optional CallCredentials for the RPC. + wait_for_ready: An optional flag to enable wait for ready + mechanism Returns: The response value for the RPC and a Call value for the RPC. @@ -682,7 +697,12 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): raise NotImplementedError() @abc.abstractmethod - def future(self, request, timeout=None, metadata=None, credentials=None): + def future(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): """Asynchronously invokes the underlying RPC. Args: @@ -692,6 +712,8 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): metadata: Optional :term:`metadata` to be transmitted to the service-side of the RPC. credentials: An optional CallCredentials for the RPC. + wait_for_ready: An optional flag to enable wait for ready + mechanism Returns: An object that is both a Call for the RPC and a Future. @@ -707,7 +729,12 @@ class UnaryStreamMultiCallable(six.with_metaclass(abc.ABCMeta)): """Affords invoking a unary-stream RPC from client-side.""" @abc.abstractmethod - def __call__(self, request, timeout=None, metadata=None, credentials=None): + def __call__(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): """Invokes the underlying RPC. Args: @@ -717,6 +744,8 @@ class UnaryStreamMultiCallable(six.with_metaclass(abc.ABCMeta)): metadata: An optional :term:`metadata` to be transmitted to the service-side of the RPC. credentials: An optional CallCredentials for the RPC. + wait_for_ready: An optional flag to enable wait for ready + mechanism Returns: An object that is both a Call for the RPC and an iterator of @@ -735,7 +764,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): """Synchronously invokes the underlying RPC. Args: @@ -746,6 +776,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): metadata: Optional :term:`metadata` to be transmitted to the service-side of the RPC. credentials: An optional CallCredentials for the RPC. + wait_for_ready: An optional flag to enable wait for ready + mechanism Returns: The response value for the RPC. @@ -762,7 +794,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): """Synchronously invokes the underlying RPC on the client. Args: @@ -773,6 +806,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): metadata: Optional :term:`metadata` to be transmitted to the service-side of the RPC. credentials: An optional CallCredentials for the RPC. + wait_for_ready: An optional flag to enable wait for ready + mechanism Returns: The response value for the RPC and a Call object for the RPC. @@ -789,7 +824,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): """Asynchronously invokes the underlying RPC on the client. Args: @@ -799,6 +835,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): metadata: Optional :term:`metadata` to be transmitted to the service-side of the RPC. credentials: An optional CallCredentials for the RPC. + wait_for_ready: An optional flag to enable wait for ready + mechanism Returns: An object that is both a Call for the RPC and a Future. @@ -818,7 +856,8 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)): request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): """Invokes the underlying RPC on the client. Args: @@ -828,6 +867,8 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)): metadata: Optional :term:`metadata` to be transmitted to the service-side of the RPC. credentials: An optional CallCredentials for the RPC. + wait_for_ready: An optional flag to enable wait for ready + mechanism Returns: An object that is both a Call for the RPC and an iterator of diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 734eac3801..3ff7658748 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -467,10 +467,11 @@ def _end_unary_response_blocking(state, call, with_call, deadline): raise _Rendezvous(state, None, None, deadline) -def _stream_unary_invocation_operationses(metadata): +def _stream_unary_invocation_operationses(metadata, initial_metadata_flags): return ( ( - cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(metadata, + initial_metadata_flags), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ), @@ -478,15 +479,19 @@ def _stream_unary_invocation_operationses(metadata): ) -def _stream_unary_invocation_operationses_and_tags(metadata): +def _stream_unary_invocation_operationses_and_tags(metadata, + initial_metadata_flags): return tuple(( operations, None, - ) for operations in _stream_unary_invocation_operationses(metadata)) + ) + for operations in _stream_unary_invocation_operationses( + metadata, initial_metadata_flags)) class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): + # pylint: disable=too-many-arguments def __init__(self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel @@ -495,15 +500,18 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): self._request_serializer = request_serializer self._response_deserializer = response_deserializer - def _prepare(self, request, timeout, metadata): + def _prepare(self, request, timeout, metadata, wait_for_ready): deadline, serialized_request, rendezvous = _start_unary_request( request, timeout, self._request_serializer) + initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( + wait_for_ready) if serialized_request is None: return None, None, None, rendezvous else: state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) operations = ( - cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(metadata, + initial_metadata_flags), cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), @@ -512,9 +520,10 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): ) return state, operations, deadline, None - def _blocking(self, request, timeout, metadata, credentials): + def _blocking(self, request, timeout, metadata, credentials, + wait_for_ready): state, operations, deadline, rendezvous = self._prepare( - request, timeout, metadata) + request, timeout, metadata, wait_for_ready) if state is None: raise rendezvous else: @@ -528,17 +537,34 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): _handle_event(event, state, self._response_deserializer) return state, call, - def __call__(self, request, timeout=None, metadata=None, credentials=None): - state, call, = self._blocking(request, timeout, metadata, credentials) + def __call__(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): + state, call, = self._blocking(request, timeout, metadata, credentials, + wait_for_ready) return _end_unary_response_blocking(state, call, False, None) - def with_call(self, request, timeout=None, metadata=None, credentials=None): - state, call, = self._blocking(request, timeout, metadata, credentials) + def with_call(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): + state, call, = self._blocking(request, timeout, metadata, credentials, + wait_for_ready) return _end_unary_response_blocking(state, call, True, None) - def future(self, request, timeout=None, metadata=None, credentials=None): + def future(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): state, operations, deadline, rendezvous = self._prepare( - request, timeout, metadata) + request, timeout, metadata, wait_for_ready) if state is None: raise rendezvous else: @@ -553,6 +579,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): + # pylint: disable=too-many-arguments def __init__(self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel @@ -561,16 +588,24 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._request_serializer = request_serializer self._response_deserializer = response_deserializer - def __call__(self, request, timeout=None, metadata=None, credentials=None): + def __call__(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): deadline, serialized_request, rendezvous = _start_unary_request( request, timeout, self._request_serializer) + initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( + wait_for_ready) if serialized_request is None: raise rendezvous else: state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) operationses = ( ( - cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(metadata, + initial_metadata_flags), cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), @@ -589,6 +624,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): + # pylint: disable=too-many-arguments def __init__(self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel @@ -597,13 +633,17 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): self._request_serializer = request_serializer self._response_deserializer = response_deserializer - def _blocking(self, request_iterator, timeout, metadata, credentials): + def _blocking(self, request_iterator, timeout, metadata, credentials, + wait_for_ready): deadline = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) + initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( + wait_for_ready) call = self._channel.segregated_call( 0, self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, - _stream_unary_invocation_operationses_and_tags(metadata)) + _stream_unary_invocation_operationses_and_tags( + metadata, initial_metadata_flags)) _consume_request_iterator(request_iterator, state, call, self._request_serializer, None) while True: @@ -619,32 +659,38 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): state, call, = self._blocking(request_iterator, timeout, metadata, - credentials) + credentials, wait_for_ready) return _end_unary_response_blocking(state, call, False, None) def with_call(self, request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): state, call, = self._blocking(request_iterator, timeout, metadata, - credentials) + credentials, wait_for_ready) return _end_unary_response_blocking(state, call, True, None) def future(self, request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): deadline = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) event_handler = _event_handler(state, self._response_deserializer) + initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( + wait_for_ready) call = self._managed_call( 0, self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, - _stream_unary_invocation_operationses(metadata), event_handler) + _stream_unary_invocation_operationses( + metadata, initial_metadata_flags), event_handler) _consume_request_iterator(request_iterator, state, call, self._request_serializer, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -652,6 +698,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): + # pylint: disable=too-many-arguments def __init__(self, channel, managed_call, method, request_serializer, response_deserializer): self._channel = channel @@ -664,12 +711,16 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): deadline = _deadline(timeout) state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) + initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( + wait_for_ready) operationses = ( ( - cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(metadata, + initial_metadata_flags), cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ), (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), @@ -684,6 +735,24 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): return _Rendezvous(state, call, self._response_deserializer, deadline) +class _InitialMetadataFlags(int): + """Stores immutable initial metadata flags""" + + def __new__(cls, value=_EMPTY_FLAGS): + value &= cygrpc.InitialMetadataFlags.used_mask + return super(_InitialMetadataFlags, cls).__new__(cls, value) + + def with_wait_for_ready(self, wait_for_ready): + if wait_for_ready is not None: + if wait_for_ready: + self = self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \ + cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) + elif not wait_for_ready: + self = self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \ + cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) + return self + + class _ChannelCallState(object): def __init__(self, channel): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 4781219319..23428f0b0c 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -140,6 +140,10 @@ cdef extern from "grpc/grpc.h": const int GRPC_WRITE_NO_COMPRESS const int GRPC_WRITE_USED_MASK + const int GRPC_INITIAL_METADATA_WAIT_FOR_READY + const int GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET + const int GRPC_INITIAL_METADATA_USED_MASK + const int GRPC_MAX_COMPLETION_QUEUE_PLUCKERS ctypedef struct grpc_completion_queue: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi index c39fef08fa..53f0c7f0bb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi @@ -15,6 +15,12 @@ import collections +class InitialMetadataFlags: + used_mask = GRPC_INITIAL_METADATA_USED_MASK + wait_for_ready = GRPC_INITIAL_METADATA_WAIT_FOR_READY + wait_for_ready_explicitly_set = GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET + + _Metadatum = collections.namedtuple('_Metadatum', ('key', 'value',)) diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py index 1d2d374ad1..4345114026 100644 --- a/src/python/grpcio/grpc/_interceptor.py +++ b/src/python/grpcio/grpc/_interceptor.py @@ -46,7 +46,7 @@ def service_pipeline(interceptors): class _ClientCallDetails( collections.namedtuple( '_ClientCallDetails', - ('method', 'timeout', 'metadata', 'credentials')), + ('method', 'timeout', 'metadata', 'credentials', 'wait_for_ready')), grpc.ClientCallDetails): pass @@ -72,7 +72,12 @@ def _unwrap_client_call_details(call_details, default_details): except AttributeError: credentials = default_details.credentials - return method, timeout, metadata, credentials + try: + wait_for_ready = call_details.wait_for_ready + except AttributeError: + wait_for_ready = default_details.wait_for_ready + + return method, timeout, metadata, credentials, wait_for_ready class _FailureOutcome(grpc.RpcError, grpc.Future, grpc.Call): @@ -193,28 +198,39 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): self._method = method self._interceptor = interceptor - def __call__(self, request, timeout=None, metadata=None, credentials=None): + def __call__(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): response, ignored_call = self._with_call( request, timeout=timeout, metadata=metadata, - credentials=credentials) + credentials=credentials, + wait_for_ready=wait_for_ready) return response - def _with_call(self, request, timeout=None, metadata=None, - credentials=None): - client_call_details = _ClientCallDetails(self._method, timeout, - metadata, credentials) + def _with_call(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): + client_call_details = _ClientCallDetails( + self._method, timeout, metadata, credentials, wait_for_ready) def continuation(new_details, request): - new_method, new_timeout, new_metadata, new_credentials = ( + new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = ( _unwrap_client_call_details(new_details, client_call_details)) try: response, call = self._thunk(new_method).with_call( request, timeout=new_timeout, metadata=new_metadata, - credentials=new_credentials) + credentials=new_credentials, + wait_for_ready=new_wait_for_ready) return _UnaryOutcome(response, call) except grpc.RpcError: raise @@ -225,25 +241,37 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): continuation, client_call_details, request) return call.result(), call - def with_call(self, request, timeout=None, metadata=None, credentials=None): + def with_call(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): return self._with_call( request, timeout=timeout, metadata=metadata, - credentials=credentials) + credentials=credentials, + wait_for_ready=wait_for_ready) - def future(self, request, timeout=None, metadata=None, credentials=None): - client_call_details = _ClientCallDetails(self._method, timeout, - metadata, credentials) + def future(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): + client_call_details = _ClientCallDetails( + self._method, timeout, metadata, credentials, wait_for_ready) def continuation(new_details, request): - new_method, new_timeout, new_metadata, new_credentials = ( + new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = ( _unwrap_client_call_details(new_details, client_call_details)) return self._thunk(new_method).future( request, timeout=new_timeout, metadata=new_metadata, - credentials=new_credentials) + credentials=new_credentials, + wait_for_ready=new_wait_for_ready) try: return self._interceptor.intercept_unary_unary( @@ -259,18 +287,24 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._method = method self._interceptor = interceptor - def __call__(self, request, timeout=None, metadata=None, credentials=None): - client_call_details = _ClientCallDetails(self._method, timeout, - metadata, credentials) + def __call__(self, + request, + timeout=None, + metadata=None, + credentials=None, + wait_for_ready=None): + client_call_details = _ClientCallDetails( + self._method, timeout, metadata, credentials, wait_for_ready) def continuation(new_details, request): - new_method, new_timeout, new_metadata, new_credentials = ( + new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = ( _unwrap_client_call_details(new_details, client_call_details)) return self._thunk(new_method)( request, timeout=new_timeout, metadata=new_metadata, - credentials=new_credentials) + credentials=new_credentials, + wait_for_ready=new_wait_for_ready) try: return self._interceptor.intercept_unary_stream( @@ -290,31 +324,35 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): response, ignored_call = self._with_call( request_iterator, timeout=timeout, metadata=metadata, - credentials=credentials) + credentials=credentials, + wait_for_ready=wait_for_ready) return response def _with_call(self, request_iterator, timeout=None, metadata=None, - credentials=None): - client_call_details = _ClientCallDetails(self._method, timeout, - metadata, credentials) + credentials=None, + wait_for_ready=None): + client_call_details = _ClientCallDetails( + self._method, timeout, metadata, credentials, wait_for_ready) def continuation(new_details, request_iterator): - new_method, new_timeout, new_metadata, new_credentials = ( + new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = ( _unwrap_client_call_details(new_details, client_call_details)) try: response, call = self._thunk(new_method).with_call( request_iterator, timeout=new_timeout, metadata=new_metadata, - credentials=new_credentials) + credentials=new_credentials, + wait_for_ready=new_wait_for_ready) return _UnaryOutcome(response, call) except grpc.RpcError: raise @@ -329,29 +367,33 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): request_iterator, timeout=None, metadata=None, - credentials=None): + credentials=None, + wait_for_ready=None): return self._with_call( request_iterator, timeout=timeout, metadata=metadata, - credentials=credentials) + credentials=credentials, + wait_for_ready=wait_for_ready) def future(self, request_iterator, timeout=None, metadata=None, - credentials=None): - client_call_details = _ClientCallDetails(self._method, timeout, - metadata, credentials) + credentials=None, + wait_for_ready=None): + client_call_details = _ClientCallDetails( + self._method, timeout, metadata, credentials, wait_for_ready) def continuation(new_details, request_iterator): - new_method, new_timeout, new_metadata, new_credentials = ( + new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = ( _unwrap_client_call_details(new_details, client_call_details)) return self._thunk(new_method).future( request_iterator, timeout=new_timeout, metadata=new_metadata, - credentials=new_credentials) + credentials=new_credentials, + wait_for_ready=new_wait_for_ready) try: return self._interceptor.intercept_stream_unary( @@ -371,18 +413,20 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): request_iterator, timeout=None, metadata=None, - credentials=None): - client_call_details = _ClientCallDetails(self._method, timeout, - metadata, credentials) + credentials=None, + wait_for_ready=None): + client_call_details = _ClientCallDetails( + self._method, timeout, metadata, credentials, wait_for_ready) def continuation(new_details, request_iterator): - new_method, new_timeout, new_metadata, new_credentials = ( + new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = ( _unwrap_client_call_details(new_details, client_call_details)) return self._thunk(new_method)( request_iterator, timeout=new_timeout, metadata=new_metadata, - credentials=new_credentials) + credentials=new_credentials, + wait_for_ready=new_wait_for_ready) try: return self._interceptor.intercept_stream_stream( |