aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
Diffstat (limited to 'src/python')
-rw-r--r--src/python/grpcio/commands.py2
-rw-r--r--src/python/grpcio/grpc/__init__.py57
-rw-r--r--src/python/grpcio/grpc/_channel.py32
-rw-r--r--src/python/grpcio/grpc/beta/_client_adaptations.py8
-rw-r--r--src/python/grpcio/tests/tests.json1
-rw-r--r--src/python/grpcio/tests/unit/_channel_connectivity_test.py4
-rw-r--r--src/python/grpcio/tests/unit/_metadata_code_details_test.py523
-rw-r--r--src/python/grpcio/tests/unit/_metadata_test.py8
-rw-r--r--src/python/grpcio/tests/unit/_rpc_test.py21
9 files changed, 615 insertions, 41 deletions
diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py
index 3e974eba0a..f498ed4190 100644
--- a/src/python/grpcio/commands.py
+++ b/src/python/grpcio/commands.py
@@ -182,6 +182,8 @@ class BuildProtoModules(setuptools.Command):
'--plugin=protoc-gen-python-grpc={}'.format(
self.grpc_python_plugin_command),
'-I {}'.format(GRPC_STEM),
+ '-I .',
+ '-I {}/third_party/protobuf/src'.format(GRPC_STEM),
'--python_out={}'.format(PROTO_GEN_STEM),
'--python-grpc_out={}'.format(PROTO_GEN_STEM),
] + [path]
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index d07a7a721d..b3eeaad1f7 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -436,9 +436,7 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
"""Affords invoking a unary-unary RPC."""
@abc.abstractmethod
- def __call__(
- self, request, timeout=None, metadata=None, credentials=None,
- with_call=False):
+ def __call__(self, request, timeout=None, metadata=None, credentials=None):
"""Synchronously invokes the underlying RPC.
Args:
@@ -447,12 +445,30 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: An optional sequence of pairs of bytes to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
- with_call: Whether or not to include return a Call for the RPC in addition
- to the response.
Returns:
- The response value for the RPC, and a Call for the RPC if with_call was
- set to True at invocation.
+ The response value for the RPC.
+
+ Raises:
+ RpcError: Indicating that the RPC terminated with non-OK status. The
+ raised RpcError will also be a Call for the RPC affording the RPC's
+ metadata, status code, and details.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def with_call(self, request, timeout=None, metadata=None, credentials=None):
+ """Synchronously invokes the underlying RPC.
+
+ Args:
+ request: The request value for the RPC.
+ timeout: An optional durating of time in seconds to allow for the RPC.
+ metadata: An optional sequence of pairs of bytes to be transmitted to the
+ service-side of the RPC.
+ credentials: An optional CallCredentials for the RPC.
+
+ Returns:
+ The response value for the RPC and a Call value for the RPC.
Raises:
RpcError: Indicating that the RPC terminated with non-OK status. The
@@ -508,8 +524,7 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
@abc.abstractmethod
def __call__(
- self, request_iterator, timeout=None, metadata=None, credentials=None,
- with_call=False):
+ self, request_iterator, timeout=None, metadata=None, credentials=None):
"""Synchronously invokes the underlying RPC.
Args:
@@ -518,8 +533,6 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
metadata: An optional sequence of pairs of bytes to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
- with_call: Whether or not to include return a Call for the RPC in addition
- to the response.
Returns:
The response value for the RPC, and a Call for the RPC if with_call was
@@ -533,6 +546,28 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
raise NotImplementedError()
@abc.abstractmethod
+ def with_call(
+ self, request_iterator, timeout=None, metadata=None, credentials=None):
+ """Synchronously invokes the underlying RPC.
+
+ Args:
+ request_iterator: An iterator that yields request values for the RPC.
+ timeout: An optional duration of time in seconds to allow for the RPC.
+ metadata: An optional sequence of pairs of bytes to be transmitted to the
+ service-side of the RPC.
+ credentials: An optional CallCredentials for the RPC.
+
+ Returns:
+ The response value for the RPC and a Call for the RPC.
+
+ Raises:
+ RpcError: Indicating that the RPC terminated with non-OK status. The
+ raised RpcError will also be a Call for the RPC affording the RPC's
+ metadata, status code, and details.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
def future(
self, request_iterator, timeout=None, metadata=None, credentials=None):
"""Asynchronously invokes the underlying RPC.
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index 7cdd542de2..1f34beeb2c 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -449,9 +449,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
)
return state, operations, deadline, deadline_timespec, None
- def __call__(
- self, request, timeout=None, metadata=None, credentials=None,
- with_call=False):
+ def _blocking(self, request, timeout, metadata, credentials):
state, operations, deadline, deadline_timespec, rendezvous = self._prepare(
request, timeout, metadata)
if rendezvous:
@@ -464,7 +462,15 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
call.set_credentials(credentials._credentials)
call.start_batch(cygrpc.Operations(operations), None)
_handle_event(completion_queue.poll(), state, self._response_deserializer)
- return _end_unary_response_blocking(state, with_call, deadline)
+ return state, deadline
+
+ def __call__(self, request, timeout=None, metadata=None, credentials=None):
+ state, deadline, = self._blocking(request, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, False, deadline)
+
+ def with_call(self, request, timeout=None, metadata=None, credentials=None):
+ state, deadline, = self._blocking(request, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, True, deadline)
def future(self, request, timeout=None, metadata=None, credentials=None):
state, operations, deadline, deadline_timespec, rendezvous = self._prepare(
@@ -532,9 +538,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer
- def __call__(
- self, request_iterator, timeout=None, metadata=None, credentials=None,
- with_call=False):
+ def _blocking(self, request_iterator, timeout, metadata, credentials):
deadline, deadline_timespec = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
completion_queue = cygrpc.CompletionQueue()
@@ -563,7 +567,19 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
state.condition.notify_all()
if not state.due:
break
- return _end_unary_response_blocking(state, with_call, deadline)
+ return state, deadline
+
+ def __call__(
+ self, request_iterator, timeout=None, metadata=None, credentials=None):
+ state, deadline, = self._blocking(
+ request_iterator, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, False, deadline)
+
+ def with_call(
+ self, request_iterator, timeout=None, metadata=None, credentials=None):
+ state, deadline, = self._blocking(
+ request_iterator, timeout, metadata, credentials)
+ return _end_unary_response_blocking(state, True, deadline)
def future(
self, request_iterator, timeout=None, metadata=None, credentials=None):
diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py
index 024808c540..56456cc117 100644
--- a/src/python/grpcio/grpc/beta/_client_adaptations.py
+++ b/src/python/grpcio/grpc/beta/_client_adaptations.py
@@ -186,9 +186,9 @@ def _blocking_unary_unary(
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)
if with_call:
- response, call = multi_callable(
+ response, call = multi_callable.with_call(
request, timeout=timeout, metadata=effective_metadata,
- credentials=_credentials(protocol_options), with_call=True)
+ credentials=_credentials(protocol_options))
return response, _Rendezvous(None, None, call)
else:
return multi_callable(
@@ -237,9 +237,9 @@ def _blocking_stream_unary(
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)
if with_call:
- response, call = multi_callable(
+ response, call = multi_callable.with_call(
request_iterator, timeout=timeout, metadata=effective_metadata,
- credentials=_credentials(protocol_options), with_call=True)
+ credentials=_credentials(protocol_options))
return response, _Rendezvous(None, None, call)
else:
return multi_callable(
diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json
index 8e509621a8..e384a2fc13 100644
--- a/src/python/grpcio/tests/tests.json
+++ b/src/python/grpcio/tests/tests.json
@@ -24,6 +24,7 @@
"_implementations_test.ChannelCredentialsTest",
"_insecure_interop_test.InsecureInteropTest",
"_logging_pool_test.LoggingPoolTest",
+ "_metadata_code_details_test.MetadataCodeDetailsTest",
"_metadata_test.MetadataTest",
"_not_found_test.NotFoundTest",
"_python_plugin_test.PythonPluginTest",
diff --git a/src/python/grpcio/tests/unit/_channel_connectivity_test.py b/src/python/grpcio/tests/unit/_channel_connectivity_test.py
index a1575efada..ae8de523ec 100644
--- a/src/python/grpcio/tests/unit/_channel_connectivity_test.py
+++ b/src/python/grpcio/tests/unit/_channel_connectivity_test.py
@@ -135,12 +135,12 @@ class ChannelConnectivityTest(unittest.TestCase):
self.assertNotIn(
grpc.ChannelConnectivity.TRANSIENT_FAILURE, third_connectivities)
self.assertNotIn(
- grpc.ChannelConnectivity.FATAL_FAILURE, third_connectivities)
+ grpc.ChannelConnectivity.SHUTDOWN, third_connectivities)
self.assertNotIn(
grpc.ChannelConnectivity.TRANSIENT_FAILURE,
fourth_connectivities)
self.assertNotIn(
- grpc.ChannelConnectivity.FATAL_FAILURE, fourth_connectivities)
+ grpc.ChannelConnectivity.SHUTDOWN, fourth_connectivities)
def test_reachable_then_unreachable_channel_connectivity(self):
server = _server.Server((), futures.ThreadPoolExecutor(max_workers=0))
diff --git a/src/python/grpcio/tests/unit/_metadata_code_details_test.py b/src/python/grpcio/tests/unit/_metadata_code_details_test.py
new file mode 100644
index 0000000000..dd74268cbf
--- /dev/null
+++ b/src/python/grpcio/tests/unit/_metadata_code_details_test.py
@@ -0,0 +1,523 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests application-provided metadata, status code, and details."""
+
+import threading
+import unittest
+
+import grpc
+from grpc.framework.foundation import logging_pool
+
+from tests.unit import test_common
+from tests.unit.framework.common import test_constants
+from tests.unit.framework.common import test_control
+
+_SERIALIZED_REQUEST = b'\x46\x47\x48'
+_SERIALIZED_RESPONSE = b'\x49\x50\x51'
+
+_REQUEST_SERIALIZER = lambda unused_request: _SERIALIZED_REQUEST
+_REQUEST_DESERIALIZER = lambda unused_serialized_request: object()
+_RESPONSE_SERIALIZER = lambda unused_response: _SERIALIZED_RESPONSE
+_RESPONSE_DESERIALIZER = lambda unused_serialized_resopnse: object()
+
+_SERVICE = b'test.TestService'
+_UNARY_UNARY = b'UnaryUnary'
+_UNARY_STREAM = b'UnaryStream'
+_STREAM_UNARY = b'StreamUnary'
+_STREAM_STREAM = b'StreamStream'
+
+_CLIENT_METADATA = (
+ (b'client-md-key', b'client-md-key'),
+ (b'client-md-key-bin', b'\x00\x01')
+)
+
+_SERVER_INITIAL_METADATA = (
+ (b'server-initial-md-key', b'server-initial-md-value'),
+ (b'server-initial-md-key-bin', b'\x00\x02')
+)
+
+_SERVER_TRAILING_METADATA = (
+ (b'server-trailing-md-key', b'server-trailing-md-value'),
+ (b'server-trailing-md-key-bin', b'\x00\x03')
+)
+
+_NON_OK_CODE = grpc.StatusCode.NOT_FOUND
+_DETAILS = b'Test details!'
+
+
+class _Servicer(object):
+
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._code = None
+ self._details = None
+ self._exception = False
+ self._return_none = False
+ self._received_client_metadata = None
+
+ def unary_unary(self, request, context):
+ with self._lock:
+ self._received_client_metadata = context.invocation_metadata()
+ context.send_initial_metadata(_SERVER_INITIAL_METADATA)
+ context.set_trailing_metadata(_SERVER_TRAILING_METADATA)
+ if self._code is not None:
+ context.set_code(self._code)
+ if self._details is not None:
+ context.set_details(self._details)
+ if self._exception:
+ raise test_control.Defect()
+ else:
+ return None if self._return_none else object()
+
+ def unary_stream(self, request, context):
+ with self._lock:
+ self._received_client_metadata = context.invocation_metadata()
+ context.send_initial_metadata(_SERVER_INITIAL_METADATA)
+ context.set_trailing_metadata(_SERVER_TRAILING_METADATA)
+ if self._code is not None:
+ context.set_code(self._code)
+ if self._details is not None:
+ context.set_details(self._details)
+ for _ in range(test_constants.STREAM_LENGTH // 2):
+ yield _SERIALIZED_RESPONSE
+ if self._exception:
+ raise test_control.Defect()
+
+ def stream_unary(self, request_iterator, context):
+ with self._lock:
+ self._received_client_metadata = context.invocation_metadata()
+ context.send_initial_metadata(_SERVER_INITIAL_METADATA)
+ context.set_trailing_metadata(_SERVER_TRAILING_METADATA)
+ if self._code is not None:
+ context.set_code(self._code)
+ if self._details is not None:
+ context.set_details(self._details)
+ # TODO(https://github.com/grpc/grpc/issues/6891): just ignore the
+ # request iterator.
+ for ignored_request in request_iterator:
+ pass
+ if self._exception:
+ raise test_control.Defect()
+ else:
+ return None if self._return_none else _SERIALIZED_RESPONSE
+
+ def stream_stream(self, request_iterator, context):
+ with self._lock:
+ self._received_client_metadata = context.invocation_metadata()
+ context.send_initial_metadata(_SERVER_INITIAL_METADATA)
+ context.set_trailing_metadata(_SERVER_TRAILING_METADATA)
+ if self._code is not None:
+ context.set_code(self._code)
+ if self._details is not None:
+ context.set_details(self._details)
+ # TODO(https://github.com/grpc/grpc/issues/6891): just ignore the
+ # request iterator.
+ for ignored_request in request_iterator:
+ pass
+ for _ in range(test_constants.STREAM_LENGTH // 3):
+ yield object()
+ if self._exception:
+ raise test_control.Defect()
+
+ def set_code(self, code):
+ with self._lock:
+ self._code = code
+
+ def set_details(self, details):
+ with self._lock:
+ self._details = details
+
+ def set_exception(self):
+ with self._lock:
+ self._exception = True
+
+ def set_return_none(self):
+ with self._lock:
+ self._return_none = True
+
+ def received_client_metadata(self):
+ with self._lock:
+ return self._received_client_metadata
+
+
+def _generic_handler(servicer):
+ method_handlers = {
+ _UNARY_UNARY: grpc.unary_unary_rpc_method_handler(
+ servicer.unary_unary, request_deserializer=_REQUEST_DESERIALIZER,
+ response_serializer=_RESPONSE_SERIALIZER),
+ _UNARY_STREAM: grpc.unary_stream_rpc_method_handler(
+ servicer.unary_stream),
+ _STREAM_UNARY: grpc.stream_unary_rpc_method_handler(
+ servicer.stream_unary),
+ _STREAM_STREAM: grpc.stream_stream_rpc_method_handler(
+ servicer.stream_stream, request_deserializer=_REQUEST_DESERIALIZER,
+ response_serializer=_RESPONSE_SERIALIZER),
+ }
+ return grpc.method_handlers_generic_handler(_SERVICE, method_handlers)
+
+
+class MetadataCodeDetailsTest(unittest.TestCase):
+
+ def setUp(self):
+ self._servicer = _Servicer()
+ self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
+ self._server = grpc.server(
+ (_generic_handler(self._servicer),), self._server_pool)
+ port = self._server.add_insecure_port('[::]:0')
+ self._server.start()
+
+ channel = grpc.insecure_channel('localhost:{}'.format(port))
+ self._unary_unary = channel.unary_unary(
+ b'/'.join((b'', _SERVICE, _UNARY_UNARY,)),
+ request_serializer=_REQUEST_SERIALIZER,
+ response_deserializer=_RESPONSE_DESERIALIZER,)
+ self._unary_stream = channel.unary_stream(
+ b'/'.join((b'', _SERVICE, _UNARY_STREAM,)),)
+ self._stream_unary = channel.stream_unary(
+ b'/'.join((b'', _SERVICE, _STREAM_UNARY,)),)
+ self._stream_stream = channel.stream_stream(
+ b'/'.join((b'', _SERVICE, _STREAM_STREAM,)),
+ request_serializer=_REQUEST_SERIALIZER,
+ response_deserializer=_RESPONSE_DESERIALIZER,)
+
+
+ def testSuccessfulUnaryUnary(self):
+ self._servicer.set_details(_DETAILS)
+
+ unused_response, call = self._unary_unary.with_call(
+ object(), metadata=_CLIENT_METADATA)
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA, call.initial_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA, call.trailing_metadata()))
+ self.assertIs(grpc.StatusCode.OK, call.code())
+ self.assertEqual(_DETAILS, call.details())
+
+ def testSuccessfulUnaryStream(self):
+ self._servicer.set_details(_DETAILS)
+
+ call = self._unary_stream(_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA)
+ received_initial_metadata = call.initial_metadata()
+ for _ in call:
+ pass
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA, received_initial_metadata))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA, call.trailing_metadata()))
+ self.assertIs(grpc.StatusCode.OK, call.code())
+ self.assertEqual(_DETAILS, call.details())
+
+ def testSuccessfulStreamUnary(self):
+ self._servicer.set_details(_DETAILS)
+
+ unused_response, call = self._stream_unary.with_call(
+ iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH),
+ metadata=_CLIENT_METADATA)
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA, call.initial_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA, call.trailing_metadata()))
+ self.assertIs(grpc.StatusCode.OK, call.code())
+ self.assertEqual(_DETAILS, call.details())
+
+ def testSuccessfulStreamStream(self):
+ self._servicer.set_details(_DETAILS)
+
+ call = self._stream_stream(
+ iter([object()] * test_constants.STREAM_LENGTH),
+ metadata=_CLIENT_METADATA)
+ received_initial_metadata = call.initial_metadata()
+ for _ in call:
+ pass
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA, received_initial_metadata))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA, call.trailing_metadata()))
+ self.assertIs(grpc.StatusCode.OK, call.code())
+ self.assertEqual(_DETAILS, call.details())
+
+ def testCustomCodeUnaryUnary(self):
+ self._servicer.set_code(_NON_OK_CODE)
+ self._servicer.set_details(_DETAILS)
+
+ with self.assertRaises(grpc.RpcError) as exception_context:
+ self._unary_unary.with_call(object(), metadata=_CLIENT_METADATA)
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA,
+ exception_context.exception.initial_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA,
+ exception_context.exception.trailing_metadata()))
+ self.assertIs(_NON_OK_CODE, exception_context.exception.code())
+ self.assertEqual(_DETAILS, exception_context.exception.details())
+
+ def testCustomCodeUnaryStream(self):
+ self._servicer.set_code(_NON_OK_CODE)
+ self._servicer.set_details(_DETAILS)
+
+ call = self._unary_stream(_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA)
+ received_initial_metadata = call.initial_metadata()
+ with self.assertRaises(grpc.RpcError):
+ for _ in call:
+ pass
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA, received_initial_metadata))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA, call.trailing_metadata()))
+ self.assertIs(_NON_OK_CODE, call.code())
+ self.assertEqual(_DETAILS, call.details())
+
+ def testCustomCodeStreamUnary(self):
+ self._servicer.set_code(_NON_OK_CODE)
+ self._servicer.set_details(_DETAILS)
+
+ with self.assertRaises(grpc.RpcError) as exception_context:
+ self._stream_unary.with_call(
+ iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH),
+ metadata=_CLIENT_METADATA)
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA,
+ exception_context.exception.initial_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA,
+ exception_context.exception.trailing_metadata()))
+ self.assertIs(_NON_OK_CODE, exception_context.exception.code())
+ self.assertEqual(_DETAILS, exception_context.exception.details())
+
+ def testCustomCodeStreamStream(self):
+ self._servicer.set_code(_NON_OK_CODE)
+ self._servicer.set_details(_DETAILS)
+
+ call = self._stream_stream(
+ iter([object()] * test_constants.STREAM_LENGTH),
+ metadata=_CLIENT_METADATA)
+ received_initial_metadata = call.initial_metadata()
+ with self.assertRaises(grpc.RpcError) as exception_context:
+ for _ in call:
+ pass
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA, received_initial_metadata))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA,
+ exception_context.exception.trailing_metadata()))
+ self.assertIs(_NON_OK_CODE, exception_context.exception.code())
+ self.assertEqual(_DETAILS, exception_context.exception.details())
+
+ def testCustomCodeExceptionUnaryUnary(self):
+ self._servicer.set_code(_NON_OK_CODE)
+ self._servicer.set_details(_DETAILS)
+ self._servicer.set_exception()
+
+ with self.assertRaises(grpc.RpcError) as exception_context:
+ self._unary_unary.with_call(object(), metadata=_CLIENT_METADATA)
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA,
+ exception_context.exception.initial_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA,
+ exception_context.exception.trailing_metadata()))
+ self.assertIs(_NON_OK_CODE, exception_context.exception.code())
+ self.assertEqual(_DETAILS, exception_context.exception.details())
+
+ def testCustomCodeExceptionUnaryStream(self):
+ self._servicer.set_code(_NON_OK_CODE)
+ self._servicer.set_details(_DETAILS)
+ self._servicer.set_exception()
+
+ call = self._unary_stream(_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA)
+ received_initial_metadata = call.initial_metadata()
+ with self.assertRaises(grpc.RpcError):
+ for _ in call:
+ pass
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA, received_initial_metadata))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA, call.trailing_metadata()))
+ self.assertIs(_NON_OK_CODE, call.code())
+ self.assertEqual(_DETAILS, call.details())
+
+ def testCustomCodeExceptionStreamUnary(self):
+ self._servicer.set_code(_NON_OK_CODE)
+ self._servicer.set_details(_DETAILS)
+ self._servicer.set_exception()
+
+ with self.assertRaises(grpc.RpcError) as exception_context:
+ self._stream_unary.with_call(
+ iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH),
+ metadata=_CLIENT_METADATA)
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA,
+ exception_context.exception.initial_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA,
+ exception_context.exception.trailing_metadata()))
+ self.assertIs(_NON_OK_CODE, exception_context.exception.code())
+ self.assertEqual(_DETAILS, exception_context.exception.details())
+
+ def testCustomCodeExceptionStreamStream(self):
+ self._servicer.set_code(_NON_OK_CODE)
+ self._servicer.set_details(_DETAILS)
+ self._servicer.set_exception()
+
+ call = self._stream_stream(
+ iter([object()] * test_constants.STREAM_LENGTH),
+ metadata=_CLIENT_METADATA)
+ received_initial_metadata = call.initial_metadata()
+ with self.assertRaises(grpc.RpcError):
+ for _ in call:
+ pass
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA, received_initial_metadata))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA, call.trailing_metadata()))
+ self.assertIs(_NON_OK_CODE, call.code())
+ self.assertEqual(_DETAILS, call.details())
+
+ def testCustomCodeReturnNoneUnaryUnary(self):
+ self._servicer.set_code(_NON_OK_CODE)
+ self._servicer.set_details(_DETAILS)
+ self._servicer.set_return_none()
+
+ with self.assertRaises(grpc.RpcError) as exception_context:
+ self._unary_unary.with_call(object(), metadata=_CLIENT_METADATA)
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA,
+ exception_context.exception.initial_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA,
+ exception_context.exception.trailing_metadata()))
+ self.assertIs(_NON_OK_CODE, exception_context.exception.code())
+ self.assertEqual(_DETAILS, exception_context.exception.details())
+
+ def testCustomCodeReturnNoneStreamUnary(self):
+ self._servicer.set_code(_NON_OK_CODE)
+ self._servicer.set_details(_DETAILS)
+ self._servicer.set_return_none()
+
+ with self.assertRaises(grpc.RpcError) as exception_context:
+ self._stream_unary.with_call(
+ iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH),
+ metadata=_CLIENT_METADATA)
+
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _CLIENT_METADATA, self._servicer.received_client_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_INITIAL_METADATA,
+ exception_context.exception.initial_metadata()))
+ self.assertTrue(
+ test_common.metadata_transmitted(
+ _SERVER_TRAILING_METADATA,
+ exception_context.exception.trailing_metadata()))
+ self.assertIs(_NON_OK_CODE, exception_context.exception.code())
+ self.assertEqual(_DETAILS, exception_context.exception.details())
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/_metadata_test.py b/src/python/grpcio/tests/unit/_metadata_test.py
index 77b3901261..2cb13f236b 100644
--- a/src/python/grpcio/tests/unit/_metadata_test.py
+++ b/src/python/grpcio/tests/unit/_metadata_test.py
@@ -173,8 +173,8 @@ class MetadataTest(unittest.TestCase):
def testUnaryUnary(self):
multi_callable = self._channel.unary_unary(_UNARY_UNARY)
- unused_response, call = multi_callable(
- _REQUEST, metadata=_CLIENT_METADATA, with_call=True)
+ unused_response, call = multi_callable.with_call(
+ _REQUEST, metadata=_CLIENT_METADATA)
self.assertTrue(test_common.metadata_transmitted(
_SERVER_INITIAL_METADATA, call.initial_metadata()))
self.assertTrue(test_common.metadata_transmitted(
@@ -192,9 +192,9 @@ class MetadataTest(unittest.TestCase):
def testStreamUnary(self):
multi_callable = self._channel.stream_unary(_STREAM_UNARY)
- unused_response, call = multi_callable(
+ unused_response, call = multi_callable.with_call(
[_REQUEST] * test_constants.STREAM_LENGTH,
- metadata=_CLIENT_METADATA, with_call=True)
+ metadata=_CLIENT_METADATA)
self.assertTrue(test_common.metadata_transmitted(
_SERVER_INITIAL_METADATA, call.initial_metadata()))
self.assertTrue(test_common.metadata_transmitted(
diff --git a/src/python/grpcio/tests/unit/_rpc_test.py b/src/python/grpcio/tests/unit/_rpc_test.py
index 8407593c86..9814504edf 100644
--- a/src/python/grpcio/tests/unit/_rpc_test.py
+++ b/src/python/grpcio/tests/unit/_rpc_test.py
@@ -27,7 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Test of gRPC Python's application-layer API."""
+"""Test of RPCs made against gRPC Python's application-layer API."""
import itertools
import threading
@@ -216,10 +216,9 @@ class RPCTest(unittest.TestCase):
expected_response = self._handler.handle_unary_unary(request, None)
multi_callable = _unary_unary_multi_callable(self._channel)
- response, call = multi_callable(
+ response, call = multi_callable.with_call(
request, metadata=(
- (b'test', b'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),),
- with_call=True)
+ (b'test', b'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),))
self.assertEqual(expected_response, response)
self.assertIs(grpc.StatusCode.OK, call.code())
@@ -266,11 +265,11 @@ class RPCTest(unittest.TestCase):
request_iterator = iter(requests)
multi_callable = _stream_unary_multi_callable(self._channel)
- response, call = multi_callable(
+ response, call = multi_callable.with_call(
request_iterator,
metadata=(
(b'test', b'SuccessfulStreamRequestBlockingUnaryResponseWithCall'),
- ), with_call=True)
+ ))
self.assertEqual(expected_response, response)
self.assertIs(grpc.StatusCode.OK, call.code())
@@ -525,10 +524,9 @@ class RPCTest(unittest.TestCase):
multi_callable = _unary_unary_multi_callable(self._channel)
with self._control.pause():
with self.assertRaises(grpc.RpcError) as exception_context:
- multi_callable(
+ multi_callable.with_call(
request, timeout=test_constants.SHORT_TIMEOUT,
- metadata=((b'test', b'ExpiredUnaryRequestBlockingUnaryResponse'),),
- with_call=True)
+ metadata=((b'test', b'ExpiredUnaryRequestBlockingUnaryResponse'),))
self.assertIsNotNone(exception_context.exception.initial_metadata())
self.assertIs(
@@ -640,10 +638,9 @@ class RPCTest(unittest.TestCase):
multi_callable = _unary_unary_multi_callable(self._channel)
with self._control.fail():
with self.assertRaises(grpc.RpcError) as exception_context:
- multi_callable(
+ multi_callable.with_call(
request,
- metadata=((b'test', b'FailedUnaryRequestBlockingUnaryResponse'),),
- with_call=True)
+ metadata=((b'test', b'FailedUnaryRequestBlockingUnaryResponse'),))
self.assertIs(grpc.StatusCode.UNKNOWN, exception_context.exception.code())