diff options
Diffstat (limited to 'src/python/grpcio/grpc')
-rw-r--r-- | src/python/grpcio/grpc/__init__.py | 41 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_auth.py | 2 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_channel.py | 54 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi | 13 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi | 37 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi | 9 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi | 2 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi | 1 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi | 19 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_grpcio_metadata.py | 2 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_server.py | 110 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_utilities.py | 3 |
12 files changed, 190 insertions, 103 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 6022fc3ef2..70d7618e05 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -23,6 +23,11 @@ from grpc._cython import cygrpc as _cygrpc logging.getLogger(__name__).addHandler(logging.NullHandler()) +try: + from ._grpcio_metadata import __version__ +except ImportError: + __version__ = "dev0" + ############################## Future Interface ############################### @@ -266,6 +271,22 @@ class StatusCode(enum.Enum): UNAUTHENTICATED = (_cygrpc.StatusCode.unauthenticated, 'unauthenticated') +############################# gRPC Status ################################ + + +class Status(six.with_metaclass(abc.ABCMeta)): + """Describes the status of an RPC. + + This is an EXPERIMENTAL API. + + Attributes: + code: A StatusCode object to be sent to the client. + details: An ASCII-encodable string to be sent to the client upon + termination of the RPC. + trailing_metadata: The trailing :term:`metadata` in the RPC. + """ + + ############################# gRPC Exceptions ################################ @@ -1119,6 +1140,25 @@ class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)): raise NotImplementedError() @abc.abstractmethod + def abort_with_status(self, status): + """Raises an exception to terminate the RPC with a non-OK status. + + The status passed as argument will supercede any existing status code, + status message and trailing metadata. + + This is an EXPERIMENTAL API. + + Args: + status: A grpc.Status object. The status code in it must not be + StatusCode.OK. + + Raises: + Exception: An exception is always raised to signal the abortion the + RPC to the gRPC runtime. + """ + raise NotImplementedError() + + @abc.abstractmethod def set_code(self, code): """Sets the value to be used as status code upon RPC completion. @@ -1747,6 +1787,7 @@ __all__ = ( 'Future', 'ChannelConnectivity', 'StatusCode', + 'Status', 'RpcError', 'RpcContext', 'Call', diff --git a/src/python/grpcio/grpc/_auth.py b/src/python/grpcio/grpc/_auth.py index c17824563d..9b990f490d 100644 --- a/src/python/grpcio/grpc/_auth.py +++ b/src/python/grpcio/grpc/_auth.py @@ -46,7 +46,7 @@ class GoogleCallCredentials(grpc.AuthMetadataPlugin): # Hack to determine if these are JWT creds and we need to pass # additional_claims when getting a token - self._is_jwt = 'additional_claims' in inspect.getargspec( + self._is_jwt = 'additional_claims' in inspect.getargspec( # pylint: disable=deprecated-method credentials.get_access_token).args def __call__(self, context, callback): diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 35fa82d56b..8051fb306c 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -499,6 +499,7 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._context = cygrpc.build_context() def _prepare(self, request, timeout, metadata, wait_for_ready): deadline, serialized_request, rendezvous = _start_unary_request( @@ -525,17 +526,18 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata, wait_for_ready) if state is None: - raise rendezvous + raise rendezvous # pylint: disable-msg=raising-bad-type else: call = self._channel.segregated_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, + self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, (( operations, None, - ),)) + ),), self._context) event = call.next_event() _handle_event(event, state, self._response_deserializer) - return state, call, + return state, call def __call__(self, request, @@ -566,13 +568,14 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): state, operations, deadline, rendezvous = self._prepare( request, timeout, metadata, wait_for_ready) if state is None: - raise rendezvous + raise rendezvous # pylint: disable-msg=raising-bad-type else: event_handler = _event_handler(state, self._response_deserializer) call = self._managed_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, + self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, - (operations,), event_handler) + (operations,), event_handler, self._context) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -587,6 +590,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._context = cygrpc.build_context() def __call__(self, request, @@ -599,7 +603,7 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( wait_for_ready) if serialized_request is None: - raise rendezvous + raise rendezvous # pylint: disable-msg=raising-bad-type else: state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) operationses = ( @@ -615,9 +619,10 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): ) event_handler = _event_handler(state, self._response_deserializer) call = self._managed_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, + self._method, None, deadline, metadata, None if credentials is None else credentials._credentials, - operationses, event_handler) + operationses, event_handler, self._context) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -632,6 +637,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._context = cygrpc.build_context() def _blocking(self, request_iterator, timeout, metadata, credentials, wait_for_ready): @@ -640,10 +646,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( wait_for_ready) call = self._channel.segregated_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, + None, deadline, metadata, None if credentials is None else credentials._credentials, _stream_unary_invocation_operationses_and_tags( - metadata, initial_metadata_flags)) + metadata, initial_metadata_flags), self._context) _consume_request_iterator(request_iterator, state, call, self._request_serializer, None) while True: @@ -653,7 +660,7 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): state.condition.notify_all() if not state.due: break - return state, call, + return state, call def __call__(self, request_iterator, @@ -687,10 +694,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( wait_for_ready) call = self._managed_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, + None, deadline, metadata, None if credentials is None else credentials._credentials, _stream_unary_invocation_operationses( - metadata, initial_metadata_flags), event_handler) + metadata, initial_metadata_flags), event_handler, self._context) _consume_request_iterator(request_iterator, state, call, self._request_serializer, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -706,6 +714,7 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): self._method = method self._request_serializer = request_serializer self._response_deserializer = response_deserializer + self._context = cygrpc.build_context() def __call__(self, request_iterator, @@ -727,9 +736,10 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): ) event_handler = _event_handler(state, self._response_deserializer) call = self._managed_call( - 0, self._method, None, deadline, metadata, None + cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method, + None, deadline, metadata, None if credentials is None else credentials._credentials, operationses, - event_handler) + event_handler, self._context) _consume_request_iterator(request_iterator, state, call, self._request_serializer, event_handler) return _Rendezvous(state, call, self._response_deserializer, deadline) @@ -745,10 +755,10 @@ class _InitialMetadataFlags(int): def with_wait_for_ready(self, wait_for_ready): if wait_for_ready is not None: if wait_for_ready: - self = self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \ + return self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \ cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) elif not wait_for_ready: - self = self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \ + return self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \ cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set) return self @@ -789,7 +799,7 @@ def _channel_managed_call_management(state): # pylint: disable=too-many-arguments def create(flags, method, host, deadline, metadata, credentials, - operationses, event_handler): + operationses, event_handler, context): """Creates a cygrpc.IntegratedCall. Args: @@ -804,7 +814,7 @@ def _channel_managed_call_management(state): started on the call. event_handler: A behavior to call to handle the events resultant from the operations on the call. - + context: Context object for distributed tracing. Returns: A cygrpc.IntegratedCall with which to conduct an RPC. """ @@ -815,7 +825,7 @@ def _channel_managed_call_management(state): with state.lock: call = state.channel.integrated_call(flags, method, host, deadline, metadata, credentials, - operationses_and_tags) + operationses_and_tags, context) if state.managed_calls == 0: state.managed_calls = 1 _run_channel_spin_thread(state) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi index e0e068e452..01b8237484 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi @@ -28,19 +28,22 @@ cdef tuple _wrap_grpc_arg(grpc_arg arg) cdef grpc_arg _unwrap_grpc_arg(tuple wrapped_arg) -cdef class _ArgumentProcessor: +cdef class _ChannelArg: cdef grpc_arg c_argument cdef void c(self, argument, grpc_arg_pointer_vtable *vtable, references) except * -cdef class _ArgumentsProcessor: +cdef class _ChannelArgs: cdef readonly tuple _arguments - cdef list _argument_processors + cdef list _channel_args cdef readonly list _references cdef grpc_channel_args _c_arguments - cdef grpc_channel_args *c(self, grpc_arg_pointer_vtable *vtable) except * - cdef un_c(self) + cdef void _c(self, grpc_arg_pointer_vtable *vtable) except * + cdef grpc_channel_args *c_args(self) except * + + @staticmethod + cdef _ChannelArgs from_args(object arguments, grpc_arg_pointer_vtable * vtable) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi index b7a4277ff6..bf12871015 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi @@ -50,7 +50,7 @@ cdef grpc_arg _unwrap_grpc_arg(tuple wrapped_arg): return wrapped.arg -cdef class _ArgumentProcessor: +cdef class _ChannelArg: cdef void c(self, argument, grpc_arg_pointer_vtable *vtable, references) except *: key, value = argument @@ -82,27 +82,34 @@ cdef class _ArgumentProcessor: 'Expected int, bytes, or behavior, got {}'.format(type(value))) -cdef class _ArgumentsProcessor: +cdef class _ChannelArgs: def __cinit__(self, arguments): self._arguments = () if arguments is None else tuple(arguments) - self._argument_processors = [] + self._channel_args = [] self._references = [] + self._c_arguments.arguments = NULL - cdef grpc_channel_args *c(self, grpc_arg_pointer_vtable *vtable) except *: + cdef void _c(self, grpc_arg_pointer_vtable *vtable) except *: self._c_arguments.arguments_length = len(self._arguments) - if self._c_arguments.arguments_length == 0: - return NULL - else: + if self._c_arguments.arguments_length != 0: self._c_arguments.arguments = <grpc_arg *>gpr_malloc( self._c_arguments.arguments_length * sizeof(grpc_arg)) for index, argument in enumerate(self._arguments): - argument_processor = _ArgumentProcessor() - argument_processor.c(argument, vtable, self._references) - self._c_arguments.arguments[index] = argument_processor.c_argument - self._argument_processors.append(argument_processor) - return &self._c_arguments - - cdef un_c(self): - if self._arguments: + channel_arg = _ChannelArg() + channel_arg.c(argument, vtable, self._references) + self._c_arguments.arguments[index] = channel_arg.c_argument + self._channel_args.append(channel_arg) + + cdef grpc_channel_args *c_args(self) except *: + return &self._c_arguments + + def __dealloc__(self): + if self._c_arguments.arguments != NULL: gpr_free(self._c_arguments.arguments) + + @staticmethod + cdef _ChannelArgs from_args(object arguments, grpc_arg_pointer_vtable * vtable): + cdef _ChannelArgs channel_args = _ChannelArgs(arguments) + channel_args._c(vtable) + return channel_args diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 135d224095..70d4abb730 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -423,16 +423,15 @@ cdef class Channel: self._vtable.copy = &_copy_pointer self._vtable.destroy = &_destroy_pointer self._vtable.cmp = &_compare_pointer - cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor( - arguments) - cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable) + cdef _ChannelArgs channel_args = _ChannelArgs.from_args( + arguments, &self._vtable) if channel_credentials is None: self._state.c_channel = grpc_insecure_channel_create( - <char *>target, c_arguments, NULL) + <char *>target, channel_args.c_args(), NULL) else: c_channel_credentials = channel_credentials.c() self._state.c_channel = grpc_secure_channel_create( - c_channel_credentials, <char *>target, c_arguments, NULL) + c_channel_credentials, <char *>target, channel_args.c_args(), NULL) grpc_channel_credentials_release(c_channel_credentials) self._state.c_call_completion_queue = ( grpc_completion_queue_create_for_next(NULL)) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi index 141116df5d..3c33b46dbb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi @@ -49,7 +49,7 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline): cdef _interpret_event(grpc_event c_event): cdef _Tag tag if c_event.type == GRPC_QUEUE_TIMEOUT: - # NOTE(nathaniel): For now we coopt ConnectivityEvent here. + # TODO(ericgribkoff) Do not coopt ConnectivityEvent here. return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None) elif c_event.type == GRPC_QUEUE_SHUTDOWN: # NOTE(nathaniel): For now we coopt ConnectivityEvent here. diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi index 52cfccb677..4a6fbe0f96 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi @@ -16,7 +16,6 @@ cdef class Server: cdef grpc_arg_pointer_vtable _vtable - cdef readonly _ArgumentsProcessor _arguments_processor cdef grpc_server *c_server cdef bint is_started # start has been called cdef bint is_shutting_down # shutdown has been called diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index ce701724fd..d72648a35d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -29,11 +29,9 @@ cdef class Server: self._vtable.copy = &_copy_pointer self._vtable.destroy = &_destroy_pointer self._vtable.cmp = &_compare_pointer - cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor( - arguments) - cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable) - self.c_server = grpc_server_create(c_arguments, NULL) - arguments_processor.un_c() + cdef _ChannelArgs channel_args = _ChannelArgs.from_args( + arguments, &self._vtable) + self.c_server = grpc_server_create(channel_args.c_args(), NULL) self.references.append(arguments) self.is_started = False self.is_shutting_down = False @@ -128,7 +126,10 @@ cdef class Server: with nogil: grpc_server_cancel_all_calls(self.c_server) - def __dealloc__(self): + # TODO(https://github.com/grpc/grpc/issues/17515) Determine what, if any, + # portion of this is safe to call from __dealloc__, and potentially remove + # backup_shutdown_queue. + def destroy(self): if self.c_server != NULL: if not self.is_started: pass @@ -146,4 +147,8 @@ cdef class Server: while not self.is_shutdown: time.sleep(0) grpc_server_destroy(self.c_server) - grpc_shutdown() + self.c_server = NULL + + def __dealloc(self): + if self.c_server == NULL: + grpc_shutdown() diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py index 7a9f173947..dd9d436c3f 100644 --- a/src/python/grpcio/grpc/_grpcio_metadata.py +++ b/src/python/grpcio/grpc/_grpcio_metadata.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!! -__version__ = """1.18.0.dev0""" +__version__ = """1.19.0.dev0""" diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index e939f615df..eb750ef1a8 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -48,7 +48,7 @@ _CANCELLED = 'cancelled' _EMPTY_FLAGS = 0 -_UNEXPECTED_EXIT_SERVER_GRACE = 1.0 +_DEALLOCATED_SERVER_CHECK_PERIOD_S = 1.0 def _serialized_request(request_event): @@ -291,6 +291,10 @@ class _Context(grpc.ServicerContext): self._state.abortion = Exception() raise self._state.abortion + def abort_with_status(self, status): + self._state.trailing_metadata = status.trailing_metadata + self.abort(status.code, status.details) + def set_code(self, code): with self._state.condition: self._state.code = code @@ -672,6 +676,9 @@ class _ServerState(object): self.rpc_states = set() self.due = set() + # A "volatile" flag to interrupt the daemon serving thread + self.server_deallocated = False + def _add_generic_handlers(state, generic_handlers): with state.lock: @@ -698,6 +705,7 @@ def _request_call(state): # TODO(https://github.com/grpc/grpc/issues/6597): delete this function. def _stop_serving(state): if not state.rpc_states and not state.due: + state.server.destroy() for shutdown_event in state.shutdown_events: shutdown_event.set() state.stage = _ServerStage.STOPPED @@ -711,49 +719,69 @@ def _on_call_completed(state): state.active_rpc_count -= 1 -def _serve(state): - while True: - event = state.completion_queue.poll() - if event.tag is _SHUTDOWN_TAG: +def _process_event_and_continue(state, event): + should_continue = True + if event.tag is _SHUTDOWN_TAG: + with state.lock: + state.due.remove(_SHUTDOWN_TAG) + if _stop_serving(state): + should_continue = False + elif event.tag is _REQUEST_CALL_TAG: + with state.lock: + state.due.remove(_REQUEST_CALL_TAG) + concurrency_exceeded = ( + state.maximum_concurrent_rpcs is not None and + state.active_rpc_count >= state.maximum_concurrent_rpcs) + rpc_state, rpc_future = _handle_call( + event, state.generic_handlers, state.interceptor_pipeline, + state.thread_pool, concurrency_exceeded) + if rpc_state is not None: + state.rpc_states.add(rpc_state) + if rpc_future is not None: + state.active_rpc_count += 1 + rpc_future.add_done_callback( + lambda unused_future: _on_call_completed(state)) + if state.stage is _ServerStage.STARTED: + _request_call(state) + elif _stop_serving(state): + should_continue = False + else: + rpc_state, callbacks = event.tag(event) + for callback in callbacks: + callable_util.call_logging_exceptions(callback, + 'Exception calling callback!') + if rpc_state is not None: with state.lock: - state.due.remove(_SHUTDOWN_TAG) + state.rpc_states.remove(rpc_state) if _stop_serving(state): - return - elif event.tag is _REQUEST_CALL_TAG: - with state.lock: - state.due.remove(_REQUEST_CALL_TAG) - concurrency_exceeded = ( - state.maximum_concurrent_rpcs is not None and - state.active_rpc_count >= state.maximum_concurrent_rpcs) - rpc_state, rpc_future = _handle_call( - event, state.generic_handlers, state.interceptor_pipeline, - state.thread_pool, concurrency_exceeded) - if rpc_state is not None: - state.rpc_states.add(rpc_state) - if rpc_future is not None: - state.active_rpc_count += 1 - rpc_future.add_done_callback( - lambda unused_future: _on_call_completed(state)) - if state.stage is _ServerStage.STARTED: - _request_call(state) - elif _stop_serving(state): - return - else: - rpc_state, callbacks = event.tag(event) - for callback in callbacks: - callable_util.call_logging_exceptions( - callback, 'Exception calling callback!') - if rpc_state is not None: - with state.lock: - state.rpc_states.remove(rpc_state) - if _stop_serving(state): - return + should_continue = False + return should_continue + + +def _serve(state): + while True: + timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S + event = state.completion_queue.poll(timeout) + if state.server_deallocated: + _begin_shutdown_once(state) + if event.completion_type != cygrpc.CompletionType.queue_timeout: + if not _process_event_and_continue(state, event): + return # We want to force the deletion of the previous event # ~before~ we poll again; if the event has a reference # to a shutdown Call object, this can induce spinlock. event = None +def _begin_shutdown_once(state): + with state.lock: + if state.stage is _ServerStage.STARTED: + state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) + state.stage = _ServerStage.GRACE + state.shutdown_events = [] + state.due.add(_SHUTDOWN_TAG) + + def _stop(state, grace): with state.lock: if state.stage is _ServerStage.STOPPED: @@ -761,11 +789,7 @@ def _stop(state, grace): shutdown_event.set() return shutdown_event else: - if state.stage is _ServerStage.STARTED: - state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) - state.stage = _ServerStage.GRACE - state.shutdown_events = [] - state.due.add(_SHUTDOWN_TAG) + _begin_shutdown_once(state) shutdown_event = threading.Event() state.shutdown_events.append(shutdown_event) if grace is None: @@ -836,7 +860,9 @@ class _Server(grpc.Server): return _stop(self._state, grace) def __del__(self): - _stop(self._state, None) + # We can not grab a lock in __del__(), so set a flag to signal the + # serving daemon thread (if it exists) to initiate shutdown. + self._state.server_deallocated = True def create_server(thread_pool, generic_rpc_handlers, interceptors, options, diff --git a/src/python/grpcio/grpc/_utilities.py b/src/python/grpcio/grpc/_utilities.py index d90b34bcbd..2938a38b44 100644 --- a/src/python/grpcio/grpc/_utilities.py +++ b/src/python/grpcio/grpc/_utilities.py @@ -132,15 +132,12 @@ class _ChannelReadyFuture(grpc.Future): def result(self, timeout=None): self._block(timeout) - return None def exception(self, timeout=None): self._block(timeout) - return None def traceback(self, timeout=None): self._block(timeout) - return None def add_done_callback(self, fn): with self._condition: |