diff options
Diffstat (limited to 'src/python')
-rw-r--r-- | src/python/grpcio/grpc/__init__.py | 36 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_channel.py | 40 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_server.py | 4 | ||||
-rw-r--r-- | src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py | 3 | ||||
-rw-r--r-- | src/python/grpcio_tests/tests/tests.json | 1 | ||||
-rw-r--r-- | src/python/grpcio_tests/tests/unit/BUILD.bazel | 1 | ||||
-rw-r--r-- | src/python/grpcio_tests/tests/unit/_abort_test.py | 124 | ||||
-rw-r--r-- | src/python/grpcio_tests/tests/unit/_api_test.py | 1 |
8 files changed, 195 insertions, 15 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 6022fc3ef2..daf869b156 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -266,6 +266,22 @@ class StatusCode(enum.Enum): UNAUTHENTICATED = (_cygrpc.StatusCode.unauthenticated, 'unauthenticated') +############################# gRPC Status ################################ + + +class Status(six.with_metaclass(abc.ABCMeta)): + """Describes the status of an RPC. + + This is an EXPERIMENTAL API. + + Attributes: + code: A StatusCode object to be sent to the client. + details: An ASCII-encodable string to be sent to the client upon + termination of the RPC. + trailing_metadata: The trailing :term:`metadata` in the RPC. + """ + + ############################# gRPC Exceptions ################################ @@ -1119,6 +1135,25 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)): raise NotImplementedError() @abc.abstractmethod + def abort_with_status(self, status): + """Raises an exception to terminate the RPC with a non-OK status. + + The status passed as argument will supercede any existing status code, + status message and trailing metadata. + + This is an EXPERIMENTAL API. + + Args: + status: A grpc.Status object. The status code in it must not be + StatusCode.OK. + + Raises: + Exception: An exception is always raised to signal the abortion the + RPC to the gRPC runtime. + """ + raise NotImplementedError() + + @abc.abstractmethod def set_code(self, code): """Sets the value to be used as status code upon RPC completion. @@ -1747,6 +1782,7 @@ __all__ = ( 'Future', 'ChannelConnectivity', 'StatusCode', + 'Status', 'RpcError', 'RpcContext', 'Call', diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 35fa82d56b..96118badad 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -499,6 +499,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._context = cygrpc.build_context() def _prepare(self, request, timeout, metadata, wait_for_ready): deadline, serialized_request, rendezvous = _start_unary_request( @@ -528,11 +529,12 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): raise rendezvous else: call = self._channel.segregated_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, + self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, (( operations, None, - ),)) + ),), self._context) event = call.next_event() _handle_event(event, state, self._response_deserializer) return state, call, @@ -570,9 +572,10 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): else: event_handler = _event_handler(state, self._response_deserializer) call = self._managed_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, + self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, - (operations,), event_handler) + (operations,), event_handler, self._context) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -587,6 +590,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._context = cygrpc.build_context() def __call__(self, request, @@ -615,9 +619,10 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): ) event_handler = _event_handler(state, self._response_deserializer) call = self._managed_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, + self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, - operationses, event_handler) + operationses, event_handler, self._context) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -632,6 +637,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._context = cygrpc.build_context() def _blocking(self, request_iterator, timeout, metadata, credentials, wait_for_ready): @@ -640,10 +646,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( wait_for_ready) call = self._channel.segregated_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, + None, deadline, metadata, None if credentials is None else credentials._credentials, _stream_unary_invocation_operationses_and_tags( - metadata, initial_metadata_flags)) + metadata, initial_metadata_flags), self._context) _consume_request_iterator(request_iterator, state, call, self._request_serializer, None) while True: @@ -687,10 +694,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( wait_for_ready) call = self._managed_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, + None, deadline, metadata, None if credentials is None else credentials._credentials, _stream_unary_invocation_operationses( - metadata, initial_metadata_flags), event_handler) + metadata, initial_metadata_flags), event_handler, self._context) _consume_request_iterator(request_iterator, state, call, self._request_serializer, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -706,6 +714,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._context = cygrpc.build_context() def __call__(self, request_iterator, @@ -727,9 +736,10 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): ) event_handler = _event_handler(state, self._response_deserializer) call = self._managed_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, + None, deadline, metadata, None if credentials is None else credentials._credentials, operationses, - event_handler) + event_handler, self._context) _consume_request_iterator(request_iterator, state, call, self._request_serializer, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -789,7 +799,7 @@ def _channel_managed_call_management(state): # pylint: disable=too-many-arguments def create(flags, method, host, deadline, metadata, credentials, - operationses, event_handler): + operationses, event_handler, context): """Creates a cygrpc.IntegratedCall. Args: @@ -804,7 +814,7 @@ def _channel_managed_call_management(state): started on the call. event_handler: A behavior to call to handle the events resultant from the operations on the call. - + context: Context object for distributed tracing. Returns: A cygrpc.IntegratedCall with which to conduct an RPC. """ @@ -815,7 +825,7 @@ def _channel_managed_call_management(state): with state.lock: call = state.channel.integrated_call(flags, method, host, deadline, metadata, credentials, - operationses_and_tags) + operationses_and_tags, context) if state.managed_calls == 0: state.managed_calls = 1 _run_channel_spin_thread(state) diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index e939f615df..3bbfa47da5 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -291,6 +291,10 @@ class _Context(grpc.ServicerContext): self._state.abortion = Exception() raise self._state.abortion + def abort_with_status(self, status): + self._state.trailing_metadata = status.trailing_metadata + self.abort(status.code, status.details) + def set_code(self, code): with self._state.condition: self._state.code = code diff --git a/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py b/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py index 90eeb130d3..5b1dfeacdf 100644 --- a/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py +++ b/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py @@ -70,6 +70,9 @@ class ServicerContext(grpc.ServicerContext): def abort(self, code, details): raise NotImplementedError() + def abort_with_status(self, status): + raise NotImplementedError() + def set_code(self, code): self._rpc.set_code(code) diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 9cffd3df19..44700e979e 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -19,6 +19,7 @@ "testing._server_test.FirstServiceServicerTest", "testing._time_test.StrictFakeTimeTest", "testing._time_test.StrictRealTimeTest", + "unit._abort_test.AbortTest", "unit._api_test.AllTest", "unit._api_test.ChannelConnectivityTest", "unit._api_test.ChannelTest", diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index de33b81e32..4f850220f8 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -3,6 +3,7 @@ load("@grpc_python_dependencies//:requirements.bzl", "requirement") package(default_visibility = ["//visibility:public"]) GRPCIO_TESTS_UNIT = [ + "_abort_test.py", "_api_test.py", "_auth_context_test.py", "_auth_test.py", diff --git a/src/python/grpcio_tests/tests/unit/_abort_test.py b/src/python/grpcio_tests/tests/unit/_abort_test.py new file mode 100644 index 0000000000..6438f6897a --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_abort_test.py @@ -0,0 +1,124 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests server context abort mechanism""" + +import unittest +import collections +import logging + +import grpc + +from tests.unit import test_common +from tests.unit.framework.common import test_constants + +_ABORT = '/test/abort' +_ABORT_WITH_STATUS = '/test/AbortWithStatus' +_INVALID_CODE = '/test/InvalidCode' + +_REQUEST = b'\x00\x00\x00' +_RESPONSE = b'\x00\x00\x00' + +_ABORT_DETAILS = 'Abandon ship!' +_ABORT_METADATA = (('a-trailing-metadata', '42'),) + + +class _Status( + collections.namedtuple( + '_Status', ('code', 'details', 'trailing_metadata')), grpc.Status): + pass + + +def abort_unary_unary(request, servicer_context): + servicer_context.abort( + grpc.StatusCode.INTERNAL, + _ABORT_DETAILS, + ) + raise Exception('This line should not be executed!') + + +def abort_with_status_unary_unary(request, servicer_context): + servicer_context.abort_with_status( + _Status( + code=grpc.StatusCode.INTERNAL, + details=_ABORT_DETAILS, + trailing_metadata=_ABORT_METADATA, + )) + raise Exception('This line should not be executed!') + + +def invalid_code_unary_unary(request, servicer_context): + servicer_context.abort( + 42, + _ABORT_DETAILS, + ) + + +class _GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == _ABORT: + return grpc.unary_unary_rpc_method_handler(abort_unary_unary) + elif handler_call_details.method == _ABORT_WITH_STATUS: + return grpc.unary_unary_rpc_method_handler( + abort_with_status_unary_unary) + elif handler_call_details.method == _INVALID_CODE: + return grpc.stream_stream_rpc_method_handler( + invalid_code_unary_unary) + else: + return None + + +class AbortTest(unittest.TestCase): + + def setUp(self): + self._server = test_common.test_server() + port = self._server.add_insecure_port('[::]:0') + self._server.add_generic_rpc_handlers((_GenericHandler(),)) + self._server.start() + + self._channel = grpc.insecure_channel('localhost:%d' % port) + + def tearDown(self): + self._channel.close() + self._server.stop(0) + + def test_abort(self): + with self.assertRaises(grpc.RpcError) as exception_context: + self._channel.unary_unary(_ABORT)(_REQUEST) + rpc_error = exception_context.exception + + self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL) + self.assertEqual(rpc_error.details(), _ABORT_DETAILS) + + def test_abort_with_status(self): + with self.assertRaises(grpc.RpcError) as exception_context: + self._channel.unary_unary(_ABORT_WITH_STATUS)(_REQUEST) + rpc_error = exception_context.exception + + self.assertEqual(rpc_error.code(), grpc.StatusCode.INTERNAL) + self.assertEqual(rpc_error.details(), _ABORT_DETAILS) + self.assertEqual(rpc_error.trailing_metadata(), _ABORT_METADATA) + + def test_invalid_code(self): + with self.assertRaises(grpc.RpcError) as exception_context: + self._channel.unary_unary(_INVALID_CODE)(_REQUEST) + rpc_error = exception_context.exception + + self.assertEqual(rpc_error.code(), grpc.StatusCode.UNKNOWN) + self.assertEqual(rpc_error.details(), _ABORT_DETAILS) + + +if __name__ == '__main__': + logging.basicConfig() + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_api_test.py b/src/python/grpcio_tests/tests/unit/_api_test.py index 38072861a4..427894bfe9 100644 --- a/src/python/grpcio_tests/tests/unit/_api_test.py +++ b/src/python/grpcio_tests/tests/unit/_api_test.py @@ -32,6 +32,7 @@ class AllTest(unittest.TestCase): 'Future', 'ChannelConnectivity', 'StatusCode', + 'Status', 'RpcError', 'RpcContext', 'Call', |