aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio
diff options
context:
space:
mode:
authorGravatar Lidi Zheng <scallopsky@gmail.com>2018-11-07 11:10:23 -0800
committerGravatar GitHub <noreply@github.com>2018-11-07 11:10:23 -0800
commit678ea0895051646c80e59bc7ca6fe16d942e073d (patch)
treea17a1cff3132fcd26af4ff56875cc5c4721a8448 /src/python/grpcio
parenta7a3f5b42e21caa6be1617220a35afc5d67ad7f0 (diff)
parent4821221e3a430b7276408048d8f3fb4ee4c55fd6 (diff)
Merge pull request #16919 from lidizheng/wait-for-ready
Add wait-for-ready semantics
Diffstat (limited to 'src/python/grpcio')
-rw-r--r--src/python/grpcio/grpc/__init__.py57
-rw-r--r--src/python/grpcio/grpc/_channel.py121
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi4
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi6
-rw-r--r--src/python/grpcio/grpc/_interceptor.py126
5 files changed, 239 insertions, 75 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index 43e382b875..df98dd10ad 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -387,6 +387,7 @@ class ClientCallDetails(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to
the service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
+ wait_for_ready: An optional flag to enable wait for ready mechanism.
"""
@@ -638,7 +639,12 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
"""Affords invoking a unary-unary RPC from client-side."""
@abc.abstractmethod
- def __call__(self, request, timeout=None, metadata=None, credentials=None):
+ def __call__(self,
+ request,
+ timeout=None,
+ metadata=None,
+ credentials=None,
+ wait_for_ready=None):
"""Synchronously invokes the underlying RPC.
Args:
@@ -648,6 +654,8 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
+ wait_for_ready: An optional flag to enable wait for ready
+ mechanism
Returns:
The response value for the RPC.
@@ -660,7 +668,12 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
raise NotImplementedError()
@abc.abstractmethod
- def with_call(self, request, timeout=None, metadata=None, credentials=None):
+ def with_call(self,
+ request,
+ timeout=None,
+ metadata=None,
+ credentials=None,
+ wait_for_ready=None):
"""Synchronously invokes the underlying RPC.
Args:
@@ -670,6 +683,8 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
+ wait_for_ready: An optional flag to enable wait for ready
+ mechanism
Returns:
The response value for the RPC and a Call value for the RPC.
@@ -682,7 +697,12 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
raise NotImplementedError()
@abc.abstractmethod
- def future(self, request, timeout=None, metadata=None, credentials=None):
+ def future(self,
+ request,
+ timeout=None,
+ metadata=None,
+ credentials=None,
+ wait_for_ready=None):
"""Asynchronously invokes the underlying RPC.
Args:
@@ -692,6 +712,8 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
+ wait_for_ready: An optional flag to enable wait for ready
+ mechanism
Returns:
An object that is both a Call for the RPC and a Future.
@@ -707,7 +729,12 @@ class UnaryStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
"""Affords invoking a unary-stream RPC from client-side."""
@abc.abstractmethod
- def __call__(self, request, timeout=None, metadata=None, credentials=None):
+ def __call__(self,
+ request,
+ timeout=None,
+ metadata=None,
+ credentials=None,
+ wait_for_ready=None):
"""Invokes the underlying RPC.
Args:
@@ -717,6 +744,8 @@ class UnaryStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: An optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
+ wait_for_ready: An optional flag to enable wait for ready
+ mechanism
Returns:
An object that is both a Call for the RPC and an iterator of
@@ -735,7 +764,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
request_iterator,
timeout=None,
metadata=None,
- credentials=None):
+ credentials=None,
+ wait_for_ready=None):
"""Synchronously invokes the underlying RPC.
Args:
@@ -746,6 +776,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
+ wait_for_ready: An optional flag to enable wait for ready
+ mechanism
Returns:
The response value for the RPC.
@@ -762,7 +794,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
request_iterator,
timeout=None,
metadata=None,
- credentials=None):
+ credentials=None,
+ wait_for_ready=None):
"""Synchronously invokes the underlying RPC on the client.
Args:
@@ -773,6 +806,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
+ wait_for_ready: An optional flag to enable wait for ready
+ mechanism
Returns:
The response value for the RPC and a Call object for the RPC.
@@ -789,7 +824,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
request_iterator,
timeout=None,
metadata=None,
- credentials=None):
+ credentials=None,
+ wait_for_ready=None):
"""Asynchronously invokes the underlying RPC on the client.
Args:
@@ -799,6 +835,8 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
+ wait_for_ready: An optional flag to enable wait for ready
+ mechanism
Returns:
An object that is both a Call for the RPC and a Future.
@@ -818,7 +856,8 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
request_iterator,
timeout=None,
metadata=None,
- credentials=None):
+ credentials=None,
+ wait_for_ready=None):
"""Invokes the underlying RPC on the client.
Args:
@@ -828,6 +867,8 @@ class StreamStreamMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: Optional :term:`metadata` to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
+ wait_for_ready: An optional flag to enable wait for ready
+ mechanism
Returns:
An object that is both a Call for the RPC and an iterator of
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 734eac3801..3ff7658748 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -467,10 +467,11 @@ def _end_unary_response_blocking(state, call, with_call, deadline):
raise _Rendezvous(state, None, None, deadline)
-def _stream_unary_invocation_operationses(metadata):
+def _stream_unary_invocation_operationses(metadata, initial_metadata_flags):
return (
(
- cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.SendInitialMetadataOperation(metadata,
+ initial_metadata_flags),
cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
),
@@ -478,15 +479,19 @@ def _stream_unary_invocation_operationses(metadata):
)
-def _stream_unary_invocation_operationses_and_tags(metadata):
+def _stream_unary_invocation_operationses_and_tags(metadata,
+ initial_metadata_flags):
return tuple((
operations,
None,
- ) for operations in _stream_unary_invocation_operationses(metadata))
+ )
+ for operations in _stream_unary_invocation_operationses(
+ metadata, initial_metadata_flags))
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
+ # pylint: disable=too-many-arguments
def __init__(self, channel, managed_call, method, request_serializer,
response_deserializer):
self._channel = channel
@@ -495,15 +500,18 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
- def _prepare(self, request, timeout, metadata):
+ def _prepare(self, request, timeout, metadata, wait_for_ready):
deadline, serialized_request, rendezvous = _start_unary_request(
request, timeout, self._request_serializer)
+ initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
+ wait_for_ready)
if serialized_request is None:
return None, None, None, rendezvous
else:
state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
operations = (
- cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.SendInitialMetadataOperation(metadata,
+ initial_metadata_flags),
cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
@@ -512,9 +520,10 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
)
return state, operations, deadline, None
- def _blocking(self, request, timeout, metadata, credentials):
+ def _blocking(self, request, timeout, metadata, credentials,
+ wait_for_ready):
state, operations, deadline, rendezvous = self._prepare(
- request, timeout, metadata)
+ request, timeout, metadata, wait_for_ready)
if state is None:
raise rendezvous
else:
@@ -528,17 +537,34 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
_handle_event(event, state, self._response_deserializer)
return state, call,
- def __call__(self, request, timeout=None, metadata=None, credentials=None):
- state, call, = self._blocking(request, timeout, metadata, credentials)
+ def __call__(self,
+ request,
+ timeout=None,
+ metadata=None,
+ credentials=None,
+ wait_for_ready=None):
+ state, call, = self._blocking(request, timeout, metadata, credentials,
+ wait_for_ready)
return _end_unary_response_blocking(state, call, False, None)
- def with_call(self, request, timeout=None, metadata=None, credentials=None):
- state, call, = self._blocking(request, timeout, metadata, credentials)
+ def with_call(self,
+ request,
+ timeout=None,
+ metadata=None,
+ credentials=None,
+ wait_for_ready=None):
+ state, call, = self._blocking(request, timeout, metadata, credentials,
+ wait_for_ready)
return _end_unary_response_blocking(state, call, True, None)
- def future(self, request, timeout=None, metadata=None, credentials=None):
+ def future(self,
+ request,
+ timeout=None,
+ metadata=None,
+ credentials=None,
+ wait_for_ready=None):
state, operations, deadline, rendezvous = self._prepare(
- request, timeout, metadata)
+ request, timeout, metadata, wait_for_ready)
if state is None:
raise rendezvous
else:
@@ -553,6 +579,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
+ # pylint: disable=too-many-arguments
def __init__(self, channel, managed_call, method, request_serializer,
response_deserializer):
self._channel = channel
@@ -561,16 +588,24 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
- def __call__(self, request, timeout=None, metadata=None, credentials=None):
+ def __call__(self,
+ request,
+ timeout=None,
+ metadata=None,
+ credentials=None,
+ wait_for_ready=None):
deadline, serialized_request, rendezvous = _start_unary_request(
request, timeout, self._request_serializer)
+ initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
+ wait_for_ready)
if serialized_request is None:
raise rendezvous
else:
state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
operationses = (
(
- cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.SendInitialMetadataOperation(metadata,
+ initial_metadata_flags),
cygrpc.SendMessageOperation(serialized_request,
_EMPTY_FLAGS),
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
@@ -589,6 +624,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
+ # pylint: disable=too-many-arguments
def __init__(self, channel, managed_call, method, request_serializer,
response_deserializer):
self._channel = channel
@@ -597,13 +633,17 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
- def _blocking(self, request_iterator, timeout, metadata, credentials):
+ def _blocking(self, request_iterator, timeout, metadata, credentials,
+ wait_for_ready):
deadline = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
+ initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
+ wait_for_ready)
call = self._channel.segregated_call(
0, self._method, None, deadline, metadata, None
if credentials is None else credentials._credentials,
- _stream_unary_invocation_operationses_and_tags(metadata))
+ _stream_unary_invocation_operationses_and_tags(
+ metadata, initial_metadata_flags))
_consume_request_iterator(request_iterator, state, call,
self._request_serializer, None)
while True:
@@ -619,32 +659,38 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
request_iterator,
timeout=None,
metadata=None,
- credentials=None):
+ credentials=None,
+ wait_for_ready=None):
state, call, = self._blocking(request_iterator, timeout, metadata,
- credentials)
+ credentials, wait_for_ready)
return _end_unary_response_blocking(state, call, False, None)
def with_call(self,
request_iterator,
timeout=None,
metadata=None,
- credentials=None):
+ credentials=None,
+ wait_for_ready=None):
state, call, = self._blocking(request_iterator, timeout, metadata,
- credentials)
+ credentials, wait_for_ready)
return _end_unary_response_blocking(state, call, True, None)
def future(self,
request_iterator,
timeout=None,
metadata=None,
- credentials=None):
+ credentials=None,
+ wait_for_ready=None):
deadline = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
event_handler = _event_handler(state, self._response_deserializer)
+ initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
+ wait_for_ready)
call = self._managed_call(
0, self._method, None, deadline, metadata, None
if credentials is None else credentials._credentials,
- _stream_unary_invocation_operationses(metadata), event_handler)
+ _stream_unary_invocation_operationses(
+ metadata, initial_metadata_flags), event_handler)
_consume_request_iterator(request_iterator, state, call,
self._request_serializer, event_handler)
return _Rendezvous(state, call, self._response_deserializer, deadline)
@@ -652,6 +698,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
+ # pylint: disable=too-many-arguments
def __init__(self, channel, managed_call, method, request_serializer,
response_deserializer):
self._channel = channel
@@ -664,12 +711,16 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
request_iterator,
timeout=None,
metadata=None,
- credentials=None):
+ credentials=None,
+ wait_for_ready=None):
deadline = _deadline(timeout)
state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
+ initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
+ wait_for_ready)
operationses = (
(
- cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.SendInitialMetadataOperation(metadata,
+ initial_metadata_flags),
cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
),
(cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
@@ -684,6 +735,24 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
return _Rendezvous(state, call, self._response_deserializer, deadline)
+class _InitialMetadataFlags(int):
+ """Stores immutable initial metadata flags"""
+
+ def __new__(cls, value=_EMPTY_FLAGS):
+ value &= cygrpc.InitialMetadataFlags.used_mask
+ return super(_InitialMetadataFlags, cls).__new__(cls, value)
+
+ def with_wait_for_ready(self, wait_for_ready):
+ if wait_for_ready is not None:
+ if wait_for_ready:
+ self = self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \
+ cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
+ elif not wait_for_ready:
+ self = self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \
+ cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)
+ return self
+
+
class _ChannelCallState(object):
def __init__(self, channel):
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 4781219319..23428f0b0c 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -140,6 +140,10 @@ cdef extern from "grpc/grpc.h":
const int GRPC_WRITE_NO_COMPRESS
const int GRPC_WRITE_USED_MASK
+ const int GRPC_INITIAL_METADATA_WAIT_FOR_READY
+ const int GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
+ const int GRPC_INITIAL_METADATA_USED_MASK
+
const int GRPC_MAX_COMPLETION_QUEUE_PLUCKERS
ctypedef struct grpc_completion_queue:
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi
index c39fef08fa..53f0c7f0bb 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/metadata.pyx.pxi
@@ -15,6 +15,12 @@
import collections
+class InitialMetadataFlags:
+ used_mask = GRPC_INITIAL_METADATA_USED_MASK
+ wait_for_ready = GRPC_INITIAL_METADATA_WAIT_FOR_READY
+ wait_for_ready_explicitly_set = GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET
+
+
_Metadatum = collections.namedtuple('_Metadatum', ('key', 'value',))
diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py
index 1d2d374ad1..4345114026 100644
--- a/src/python/grpcio/grpc/_interceptor.py
+++ b/src/python/grpcio/grpc/_interceptor.py
@@ -46,7 +46,7 @@ def service_pipeline(interceptors):
class _ClientCallDetails(
collections.namedtuple(
'_ClientCallDetails',
- ('method', 'timeout', 'metadata', 'credentials')),
+ ('method', 'timeout', 'metadata', 'credentials', 'wait_for_ready')),
grpc.ClientCallDetails):
pass
@@ -72,7 +72,12 @@ def _unwrap_client_call_details(call_details, default_details):
except AttributeError:
credentials = default_details.credentials
- return method, timeout, metadata, credentials
+ try:
+ wait_for_ready = call_details.wait_for_ready
+ except AttributeError:
+ wait_for_ready = default_details.wait_for_ready
+
+ return method, timeout, metadata, credentials, wait_for_ready
class _FailureOutcome(grpc.RpcError, grpc.Future, grpc.Call):
@@ -193,28 +198,39 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
self._method = method
self._interceptor = interceptor
- def __call__(self, request, timeout=None, metadata=None, credentials=None):
+ def __call__(self,
+ request,
+ timeout=None,
+ metadata=None,
+ credentials=None,
+ wait_for_ready=None):
response, ignored_call = self._with_call(
request,
timeout=timeout,
metadata=metadata,
- credentials=credentials)
+ credentials=credentials,
+ wait_for_ready=wait_for_ready)
return response
- def _with_call(self, request, timeout=None, metadata=None,
- credentials=None):
- client_call_details = _ClientCallDetails(self._method, timeout,
- metadata, credentials)
+ def _with_call(self,
+ request,
+ timeout=None,
+ metadata=None,
+ credentials=None,
+ wait_for_ready=None):
+ client_call_details = _ClientCallDetails(
+ self._method, timeout, metadata, credentials, wait_for_ready)
def continuation(new_details, request):
- new_method, new_timeout, new_metadata, new_credentials = (
+ new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = (
_unwrap_client_call_details(new_details, client_call_details))
try:
response, call = self._thunk(new_method).with_call(
request,
timeout=new_timeout,
metadata=new_metadata,
- credentials=new_credentials)
+ credentials=new_credentials,
+ wait_for_ready=new_wait_for_ready)
return _UnaryOutcome(response, call)
except grpc.RpcError:
raise
@@ -225,25 +241,37 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
continuation, client_call_details, request)
return call.result(), call
- def with_call(self, request, timeout=None, metadata=None, credentials=None):
+ def with_call(self,
+ request,
+ timeout=None,
+ metadata=None,
+ credentials=None,
+ wait_for_ready=None):
return self._with_call(
request,
timeout=timeout,
metadata=metadata,
- credentials=credentials)
+ credentials=credentials,
+ wait_for_ready=wait_for_ready)
- def future(self, request, timeout=None, metadata=None, credentials=None):
- client_call_details = _ClientCallDetails(self._method, timeout,
- metadata, credentials)
+ def future(self,
+ request,
+ timeout=None,
+ metadata=None,
+ credentials=None,
+ wait_for_ready=None):
+ client_call_details = _ClientCallDetails(
+ self._method, timeout, metadata, credentials, wait_for_ready)
def continuation(new_details, request):
- new_method, new_timeout, new_metadata, new_credentials = (
+ new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = (
_unwrap_client_call_details(new_details, client_call_details))
return self._thunk(new_method).future(
request,
timeout=new_timeout,
metadata=new_metadata,
- credentials=new_credentials)
+ credentials=new_credentials,
+ wait_for_ready=new_wait_for_ready)
try:
return self._interceptor.intercept_unary_unary(
@@ -259,18 +287,24 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._method = method
self._interceptor = interceptor
- def __call__(self, request, timeout=None, metadata=None, credentials=None):
- client_call_details = _ClientCallDetails(self._method, timeout,
- metadata, credentials)
+ def __call__(self,
+ request,
+ timeout=None,
+ metadata=None,
+ credentials=None,
+ wait_for_ready=None):
+ client_call_details = _ClientCallDetails(
+ self._method, timeout, metadata, credentials, wait_for_ready)
def continuation(new_details, request):
- new_method, new_timeout, new_metadata, new_credentials = (
+ new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = (
_unwrap_client_call_details(new_details, client_call_details))
return self._thunk(new_method)(
request,
timeout=new_timeout,
metadata=new_metadata,
- credentials=new_credentials)
+ credentials=new_credentials,
+ wait_for_ready=new_wait_for_ready)
try:
return self._interceptor.intercept_unary_stream(
@@ -290,31 +324,35 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
request_iterator,
timeout=None,
metadata=None,
- credentials=None):
+ credentials=None,
+ wait_for_ready=None):
response, ignored_call = self._with_call(
request_iterator,
timeout=timeout,
metadata=metadata,
- credentials=credentials)
+ credentials=credentials,
+ wait_for_ready=wait_for_ready)
return response
def _with_call(self,
request_iterator,
timeout=None,
metadata=None,
- credentials=None):
- client_call_details = _ClientCallDetails(self._method, timeout,
- metadata, credentials)
+ credentials=None,
+ wait_for_ready=None):
+ client_call_details = _ClientCallDetails(
+ self._method, timeout, metadata, credentials, wait_for_ready)
def continuation(new_details, request_iterator):
- new_method, new_timeout, new_metadata, new_credentials = (
+ new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = (
_unwrap_client_call_details(new_details, client_call_details))
try:
response, call = self._thunk(new_method).with_call(
request_iterator,
timeout=new_timeout,
metadata=new_metadata,
- credentials=new_credentials)
+ credentials=new_credentials,
+ wait_for_ready=new_wait_for_ready)
return _UnaryOutcome(response, call)
except grpc.RpcError:
raise
@@ -329,29 +367,33 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
request_iterator,
timeout=None,
metadata=None,
- credentials=None):
+ credentials=None,
+ wait_for_ready=None):
return self._with_call(
request_iterator,
timeout=timeout,
metadata=metadata,
- credentials=credentials)
+ credentials=credentials,
+ wait_for_ready=wait_for_ready)
def future(self,
request_iterator,
timeout=None,
metadata=None,
- credentials=None):
- client_call_details = _ClientCallDetails(self._method, timeout,
- metadata, credentials)
+ credentials=None,
+ wait_for_ready=None):
+ client_call_details = _ClientCallDetails(
+ self._method, timeout, metadata, credentials, wait_for_ready)
def continuation(new_details, request_iterator):
- new_method, new_timeout, new_metadata, new_credentials = (
+ new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = (
_unwrap_client_call_details(new_details, client_call_details))
return self._thunk(new_method).future(
request_iterator,
timeout=new_timeout,
metadata=new_metadata,
- credentials=new_credentials)
+ credentials=new_credentials,
+ wait_for_ready=new_wait_for_ready)
try:
return self._interceptor.intercept_stream_unary(
@@ -371,18 +413,20 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
request_iterator,
timeout=None,
metadata=None,
- credentials=None):
- client_call_details = _ClientCallDetails(self._method, timeout,
- metadata, credentials)
+ credentials=None,
+ wait_for_ready=None):
+ client_call_details = _ClientCallDetails(
+ self._method, timeout, metadata, credentials, wait_for_ready)
def continuation(new_details, request_iterator):
- new_method, new_timeout, new_metadata, new_credentials = (
+ new_method, new_timeout, new_metadata, new_credentials, new_wait_for_ready = (
_unwrap_client_call_details(new_details, client_call_details))
return self._thunk(new_method)(
request_iterator,
timeout=new_timeout,
metadata=new_metadata,
- credentials=new_credentials)
+ credentials=new_credentials,
+ wait_for_ready=new_wait_for_ready)
try:
return self._interceptor.intercept_stream_stream(