diff options
Diffstat (limited to 'src/python/grpcio')
21 files changed, 1282 insertions, 102 deletions
diff --git a/src/python/grpcio/commands.py b/src/python/grpcio/commands.py index 3e974eba0a..f498ed4190 100644 --- a/src/python/grpcio/commands.py +++ b/src/python/grpcio/commands.py @@ -182,6 +182,8 @@ class BuildProtoModules(setuptools.Command): '--plugin=protoc-gen-python-grpc={}'.format( self.grpc_python_plugin_command), '-I {}'.format(GRPC_STEM), + '-I .', + '-I {}/third_party/protobuf/src'.format(GRPC_STEM), '--python_out={}'.format(PROTO_GEN_STEM), '--python-grpc_out={}'.format(PROTO_GEN_STEM), ] + [path] diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 9e784c8157..b3eeaad1f7 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -29,8 +29,6 @@ """gRPC's Python API.""" -__import__('pkg_resources').declare_namespace(__name__) - import abc import enum @@ -438,9 +436,7 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): """Affords invoking a unary-unary RPC.""" @abc.abstractmethod - def __call__( - self, request, timeout=None, metadata=None, credentials=None, - with_call=False): + def __call__(self, request, timeout=None, metadata=None, credentials=None): """Synchronously invokes the underlying RPC. Args: @@ -449,12 +445,30 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): metadata: An optional sequence of pairs of bytes to be transmitted to the service-side of the RPC. credentials: An optional CallCredentials for the RPC. - with_call: Whether or not to include return a Call for the RPC in addition - to the response. Returns: - The response value for the RPC, and a Call for the RPC if with_call was - set to True at invocation. + The response value for the RPC. + + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ + raise NotImplementedError() + + @abc.abstractmethod + def with_call(self, request, timeout=None, metadata=None, credentials=None): + """Synchronously invokes the underlying RPC. + + Args: + request: The request value for the RPC. + timeout: An optional durating of time in seconds to allow for the RPC. + metadata: An optional sequence of pairs of bytes to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. + + Returns: + The response value for the RPC and a Call value for the RPC. Raises: RpcError: Indicating that the RPC terminated with non-OK status. The @@ -510,8 +524,7 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): @abc.abstractmethod def __call__( - self, request_iterator, timeout=None, metadata=None, credentials=None, - with_call=False): + self, request_iterator, timeout=None, metadata=None, credentials=None): """Synchronously invokes the underlying RPC. Args: @@ -520,8 +533,6 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): metadata: An optional sequence of pairs of bytes to be transmitted to the service-side of the RPC. credentials: An optional CallCredentials for the RPC. - with_call: Whether or not to include return a Call for the RPC in addition - to the response. Returns: The response value for the RPC, and a Call for the RPC if with_call was @@ -535,6 +546,28 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)): raise NotImplementedError() @abc.abstractmethod + def with_call( + self, request_iterator, timeout=None, metadata=None, credentials=None): + """Synchronously invokes the underlying RPC. + + Args: + request_iterator: An iterator that yields request values for the RPC. + timeout: An optional duration of time in seconds to allow for the RPC. + metadata: An optional sequence of pairs of bytes to be transmitted to the + service-side of the RPC. + credentials: An optional CallCredentials for the RPC. + + Returns: + The response value for the RPC and a Call for the RPC. + + Raises: + RpcError: Indicating that the RPC terminated with non-OK status. The + raised RpcError will also be a Call for the RPC affording the RPC's + metadata, status code, and details. + """ + raise NotImplementedError() + + @abc.abstractmethod def future( self, request_iterator, timeout=None, metadata=None, credentials=None): """Asynchronously invokes the underlying RPC. diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 7cdd542de2..cf6175d031 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -179,6 +179,7 @@ def _event_handler(state, call, response_deserializer): def _consume_request_iterator( request_iterator, state, call, request_serializer): event_handler = _event_handler(state, call, None) + def consume_request_iterator(): for request in request_iterator: serialized_request = _common.serialize(request, request_serializer) @@ -212,8 +213,18 @@ def _consume_request_iterator( ) call.start_batch(cygrpc.Operations(operations), event_handler) state.due.add(cygrpc.OperationType.send_close_from_client) - thread = threading.Thread(target=consume_request_iterator) - thread.start() + + def stop_consumption_thread(timeout): + with state.condition: + if state.code is None: + call.cancel() + state.cancelled = True + _abort(state, grpc.StatusCode.CANCELLED, 'Cancelled!') + state.condition.notify_all() + + consumption_thread = _common.CleanupThread( + stop_consumption_thread, target=consume_request_iterator) + consumption_thread.start() class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): @@ -449,9 +460,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): ) return state, operations, deadline, deadline_timespec, None - def __call__( - self, request, timeout=None, metadata=None, credentials=None, - with_call=False): + def _blocking(self, request, timeout, metadata, credentials): state, operations, deadline, deadline_timespec, rendezvous = self._prepare( request, timeout, metadata) if rendezvous: @@ -464,7 +473,15 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): call.set_credentials(credentials._credentials) call.start_batch(cygrpc.Operations(operations), None) _handle_event(completion_queue.poll(), state, self._response_deserializer) - return _end_unary_response_blocking(state, with_call, deadline) + return state, deadline + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + state, deadline, = self._blocking(request, timeout, metadata, credentials) + return _end_unary_response_blocking(state, False, deadline) + + def with_call(self, request, timeout=None, metadata=None, credentials=None): + state, deadline, = self._blocking(request, timeout, metadata, credentials) + return _end_unary_response_blocking(state, True, deadline) def future(self, request, timeout=None, metadata=None, credentials=None): state, operations, deadline, deadline_timespec, rendezvous = self._prepare( @@ -532,9 +549,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): self._request_serializer = request_serializer self._response_deserializer = response_deserializer - def __call__( - self, request_iterator, timeout=None, metadata=None, credentials=None, - with_call=False): + def _blocking(self, request_iterator, timeout, metadata, credentials): deadline, deadline_timespec = _deadline(timeout) state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) completion_queue = cygrpc.CompletionQueue() @@ -563,7 +578,19 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): state.condition.notify_all() if not state.due: break - return _end_unary_response_blocking(state, with_call, deadline) + return state, deadline + + def __call__( + self, request_iterator, timeout=None, metadata=None, credentials=None): + state, deadline, = self._blocking( + request_iterator, timeout, metadata, credentials) + return _end_unary_response_blocking(state, False, deadline) + + def with_call( + self, request_iterator, timeout=None, metadata=None, credentials=None): + state, deadline, = self._blocking( + request_iterator, timeout, metadata, credentials) + return _end_unary_response_blocking(state, True, deadline) def future( self, request_iterator, timeout=None, metadata=None, credentials=None): @@ -636,16 +663,27 @@ class _ChannelCallState(object): self.managed_calls = None -def _call_spin(state): - while True: - event = state.completion_queue.poll() - completed_call = event.tag(event) - if completed_call is not None: - with state.lock: - state.managed_calls.remove(completed_call) - if not state.managed_calls: - state.managed_calls = None - return +def _run_channel_spin_thread(state): + def channel_spin(): + while True: + event = state.completion_queue.poll() + completed_call = event.tag(event) + if completed_call is not None: + with state.lock: + state.managed_calls.remove(completed_call) + if not state.managed_calls: + state.managed_calls = None + return + + def stop_channel_spin(timeout): + with state.lock: + if state.managed_calls is not None: + for call in state.managed_calls: + call.cancel() + + channel_spin_thread = _common.CleanupThread( + stop_channel_spin, target=channel_spin) + channel_spin_thread.start() def _create_channel_managed_call(state): @@ -674,8 +712,7 @@ def _create_channel_managed_call(state): parent, flags, state.completion_queue, method, host, deadline) if state.managed_calls is None: state.managed_calls = set((call,)) - spin_thread = threading.Thread(target=_call_spin, args=(state,)) - spin_thread.start() + _run_channel_spin_thread(state) else: state.managed_calls.add(call) return call @@ -768,11 +805,18 @@ def _poll_connectivity(state, channel, initial_try_to_connect): _spawn_delivery(state, callbacks) +def _moot(state): + with state.lock: + del state.callbacks_and_connectivities[:] + + def _subscribe(state, callback, try_to_connect): with state.lock: if not state.callbacks_and_connectivities and not state.polling: - polling_thread = threading.Thread( - target=_poll_connectivity, + def cancel_all_subscriptions(timeout): + _moot(state) + polling_thread = _common.CleanupThread( + cancel_all_subscriptions, target=_poll_connectivity, args=(state, state.channel, bool(try_to_connect))) polling_thread.start() state.polling = True @@ -796,11 +840,6 @@ def _unsubscribe(state, callback): break -def _moot(state): - with state.lock: - del state.callbacks_and_connectivities[:] - - def _options(options): if options is None: pairs = ((cygrpc.ChannelArgKey.primary_user_agent_string, _USER_AGENT),) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 168b9751aa..f3b3d61273 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -37,6 +37,7 @@ cdef extern from "grpc/_cython/loader.h": ctypedef long int64_t int pygrpc_load_core(char*) + int pygrpc_initialize_core() void *gpr_malloc(size_t size) nogil void gpr_free(void *ptr) nogil diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index cf146f5a04..c92a8d19a7 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -45,30 +45,20 @@ include "grpc/_cython/_cygrpc/security.pyx.pxi" include "grpc/_cython/_cygrpc/server.pyx.pxi" # -# Global state +# initialize gRPC # -cdef class _ModuleState: - cdef bint is_loaded +def _initialize(): + if 'win32' in sys.platform: + filename = pkg_resources.resource_filename( + 'grpc._cython', '_windows/grpc_c.64.python') + if not pygrpc_load_core(filename): + raise ImportError('failed to load core gRPC library') + if not pygrpc_initialize_core(): + raise ImportError('failed to initialize core gRPC library') - def __cinit__(self): - if 'win32' in sys.platform: - filename = pkg_resources.resource_filename( - 'grpc._cython', '_windows/grpc_c.64.python') - if not pygrpc_load_core(filename): - raise ImportError('failed to load core gRPC library') - with nogil: - grpc_init() - self.is_loaded = True - with nogil: - grpc_set_ssl_roots_override_callback( + grpc_set_ssl_roots_override_callback( <grpc_ssl_roots_override_callback>ssl_roots_override_callback) - def __dealloc__(self): - if self.is_loaded: - with nogil: - grpc_shutdown() - -_module_state = _ModuleState() - +_initialize() diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c index 8437e74ba0..d78ec2f66e 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.c +++ b/src/python/grpcio/grpc/_cython/imports.generated.c @@ -128,6 +128,7 @@ grpc_is_binary_header_type grpc_is_binary_header_import; grpc_call_error_to_string_type grpc_call_error_to_string_import; grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import; grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import; +grpc_use_signal_type grpc_use_signal_import; grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import; grpc_auth_context_property_iterator_type grpc_auth_context_property_iterator_import; grpc_auth_context_peer_identity_type grpc_auth_context_peer_identity_import; @@ -403,6 +404,7 @@ void pygrpc_load_imports(HMODULE library) { grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string"); grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd"); grpc_server_add_insecure_channel_from_fd_import = (grpc_server_add_insecure_channel_from_fd_type) GetProcAddress(library, "grpc_server_add_insecure_channel_from_fd"); + grpc_use_signal_import = (grpc_use_signal_type) GetProcAddress(library, "grpc_use_signal"); grpc_auth_property_iterator_next_import = (grpc_auth_property_iterator_next_type) GetProcAddress(library, "grpc_auth_property_iterator_next"); grpc_auth_context_property_iterator_import = (grpc_auth_context_property_iterator_type) GetProcAddress(library, "grpc_auth_context_property_iterator"); grpc_auth_context_peer_identity_import = (grpc_auth_context_peer_identity_type) GetProcAddress(library, "grpc_auth_context_peer_identity"); diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h index d52e8591b3..b3e341fe25 100644 --- a/src/python/grpcio/grpc/_cython/imports.generated.h +++ b/src/python/grpcio/grpc/_cython/imports.generated.h @@ -335,6 +335,9 @@ extern grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_fr typedef void(*grpc_server_add_insecure_channel_from_fd_type)(grpc_server *server, grpc_completion_queue *cq, int fd); extern grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import; #define grpc_server_add_insecure_channel_from_fd grpc_server_add_insecure_channel_from_fd_import +typedef void(*grpc_use_signal_type)(int signum); +extern grpc_use_signal_type grpc_use_signal_import; +#define grpc_use_signal grpc_use_signal_import typedef const grpc_auth_property *(*grpc_auth_property_iterator_next_type)(grpc_auth_property_iterator *it); extern grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import; #define grpc_auth_property_iterator_next grpc_auth_property_iterator_next_import diff --git a/src/python/grpcio/grpc/_cython/loader.c b/src/python/grpcio/grpc/_cython/loader.c index b909ad594e..86b70dbb02 100644 --- a/src/python/grpcio/grpc/_cython/loader.c +++ b/src/python/grpcio/grpc/_cython/loader.c @@ -31,6 +31,7 @@ * */ +#include <Python.h> #include "loader.h" #ifdef __cplusplus @@ -62,6 +63,12 @@ int pygrpc_load_core(char *path) { return 1; } #endif /* !GPR_WINDOWS */ +// Cython doesn't have Py_AtExit bindings, so we call the C_API directly +int pygrpc_initialize_core(void) { + grpc_init(); + return Py_AtExit(grpc_shutdown) < 0 ? 0 : 1; +} + #ifdef __cplusplus } #endif /* __cpluslus */ diff --git a/src/python/grpcio/grpc/_cython/loader.h b/src/python/grpcio/grpc/_cython/loader.h index 3b8796d39f..eb4b1a1b01 100644 --- a/src/python/grpcio/grpc/_cython/loader.h +++ b/src/python/grpcio/grpc/_cython/loader.h @@ -46,6 +46,11 @@ extern "C" { /* Attempts to load the core if necessary, and return non-zero upon succes. */ int pygrpc_load_core(char *path); +/* Initializes grpc and registers grpc_shutdown() to be called right before + * interpreter exit. Returns non-zero upon success. + */ +int pygrpc_initialize_core(void); + #ifdef __cplusplus } #endif /* __cpluslus */ diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py index 024808c540..56456cc117 100644 --- a/src/python/grpcio/grpc/beta/_client_adaptations.py +++ b/src/python/grpcio/grpc/beta/_client_adaptations.py @@ -186,9 +186,9 @@ def _blocking_unary_unary( response_deserializer=response_deserializer) effective_metadata = _effective_metadata(metadata, metadata_transformer) if with_call: - response, call = multi_callable( + response, call = multi_callable.with_call( request, timeout=timeout, metadata=effective_metadata, - credentials=_credentials(protocol_options), with_call=True) + credentials=_credentials(protocol_options)) return response, _Rendezvous(None, None, call) else: return multi_callable( @@ -237,9 +237,9 @@ def _blocking_stream_unary( response_deserializer=response_deserializer) effective_metadata = _effective_metadata(metadata, metadata_transformer) if with_call: - response, call = multi_callable( + response, call = multi_callable.with_call( request_iterator, timeout=timeout, metadata=effective_metadata, - credentials=_credentials(protocol_options), with_call=True) + credentials=_credentials(protocol_options)) return response, _Rendezvous(None, None, call) else: return multi_callable( diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py index 79e6ca87eb..c695434dac 100644 --- a/src/python/grpcio/grpc/beta/_server_adaptations.py +++ b/src/python/grpcio/grpc/beta/_server_adaptations.py @@ -161,14 +161,24 @@ class _Callback(stream.Consumer): self._condition.wait() -def _pipe_requests(request_iterator, request_consumer, servicer_context): - for request in request_iterator: - if not servicer_context.is_active(): - return - request_consumer.consume(request) - if not servicer_context.is_active(): - return - request_consumer.terminate() +def _run_request_pipe_thread(request_iterator, request_consumer, + servicer_context): + thread_joined = threading.Event() + def pipe_requests(): + for request in request_iterator: + if not servicer_context.is_active() or thread_joined.is_set(): + return + request_consumer.consume(request) + if not servicer_context.is_active() or thread_joined.is_set(): + return + request_consumer.terminate() + + def stop_request_pipe(timeout): + thread_joined.set() + + request_pipe_thread = _common.CleanupThread( + stop_request_pipe, target=pipe_requests) + request_pipe_thread.start() def _adapt_unary_unary_event(unary_unary_event): @@ -206,10 +216,8 @@ def _adapt_stream_unary_event(stream_unary_event): raise abandonment.Abandoned() request_consumer = stream_unary_event( callback.consume_and_terminate, _FaceServicerContext(servicer_context)) - request_pipe_thread = threading.Thread( - target=_pipe_requests, - args=(request_iterator, request_consumer, servicer_context,)) - request_pipe_thread.start() + _run_request_pipe_thread( + request_iterator, request_consumer, servicer_context) return callback.draw_all_values()[0] return adaptation @@ -221,10 +229,8 @@ def _adapt_stream_stream_event(stream_stream_event): raise abandonment.Abandoned() request_consumer = stream_stream_event( callback, _FaceServicerContext(servicer_context)) - request_pipe_thread = threading.Thread( - target=_pipe_requests, - args=(request_iterator, request_consumer, servicer_context,)) - request_pipe_thread.start() + _run_request_pipe_thread( + request_iterator, request_consumer, servicer_context) while True: response = callback.draw_one_value() if response is None: diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 839c555f05..b37e27c27e 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -94,6 +94,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/endpoint_pair_posix.c', 'src/core/lib/iomgr/endpoint_pair_windows.c', 'src/core/lib/iomgr/error.c', + 'src/core/lib/iomgr/ev_epoll_linux.c', 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c', 'src/core/lib/iomgr/ev_poll_posix.c', 'src/core/lib/iomgr/ev_posix.c', @@ -104,6 +105,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/iomgr_posix.c', 'src/core/lib/iomgr/iomgr_windows.c', 'src/core/lib/iomgr/load_file.c', + 'src/core/lib/iomgr/network_status_tracker.c', 'src/core/lib/iomgr/polling_entity.c', 'src/core/lib/iomgr/pollset_set_windows.c', 'src/core/lib/iomgr/pollset_windows.c', diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py index 0c13104d9d..0f4db9d972 100644 --- a/src/python/grpcio/grpc_version.py +++ b/src/python/grpcio/grpc_version.py @@ -29,4 +29,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!! -VERSION='0.15.0.dev0' +VERSION='0.16.0.dev0' diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json index 8e509621a8..9c118ef601 100644 --- a/src/python/grpcio/tests/tests.json +++ b/src/python/grpcio/tests/tests.json @@ -10,9 +10,11 @@ "_channel_connectivity_test.ChannelConnectivityTest", "_channel_ready_future_test.ChannelReadyFutureTest", "_channel_test.ChannelTest", + "_compression_test.CompressionTest", "_connectivity_channel_test.ChannelConnectivityTest", "_connectivity_channel_test.ConnectivityStatesTest", "_empty_message_test.EmptyMessageTest", + "_exit_test.ExitTest", "_face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", "_face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", "_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", @@ -24,6 +26,7 @@ "_implementations_test.ChannelCredentialsTest", "_insecure_interop_test.InsecureInteropTest", "_logging_pool_test.LoggingPoolTest", + "_metadata_code_details_test.MetadataCodeDetailsTest", "_metadata_test.MetadataTest", "_not_found_test.NotFoundTest", "_python_plugin_test.PythonPluginTest", diff --git a/src/python/grpcio/tests/unit/_channel_connectivity_test.py b/src/python/grpcio/tests/unit/_channel_connectivity_test.py index a1575efada..ae8de523ec 100644 --- a/src/python/grpcio/tests/unit/_channel_connectivity_test.py +++ b/src/python/grpcio/tests/unit/_channel_connectivity_test.py @@ -135,12 +135,12 @@ class ChannelConnectivityTest(unittest.TestCase): self.assertNotIn( grpc.ChannelConnectivity.TRANSIENT_FAILURE, third_connectivities) self.assertNotIn( - grpc.ChannelConnectivity.FATAL_FAILURE, third_connectivities) + grpc.ChannelConnectivity.SHUTDOWN, third_connectivities) self.assertNotIn( grpc.ChannelConnectivity.TRANSIENT_FAILURE, fourth_connectivities) self.assertNotIn( - grpc.ChannelConnectivity.FATAL_FAILURE, fourth_connectivities) + grpc.ChannelConnectivity.SHUTDOWN, fourth_connectivities) def test_reachable_then_unreachable_channel_connectivity(self): server = _server.Server((), futures.ThreadPoolExecutor(max_workers=0)) diff --git a/src/python/grpcio/tests/unit/_compression_test.py b/src/python/grpcio/tests/unit/_compression_test.py new file mode 100644 index 0000000000..6eddb6f83f --- /dev/null +++ b/src/python/grpcio/tests/unit/_compression_test.py @@ -0,0 +1,133 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +"""Tests server and client side compression.""" + +import unittest + +import grpc +from grpc import _grpcio_metadata +from grpc.framework.foundation import logging_pool + +from tests.unit import test_common +from tests.unit.framework.common import test_constants + +_UNARY_UNARY = b'/test/UnaryUnary' +_STREAM_STREAM = b'/test/StreamStream' + + +def handle_unary(request, servicer_context): + servicer_context.send_initial_metadata([ + ('grpc-internal-encoding-request', 'gzip')]) + return request + + +def handle_stream(request_iterator, servicer_context): + # TODO(issue:#6891) We should be able to remove this loop, + # and replace with return; yield + servicer_context.send_initial_metadata([ + ('grpc-internal-encoding-request', 'gzip')]) + for request in request_iterator: + yield request + + +class _MethodHandler(grpc.RpcMethodHandler): + + def __init__(self, request_streaming, response_streaming): + self.request_streaming = request_streaming + self.response_streaming = response_streaming + self.request_deserializer = None + self.response_serializer = None + self.unary_unary = None + self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + if self.request_streaming and self.response_streaming: + self.stream_stream = lambda x, y: handle_stream(x, y) + elif not self.request_streaming and not self.response_streaming: + self.unary_unary = lambda x, y: handle_unary(x, y) + + +class _GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == _UNARY_UNARY: + return _MethodHandler(False, False) + elif handler_call_details.method == _STREAM_STREAM: + return _MethodHandler(True, True) + else: + return None + + +class CompressionTest(unittest.TestCase): + + def setUp(self): + self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + self._server = grpc.server((_GenericHandler(),), self._server_pool) + self._port = self._server.add_insecure_port('[::]:0') + self._server.start() + + def testUnary(self): + request = b'\x00' * 100 + + # Client -> server compressed through default client channel compression + # settings. Server -> client compressed via server-side metadata setting. + # TODO(https://github.com/grpc/grpc/issues/4078): replace the "1" integer + # literal with proper use of the public API. + compressed_channel = grpc.insecure_channel('localhost:%d' % self._port, + options=[('grpc.default_compression_algorithm', 1)]) + multi_callable = compressed_channel.unary_unary(_UNARY_UNARY) + response = multi_callable(request) + self.assertEqual(request, response) + + # Client -> server compressed through client metadata setting. Server -> + # client compressed via server-side metadata setting. + # TODO(https://github.com/grpc/grpc/issues/4078): replace the "0" integer + # literal with proper use of the public API. + uncompressed_channel = grpc.insecure_channel('localhost:%d' % self._port, + options=[('grpc.default_compression_algorithm', 0)]) + multi_callable = compressed_channel.unary_unary(_UNARY_UNARY) + response = multi_callable(request, metadata=[ + ('grpc-internal-encoding-request', 'gzip')]) + self.assertEqual(request, response) + + def testStreaming(self): + request = b'\x00' * 100 + + # TODO(https://github.com/grpc/grpc/issues/4078): replace the "1" integer + # literal with proper use of the public API. + compressed_channel = grpc.insecure_channel('localhost:%d' % self._port, + options=[('grpc.default_compression_algorithm', 1)]) + multi_callable = compressed_channel.stream_stream(_STREAM_STREAM) + call = multi_callable([request] * test_constants.STREAM_LENGTH) + for response in call: + self.assertEqual(request, response) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_exit_scenarios.py b/src/python/grpcio/tests/unit/_exit_scenarios.py new file mode 100644 index 0000000000..24a2faef85 --- /dev/null +++ b/src/python/grpcio/tests/unit/_exit_scenarios.py @@ -0,0 +1,249 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Defines a number of module-scope gRPC scenarios to test clean exit.""" + +import argparse +import threading +import time + +import grpc + +from tests.unit.framework.common import test_constants + +WAIT_TIME = 1000 + +REQUEST = b'request' + +UNSTARTED_SERVER = 'unstarted_server' +RUNNING_SERVER = 'running_server' +POLL_CONNECTIVITY_NO_SERVER = 'poll_connectivity_no_server' +POLL_CONNECTIVITY = 'poll_connectivity' +IN_FLIGHT_UNARY_UNARY_CALL = 'in_flight_unary_unary_call' +IN_FLIGHT_UNARY_STREAM_CALL = 'in_flight_unary_stream_call' +IN_FLIGHT_STREAM_UNARY_CALL = 'in_flight_stream_unary_call' +IN_FLIGHT_STREAM_STREAM_CALL = 'in_flight_stream_stream_call' +IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL = 'in_flight_partial_unary_stream_call' +IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL = 'in_flight_partial_stream_unary_call' +IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL = 'in_flight_partial_stream_stream_call' + +UNARY_UNARY = b'/test/UnaryUnary' +UNARY_STREAM = b'/test/UnaryStream' +STREAM_UNARY = b'/test/StreamUnary' +STREAM_STREAM = b'/test/StreamStream' +PARTIAL_UNARY_STREAM = b'/test/PartialUnaryStream' +PARTIAL_STREAM_UNARY = b'/test/PartialStreamUnary' +PARTIAL_STREAM_STREAM = b'/test/PartialStreamStream' + +TEST_TO_METHOD = { + IN_FLIGHT_UNARY_UNARY_CALL: UNARY_UNARY, + IN_FLIGHT_UNARY_STREAM_CALL: UNARY_STREAM, + IN_FLIGHT_STREAM_UNARY_CALL: STREAM_UNARY, + IN_FLIGHT_STREAM_STREAM_CALL: STREAM_STREAM, + IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL: PARTIAL_UNARY_STREAM, + IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL: PARTIAL_STREAM_UNARY, + IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL: PARTIAL_STREAM_STREAM, +} + + +def hang_unary_unary(request, servicer_context): + time.sleep(WAIT_TIME) + + +def hang_unary_stream(request, servicer_context): + time.sleep(WAIT_TIME) + + +def hang_partial_unary_stream(request, servicer_context): + for _ in range(test_constants.STREAM_LENGTH // 2): + yield request + time.sleep(WAIT_TIME) + + +def hang_stream_unary(request_iterator, servicer_context): + time.sleep(WAIT_TIME) + + +def hang_partial_stream_unary(request_iterator, servicer_context): + for _ in range(test_constants.STREAM_LENGTH // 2): + next(request_iterator) + time.sleep(WAIT_TIME) + + +def hang_stream_stream(request_iterator, servicer_context): + time.sleep(WAIT_TIME) + + +def hang_partial_stream_stream(request_iterator, servicer_context): + for _ in range(test_constants.STREAM_LENGTH // 2): + yield next(request_iterator) + time.sleep(WAIT_TIME) + + +class MethodHandler(grpc.RpcMethodHandler): + + def __init__(self, request_streaming, response_streaming, partial_hang): + self.request_streaming = request_streaming + self.response_streaming = response_streaming + self.request_deserializer = None + self.response_serializer = None + self.unary_unary = None + self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + if self.request_streaming and self.response_streaming: + if partial_hang: + self.stream_stream = hang_partial_stream_stream + else: + self.stream_stream = hang_stream_stream + elif self.request_streaming: + if partial_hang: + self.stream_unary = hang_partial_stream_unary + else: + self.stream_unary = hang_stream_unary + elif self.response_streaming: + if partial_hang: + self.unary_stream = hang_partial_unary_stream + else: + self.unary_stream = hang_unary_stream + else: + self.unary_unary = hang_unary_unary + + +class GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == UNARY_UNARY: + return MethodHandler(False, False, False) + elif handler_call_details.method == UNARY_STREAM: + return MethodHandler(False, True, False) + elif handler_call_details.method == STREAM_UNARY: + return MethodHandler(True, False, False) + elif handler_call_details.method == STREAM_STREAM: + return MethodHandler(True, True, False) + elif handler_call_details.method == PARTIAL_UNARY_STREAM: + return MethodHandler(False, True, True) + elif handler_call_details.method == PARTIAL_STREAM_UNARY: + return MethodHandler(True, False, True) + elif handler_call_details.method == PARTIAL_STREAM_STREAM: + return MethodHandler(True, True, True) + else: + return None + + +# Traditional executors will not exit until all their +# current jobs complete. Because we submit jobs that will +# never finish, we don't want to block exit on these jobs. +class DaemonPool(object): + + def submit(self, fn, *args, **kwargs): + thread = threading.Thread(target=fn, args=args, kwargs=kwargs) + thread.daemon = True + thread.start() + + def shutdown(self, wait=True): + pass + + +def infinite_request_iterator(): + while True: + yield REQUEST + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('scenario', type=str) + parser.add_argument( + '--wait_for_interrupt', dest='wait_for_interrupt', action='store_true') + args = parser.parse_args() + + if args.scenario == UNSTARTED_SERVER: + server = grpc.server((), DaemonPool()) + if args.wait_for_interrupt: + time.sleep(WAIT_TIME) + elif args.scenario == RUNNING_SERVER: + server = grpc.server((), DaemonPool()) + port = server.add_insecure_port('[::]:0') + server.start() + if args.wait_for_interrupt: + time.sleep(WAIT_TIME) + elif args.scenario == POLL_CONNECTIVITY_NO_SERVER: + channel = grpc.insecure_channel('localhost:12345') + + def connectivity_callback(connectivity): + pass + + channel.subscribe(connectivity_callback, try_to_connect=True) + if args.wait_for_interrupt: + time.sleep(WAIT_TIME) + elif args.scenario == POLL_CONNECTIVITY: + server = grpc.server((), DaemonPool()) + port = server.add_insecure_port('[::]:0') + server.start() + channel = grpc.insecure_channel('localhost:%d' % port) + + def connectivity_callback(connectivity): + pass + + channel.subscribe(connectivity_callback, try_to_connect=True) + if args.wait_for_interrupt: + time.sleep(WAIT_TIME) + + else: + handler = GenericHandler() + server = grpc.server((), DaemonPool()) + port = server.add_insecure_port('[::]:0') + server.add_generic_rpc_handlers((handler,)) + server.start() + channel = grpc.insecure_channel('localhost:%d' % port) + + method = TEST_TO_METHOD[args.scenario] + + if args.scenario == IN_FLIGHT_UNARY_UNARY_CALL: + multi_callable = channel.unary_unary(method) + future = multi_callable.future(REQUEST) + result, call = multi_callable.with_call(REQUEST) + elif (args.scenario == IN_FLIGHT_UNARY_STREAM_CALL or + args.scenario == IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL): + multi_callable = channel.unary_stream(method) + response_iterator = multi_callable(REQUEST) + for response in response_iterator: + pass + elif (args.scenario == IN_FLIGHT_STREAM_UNARY_CALL or + args.scenario == IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL): + multi_callable = channel.stream_unary(method) + future = multi_callable.future(infinite_request_iterator()) + result, call = multi_callable.with_call( + [REQUEST] * test_constants.STREAM_LENGTH) + elif (args.scenario == IN_FLIGHT_STREAM_STREAM_CALL or + args.scenario == IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL): + multi_callable = channel.stream_stream(method) + response_iterator = multi_callable(infinite_request_iterator()) + for response in response_iterator: + pass diff --git a/src/python/grpcio/tests/unit/_exit_test.py b/src/python/grpcio/tests/unit/_exit_test.py new file mode 100644 index 0000000000..b0d6af73e5 --- /dev/null +++ b/src/python/grpcio/tests/unit/_exit_test.py @@ -0,0 +1,185 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests clean exit of server/client on Python Interpreter exit/sigint. + +The tests in this module spawn a subprocess for each test case, the +test is considered successful if it doesn't hang/timeout. +""" + +import atexit +import os +import signal +import six +import subprocess +import sys +import threading +import time +import unittest + +from tests.unit import _exit_scenarios + +SCENARIO_FILE = os.path.abspath(os.path.join( + os.path.dirname(os.path.realpath(__file__)), '_exit_scenarios.py')) +INTERPRETER = sys.executable +BASE_COMMAND = [INTERPRETER, SCENARIO_FILE] +BASE_SIGTERM_COMMAND = BASE_COMMAND + ['--wait_for_interrupt'] + +INIT_TIME = 1.0 + + +processes = [] +process_lock = threading.Lock() + + +# Make sure we attempt to clean up any +# processes we may have left running +def cleanup_processes(): + with process_lock: + for process in processes: + try: + process.kill() + except Exception: + pass +atexit.register(cleanup_processes) + + +def interrupt_and_wait(process): + with process_lock: + processes.append(process) + time.sleep(INIT_TIME) + os.kill(process.pid, signal.SIGINT) + process.wait() + + +def wait(process): + with process_lock: + processes.append(process) + process.wait() + + +class ExitTest(unittest.TestCase): + + def test_unstarted_server(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.UNSTARTED_SERVER], + stdout=sys.stdout, stderr=sys.stderr) + wait(process) + + def test_unstarted_server_terminate(self): + process = subprocess.Popen( + BASE_SIGTERM_COMMAND + [_exit_scenarios.UNSTARTED_SERVER], + stdout=sys.stdout) + interrupt_and_wait(process) + + def test_running_server(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.RUNNING_SERVER], + stdout=sys.stdout, stderr=sys.stderr) + wait(process) + + def test_running_server_terminate(self): + process = subprocess.Popen( + BASE_SIGTERM_COMMAND + [_exit_scenarios.RUNNING_SERVER], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + def test_poll_connectivity_no_server(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY_NO_SERVER], + stdout=sys.stdout, stderr=sys.stderr) + wait(process) + + def test_poll_connectivity_no_server_terminate(self): + process = subprocess.Popen( + BASE_SIGTERM_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY_NO_SERVER], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + def test_poll_connectivity(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY], + stdout=sys.stdout, stderr=sys.stderr) + wait(process) + + def test_poll_connectivity_terminate(self): + process = subprocess.Popen( + BASE_SIGTERM_COMMAND + [_exit_scenarios.POLL_CONNECTIVITY], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + def test_in_flight_unary_unary_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_UNARY_UNARY_CALL], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999') + def test_in_flight_unary_stream_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_UNARY_STREAM_CALL], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + def test_in_flight_stream_unary_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_STREAM_UNARY_CALL], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999') + def test_in_flight_stream_stream_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_STREAM_STREAM_CALL], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999') + def test_in_flight_partial_unary_stream_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + def test_in_flight_partial_stream_unary_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + @unittest.skipIf(six.PY2, 'https://github.com/grpc/grpc/issues/6999') + def test_in_flight_partial_stream_stream_call(self): + process = subprocess.Popen( + BASE_COMMAND + [_exit_scenarios.IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL], + stdout=sys.stdout, stderr=sys.stderr) + interrupt_and_wait(process) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_metadata_code_details_test.py b/src/python/grpcio/tests/unit/_metadata_code_details_test.py new file mode 100644 index 0000000000..dd74268cbf --- /dev/null +++ b/src/python/grpcio/tests/unit/_metadata_code_details_test.py @@ -0,0 +1,523 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests application-provided metadata, status code, and details.""" + +import threading +import unittest + +import grpc +from grpc.framework.foundation import logging_pool + +from tests.unit import test_common +from tests.unit.framework.common import test_constants +from tests.unit.framework.common import test_control + +_SERIALIZED_REQUEST = b'\x46\x47\x48' +_SERIALIZED_RESPONSE = b'\x49\x50\x51' + +_REQUEST_SERIALIZER = lambda unused_request: _SERIALIZED_REQUEST +_REQUEST_DESERIALIZER = lambda unused_serialized_request: object() +_RESPONSE_SERIALIZER = lambda unused_response: _SERIALIZED_RESPONSE +_RESPONSE_DESERIALIZER = lambda unused_serialized_resopnse: object() + +_SERVICE = b'test.TestService' +_UNARY_UNARY = b'UnaryUnary' +_UNARY_STREAM = b'UnaryStream' +_STREAM_UNARY = b'StreamUnary' +_STREAM_STREAM = b'StreamStream' + +_CLIENT_METADATA = ( + (b'client-md-key', b'client-md-key'), + (b'client-md-key-bin', b'\x00\x01') +) + +_SERVER_INITIAL_METADATA = ( + (b'server-initial-md-key', b'server-initial-md-value'), + (b'server-initial-md-key-bin', b'\x00\x02') +) + +_SERVER_TRAILING_METADATA = ( + (b'server-trailing-md-key', b'server-trailing-md-value'), + (b'server-trailing-md-key-bin', b'\x00\x03') +) + +_NON_OK_CODE = grpc.StatusCode.NOT_FOUND +_DETAILS = b'Test details!' + + +class _Servicer(object): + + def __init__(self): + self._lock = threading.Lock() + self._code = None + self._details = None + self._exception = False + self._return_none = False + self._received_client_metadata = None + + def unary_unary(self, request, context): + with self._lock: + self._received_client_metadata = context.invocation_metadata() + context.send_initial_metadata(_SERVER_INITIAL_METADATA) + context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + if self._code is not None: + context.set_code(self._code) + if self._details is not None: + context.set_details(self._details) + if self._exception: + raise test_control.Defect() + else: + return None if self._return_none else object() + + def unary_stream(self, request, context): + with self._lock: + self._received_client_metadata = context.invocation_metadata() + context.send_initial_metadata(_SERVER_INITIAL_METADATA) + context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + if self._code is not None: + context.set_code(self._code) + if self._details is not None: + context.set_details(self._details) + for _ in range(test_constants.STREAM_LENGTH // 2): + yield _SERIALIZED_RESPONSE + if self._exception: + raise test_control.Defect() + + def stream_unary(self, request_iterator, context): + with self._lock: + self._received_client_metadata = context.invocation_metadata() + context.send_initial_metadata(_SERVER_INITIAL_METADATA) + context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + if self._code is not None: + context.set_code(self._code) + if self._details is not None: + context.set_details(self._details) + # TODO(https://github.com/grpc/grpc/issues/6891): just ignore the + # request iterator. + for ignored_request in request_iterator: + pass + if self._exception: + raise test_control.Defect() + else: + return None if self._return_none else _SERIALIZED_RESPONSE + + def stream_stream(self, request_iterator, context): + with self._lock: + self._received_client_metadata = context.invocation_metadata() + context.send_initial_metadata(_SERVER_INITIAL_METADATA) + context.set_trailing_metadata(_SERVER_TRAILING_METADATA) + if self._code is not None: + context.set_code(self._code) + if self._details is not None: + context.set_details(self._details) + # TODO(https://github.com/grpc/grpc/issues/6891): just ignore the + # request iterator. + for ignored_request in request_iterator: + pass + for _ in range(test_constants.STREAM_LENGTH // 3): + yield object() + if self._exception: + raise test_control.Defect() + + def set_code(self, code): + with self._lock: + self._code = code + + def set_details(self, details): + with self._lock: + self._details = details + + def set_exception(self): + with self._lock: + self._exception = True + + def set_return_none(self): + with self._lock: + self._return_none = True + + def received_client_metadata(self): + with self._lock: + return self._received_client_metadata + + +def _generic_handler(servicer): + method_handlers = { + _UNARY_UNARY: grpc.unary_unary_rpc_method_handler( + servicer.unary_unary, request_deserializer=_REQUEST_DESERIALIZER, + response_serializer=_RESPONSE_SERIALIZER), + _UNARY_STREAM: grpc.unary_stream_rpc_method_handler( + servicer.unary_stream), + _STREAM_UNARY: grpc.stream_unary_rpc_method_handler( + servicer.stream_unary), + _STREAM_STREAM: grpc.stream_stream_rpc_method_handler( + servicer.stream_stream, request_deserializer=_REQUEST_DESERIALIZER, + response_serializer=_RESPONSE_SERIALIZER), + } + return grpc.method_handlers_generic_handler(_SERVICE, method_handlers) + + +class MetadataCodeDetailsTest(unittest.TestCase): + + def setUp(self): + self._servicer = _Servicer() + self._server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + self._server = grpc.server( + (_generic_handler(self._servicer),), self._server_pool) + port = self._server.add_insecure_port('[::]:0') + self._server.start() + + channel = grpc.insecure_channel('localhost:{}'.format(port)) + self._unary_unary = channel.unary_unary( + b'/'.join((b'', _SERVICE, _UNARY_UNARY,)), + request_serializer=_REQUEST_SERIALIZER, + response_deserializer=_RESPONSE_DESERIALIZER,) + self._unary_stream = channel.unary_stream( + b'/'.join((b'', _SERVICE, _UNARY_STREAM,)),) + self._stream_unary = channel.stream_unary( + b'/'.join((b'', _SERVICE, _STREAM_UNARY,)),) + self._stream_stream = channel.stream_stream( + b'/'.join((b'', _SERVICE, _STREAM_STREAM,)), + request_serializer=_REQUEST_SERIALIZER, + response_deserializer=_RESPONSE_DESERIALIZER,) + + + def testSuccessfulUnaryUnary(self): + self._servicer.set_details(_DETAILS) + + unused_response, call = self._unary_unary.with_call( + object(), metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, call.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(grpc.StatusCode.OK, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testSuccessfulUnaryStream(self): + self._servicer.set_details(_DETAILS) + + call = self._unary_stream(_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA) + received_initial_metadata = call.initial_metadata() + for _ in call: + pass + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(grpc.StatusCode.OK, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testSuccessfulStreamUnary(self): + self._servicer.set_details(_DETAILS) + + unused_response, call = self._stream_unary.with_call( + iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, call.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(grpc.StatusCode.OK, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testSuccessfulStreamStream(self): + self._servicer.set_details(_DETAILS) + + call = self._stream_stream( + iter([object()] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + received_initial_metadata = call.initial_metadata() + for _ in call: + pass + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(grpc.StatusCode.OK, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testCustomCodeUnaryUnary(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + + with self.assertRaises(grpc.RpcError) as exception_context: + self._unary_unary.with_call(object(), metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + def testCustomCodeUnaryStream(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + + call = self._unary_stream(_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA) + received_initial_metadata = call.initial_metadata() + with self.assertRaises(grpc.RpcError): + for _ in call: + pass + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(_NON_OK_CODE, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testCustomCodeStreamUnary(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + + with self.assertRaises(grpc.RpcError) as exception_context: + self._stream_unary.with_call( + iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + def testCustomCodeStreamStream(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + + call = self._stream_stream( + iter([object()] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + received_initial_metadata = call.initial_metadata() + with self.assertRaises(grpc.RpcError) as exception_context: + for _ in call: + pass + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + def testCustomCodeExceptionUnaryUnary(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + self._servicer.set_exception() + + with self.assertRaises(grpc.RpcError) as exception_context: + self._unary_unary.with_call(object(), metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + def testCustomCodeExceptionUnaryStream(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + self._servicer.set_exception() + + call = self._unary_stream(_SERIALIZED_REQUEST, metadata=_CLIENT_METADATA) + received_initial_metadata = call.initial_metadata() + with self.assertRaises(grpc.RpcError): + for _ in call: + pass + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(_NON_OK_CODE, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testCustomCodeExceptionStreamUnary(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + self._servicer.set_exception() + + with self.assertRaises(grpc.RpcError) as exception_context: + self._stream_unary.with_call( + iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + def testCustomCodeExceptionStreamStream(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + self._servicer.set_exception() + + call = self._stream_stream( + iter([object()] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + received_initial_metadata = call.initial_metadata() + with self.assertRaises(grpc.RpcError): + for _ in call: + pass + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, received_initial_metadata)) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, call.trailing_metadata())) + self.assertIs(_NON_OK_CODE, call.code()) + self.assertEqual(_DETAILS, call.details()) + + def testCustomCodeReturnNoneUnaryUnary(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + self._servicer.set_return_none() + + with self.assertRaises(grpc.RpcError) as exception_context: + self._unary_unary.with_call(object(), metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + def testCustomCodeReturnNoneStreamUnary(self): + self._servicer.set_code(_NON_OK_CODE) + self._servicer.set_details(_DETAILS) + self._servicer.set_return_none() + + with self.assertRaises(grpc.RpcError) as exception_context: + self._stream_unary.with_call( + iter([_SERIALIZED_REQUEST] * test_constants.STREAM_LENGTH), + metadata=_CLIENT_METADATA) + + self.assertTrue( + test_common.metadata_transmitted( + _CLIENT_METADATA, self._servicer.received_client_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_INITIAL_METADATA, + exception_context.exception.initial_metadata())) + self.assertTrue( + test_common.metadata_transmitted( + _SERVER_TRAILING_METADATA, + exception_context.exception.trailing_metadata())) + self.assertIs(_NON_OK_CODE, exception_context.exception.code()) + self.assertEqual(_DETAILS, exception_context.exception.details()) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/_metadata_test.py b/src/python/grpcio/tests/unit/_metadata_test.py index 77b3901261..2cb13f236b 100644 --- a/src/python/grpcio/tests/unit/_metadata_test.py +++ b/src/python/grpcio/tests/unit/_metadata_test.py @@ -173,8 +173,8 @@ class MetadataTest(unittest.TestCase): def testUnaryUnary(self): multi_callable = self._channel.unary_unary(_UNARY_UNARY) - unused_response, call = multi_callable( - _REQUEST, metadata=_CLIENT_METADATA, with_call=True) + unused_response, call = multi_callable.with_call( + _REQUEST, metadata=_CLIENT_METADATA) self.assertTrue(test_common.metadata_transmitted( _SERVER_INITIAL_METADATA, call.initial_metadata())) self.assertTrue(test_common.metadata_transmitted( @@ -192,9 +192,9 @@ class MetadataTest(unittest.TestCase): def testStreamUnary(self): multi_callable = self._channel.stream_unary(_STREAM_UNARY) - unused_response, call = multi_callable( + unused_response, call = multi_callable.with_call( [_REQUEST] * test_constants.STREAM_LENGTH, - metadata=_CLIENT_METADATA, with_call=True) + metadata=_CLIENT_METADATA) self.assertTrue(test_common.metadata_transmitted( _SERVER_INITIAL_METADATA, call.initial_metadata())) self.assertTrue(test_common.metadata_transmitted( diff --git a/src/python/grpcio/tests/unit/_rpc_test.py b/src/python/grpcio/tests/unit/_rpc_test.py index 8407593c86..9814504edf 100644 --- a/src/python/grpcio/tests/unit/_rpc_test.py +++ b/src/python/grpcio/tests/unit/_rpc_test.py @@ -27,7 +27,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Test of gRPC Python's application-layer API.""" +"""Test of RPCs made against gRPC Python's application-layer API.""" import itertools import threading @@ -216,10 +216,9 @@ class RPCTest(unittest.TestCase): expected_response = self._handler.handle_unary_unary(request, None) multi_callable = _unary_unary_multi_callable(self._channel) - response, call = multi_callable( + response, call = multi_callable.with_call( request, metadata=( - (b'test', b'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),), - with_call=True) + (b'test', b'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),)) self.assertEqual(expected_response, response) self.assertIs(grpc.StatusCode.OK, call.code()) @@ -266,11 +265,11 @@ class RPCTest(unittest.TestCase): request_iterator = iter(requests) multi_callable = _stream_unary_multi_callable(self._channel) - response, call = multi_callable( + response, call = multi_callable.with_call( request_iterator, metadata=( (b'test', b'SuccessfulStreamRequestBlockingUnaryResponseWithCall'), - ), with_call=True) + )) self.assertEqual(expected_response, response) self.assertIs(grpc.StatusCode.OK, call.code()) @@ -525,10 +524,9 @@ class RPCTest(unittest.TestCase): multi_callable = _unary_unary_multi_callable(self._channel) with self._control.pause(): with self.assertRaises(grpc.RpcError) as exception_context: - multi_callable( + multi_callable.with_call( request, timeout=test_constants.SHORT_TIMEOUT, - metadata=((b'test', b'ExpiredUnaryRequestBlockingUnaryResponse'),), - with_call=True) + metadata=((b'test', b'ExpiredUnaryRequestBlockingUnaryResponse'),)) self.assertIsNotNone(exception_context.exception.initial_metadata()) self.assertIs( @@ -640,10 +638,9 @@ class RPCTest(unittest.TestCase): multi_callable = _unary_unary_multi_callable(self._channel) with self._control.fail(): with self.assertRaises(grpc.RpcError) as exception_context: - multi_callable( + multi_callable.with_call( request, - metadata=((b'test', b'FailedUnaryRequestBlockingUnaryResponse'),), - with_call=True) + metadata=((b'test', b'FailedUnaryRequestBlockingUnaryResponse'),)) self.assertIs(grpc.StatusCode.UNKNOWN, exception_context.exception.code()) |