diff options
Diffstat (limited to 'src/python/grpcio')
-rw-r--r-- | src/python/grpcio/grpc/__init__.py | 42 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi | 26 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi | 14 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_server.py | 88 | ||||
-rw-r--r-- | src/python/grpcio/grpc_core_dependencies.py | 16 |
5 files changed, 118 insertions, 68 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 526bd9e14f..6087276d51 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -905,6 +905,21 @@ class Server(six.with_metaclass(abc.ABCMeta)): raise NotImplementedError() @abc.abstractmethod + def add_shutdown_handler(self, shutdown_handler): + """Adds a handler to be called on server shutdown. + + Shutdown handlers are run on server stop() or in the event that a running + server is destroyed unexpectedly. The handlers are run in series before + the stop grace period. + + Args: + shutdown_handler: A function taking a single arg, a time in seconds + within which the handler should complete. None indicates the handler can + run for any duration. + """ + raise NotImplementedError() + + @abc.abstractmethod def start(self): """Starts this Server's service of RPCs. @@ -914,7 +929,7 @@ class Server(six.with_metaclass(abc.ABCMeta)): raise NotImplementedError() @abc.abstractmethod - def stop(self, grace): + def stop(self, grace, shutdown_handler_grace=None): """Stops this Server's service of RPCs. All calls to this method immediately stop service of new RPCs. When existing @@ -927,10 +942,18 @@ class Server(six.with_metaclass(abc.ABCMeta)): passed in a previous call will not have the effect of stopping the server later. + This method does not block for any significant length of time. If None is + passed as the grace value, existing RPCs are immediately aborted and this + method blocks until this Server is completely stopped. + Args: - grace: A duration of time in seconds to allow existing RPCs to complete - before being aborted by this Server's stopping. If None, this method - will block until the server is completely stopped. + grace: A duration of time in seconds or None. If a duration of time in + seconds, the time to allow existing RPCs to complete before being + aborted by this Server's stopping. If None, all RPCs will be aborted + immediately and this method will block until this Server is completely + stopped. + shutdown_handler_grace: A duration of time in seconds or None. This + value is passed to all shutdown handlers. Returns: A threading.Event that will be set when this Server has completely @@ -1225,7 +1248,8 @@ def secure_channel(target, credentials, options=None): credentials._credentials) -def server(thread_pool, handlers=None, options=None): +def server(thread_pool, handlers=None, options=None, exit_grace=None, + exit_shutdown_handler_grace=None): """Creates a Server with which RPCs can be serviced. Args: @@ -1238,13 +1262,19 @@ def server(thread_pool, handlers=None, options=None): returned Server is started. options: A sequence of string-value pairs according to which to configure the created server. + exit_grace: The grace period to use when terminating + running servers at interpreter exit. None indicates unspecified. + exit_shutdown_handler_grace: The shutdown handler grace to use when + terminating running servers at interpreter exit. None indicates + unspecified. Returns: A Server with which RPCs can be serviced. """ from grpc import _server return _server.Server(thread_pool, () if handlers is None else handlers, - () if options is None else options) + () if options is None else options, exit_grace, + exit_shutdown_handler_grace) ################################### __all__ ################################# diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 9560fad137..ad766186bd 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -53,23 +53,23 @@ cdef extern from "grpc/byte_buffer_reader.h": cdef extern from "grpc/grpc.h": - ctypedef struct gpr_slice: - # don't worry about writing out the members of gpr_slice; we never access + ctypedef struct grpc_slice: + # don't worry about writing out the members of grpc_slice; we never access # them directly. pass - gpr_slice gpr_slice_ref(gpr_slice s) nogil - void gpr_slice_unref(gpr_slice s) nogil - gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil - gpr_slice gpr_slice_new_with_len( + grpc_slice grpc_slice_ref(grpc_slice s) nogil + void grpc_slice_unref(grpc_slice s) nogil + grpc_slice grpc_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil + grpc_slice grpc_slice_new_with_len( void *p, size_t len, void (*destroy)(void *, size_t)) nogil - gpr_slice gpr_slice_malloc(size_t length) nogil - gpr_slice gpr_slice_from_copied_string(const char *source) nogil - gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len) nogil + grpc_slice grpc_slice_malloc(size_t length) nogil + grpc_slice grpc_slice_from_copied_string(const char *source) nogil + grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len) nogil # Declare functions for function-like macros (because Cython)... - void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s) nogil - size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) nogil + void *grpc_slice_start_ptr "GRPC_SLICE_START_PTR" (grpc_slice s) nogil + size_t grpc_slice_length "GRPC_SLICE_LENGTH" (grpc_slice s) nogil ctypedef enum gpr_clock_type: GPR_CLOCK_MONOTONIC @@ -101,7 +101,7 @@ cdef extern from "grpc/grpc.h": # We don't care about the internals. pass - grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices, + grpc_byte_buffer *grpc_raw_byte_buffer_create(grpc_slice *slices, size_t nslices) nogil size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) nogil void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) nogil @@ -109,7 +109,7 @@ cdef extern from "grpc/grpc.h": int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader, grpc_byte_buffer *buffer) nogil int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader, - gpr_slice *slice) nogil + grpc_slice *slice) nogil void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) nogil ctypedef enum grpc_status_code: diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 8a4eef4d2e..cadfce6ee6 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -242,19 +242,19 @@ cdef class ByteBuffer: return cdef char *c_data = data - cdef gpr_slice data_slice + cdef grpc_slice data_slice cdef size_t data_length = len(data) with nogil: - data_slice = gpr_slice_from_copied_buffer(c_data, data_length) + data_slice = grpc_slice_from_copied_buffer(c_data, data_length) with nogil: self.c_byte_buffer = grpc_raw_byte_buffer_create( &data_slice, 1) with nogil: - gpr_slice_unref(data_slice) + grpc_slice_unref(data_slice) def bytes(self): cdef grpc_byte_buffer_reader reader - cdef gpr_slice data_slice + cdef grpc_slice data_slice cdef size_t data_slice_length cdef void *data_slice_pointer cdef bint reader_status @@ -267,11 +267,11 @@ cdef class ByteBuffer: result = bytearray() with nogil: while grpc_byte_buffer_reader_next(&reader, &data_slice): - data_slice_pointer = gpr_slice_start_ptr(data_slice) - data_slice_length = gpr_slice_length(data_slice) + data_slice_pointer = grpc_slice_start_ptr(data_slice) + data_slice_length = grpc_slice_length(data_slice) with gil: result += (<char *>data_slice_pointer)[:data_slice_length] - gpr_slice_unref(data_slice) + grpc_slice_unref(data_slice) with nogil: grpc_byte_buffer_reader_destroy(&reader) return bytes(result) diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 5223712dfa..d83a2e6ded 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -60,7 +60,8 @@ _CANCELLED = 'cancelled' _EMPTY_FLAGS = 0 _EMPTY_METADATA = cygrpc.Metadata(()) -_UNEXPECTED_EXIT_SERVER_GRACE = 1.0 +_DEFAULT_EXIT_GRACE = 1.0 +_DEFAULT_EXIT_SHUTDOWN_HANDLER_GRACE = 5.0 def _serialized_request(request_event): @@ -595,14 +596,18 @@ class _ServerStage(enum.Enum): class _ServerState(object): - def __init__(self, completion_queue, server, generic_handlers, thread_pool): + def __init__(self, completion_queue, server, generic_handlers, thread_pool, + exit_grace, exit_shutdown_handler_grace): self.lock = threading.Lock() self.completion_queue = completion_queue self.server = server self.generic_handlers = list(generic_handlers) self.thread_pool = thread_pool + self.exit_grace = exit_grace + self.exit_shutdown_handler_grace = exit_shutdown_handler_grace self.stage = _ServerStage.STOPPED self.shutdown_events = None + self.shutdown_handlers = [] # TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields. self.rpc_states = set() @@ -672,41 +677,45 @@ def _serve(state): return -def _stop(state, grace): - with state.lock: - if state.stage is _ServerStage.STOPPED: - shutdown_event = threading.Event() - shutdown_event.set() - return shutdown_event - else: - if state.stage is _ServerStage.STARTED: - state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) +def _stop(state, grace, shutdown_handler_grace): + shutdown_event = threading.Event() + + def cancel_all_calls_after_grace(): + with state.lock: + if state.stage is _ServerStage.STOPPED: + shutdown_event.set() + return + elif state.stage is _ServerStage.STARTED: + do_shutdown = True state.stage = _ServerStage.GRACE state.shutdown_events = [] - state.due.add(_SHUTDOWN_TAG) - shutdown_event = threading.Event() + else: + do_shutdown = False state.shutdown_events.append(shutdown_event) - if grace is None: + + if do_shutdown: + # Run Shutdown Handlers without the lock + for handler in state.shutdown_handlers: + handler(shutdown_handler_grace) + with state.lock: + state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG) + state.stage = _ServerStage.GRACE + state.due.add(_SHUTDOWN_TAG) + + if not shutdown_event.wait(timeout=grace): + with state.lock: state.server.cancel_all_calls() # TODO(https://github.com/grpc/grpc/issues/6597): delete this loop. for rpc_state in state.rpc_states: with rpc_state.condition: rpc_state.client = _CANCELLED rpc_state.condition.notify_all() - else: - def cancel_all_calls_after_grace(): - shutdown_event.wait(timeout=grace) - with state.lock: - state.server.cancel_all_calls() - # TODO(https://github.com/grpc/grpc/issues/6597): delete this loop. - for rpc_state in state.rpc_states: - with rpc_state.condition: - rpc_state.client = _CANCELLED - rpc_state.condition.notify_all() - thread = threading.Thread(target=cancel_all_calls_after_grace) - thread.start() - return shutdown_event - shutdown_event.wait() + + if grace is None: + cancel_all_calls_after_grace() + else: + threading.Thread(target=cancel_all_calls_after_grace).start() + return shutdown_event @@ -716,12 +725,12 @@ def _start(state): raise ValueError('Cannot start already-started server!') state.server.start() state.stage = _ServerStage.STARTED - _request_call(state) + _request_call(state) def cleanup_server(timeout): if timeout is None: - _stop(state, _UNEXPECTED_EXIT_SERVER_GRACE).wait() + _stop(state, state.exit_grace, state.exit_shutdown_handler_grace).wait() else: - _stop(state, timeout).wait() + _stop(state, timeout, 0).wait() thread = _common.CleanupThread( cleanup_server, target=_serve, args=(state,)) @@ -729,12 +738,16 @@ def _start(state): class Server(grpc.Server): - def __init__(self, thread_pool, generic_handlers, options): + def __init__(self, thread_pool, generic_handlers, options, exit_grace, + exit_shutdown_handler_grace): completion_queue = cygrpc.CompletionQueue() server = cygrpc.Server(_common.channel_args(options)) server.register_completion_queue(completion_queue) self._state = _ServerState( - completion_queue, server, generic_handlers, thread_pool) + completion_queue, server, generic_handlers, thread_pool, + _DEFAULT_EXIT_GRACE if exit_grace is None else exit_grace, + _DEFAULT_EXIT_SHUTDOWN_HANDLER_GRACE if exit_shutdown_handler_grace + is None else exit_shutdown_handler_grace) def add_generic_rpc_handlers(self, generic_rpc_handlers): _add_generic_handlers(self._state, generic_rpc_handlers) @@ -745,11 +758,14 @@ class Server(grpc.Server): def add_secure_port(self, address, server_credentials): return _add_secure_port(self._state, _common.encode(address), server_credentials) + def add_shutdown_handler(self, handler): + self._state.shutdown_handlers.append(handler) + def start(self): _start(self._state) - def stop(self, grace): - return _stop(self._state, grace) + def stop(self, grace, shutdown_handler_grace=None): + return _stop(self._state, grace, shutdown_handler_grace) def __del__(self): - _stop(self._state, None) + _stop(self._state, None, None) diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 1ad423909f..d43f93b94f 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -52,9 +52,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/support/log_windows.c', 'src/core/lib/support/mpscq.c', 'src/core/lib/support/murmur_hash.c', - 'src/core/lib/support/percent_encoding.c', - 'src/core/lib/support/slice.c', - 'src/core/lib/support/slice_buffer.c', 'src/core/lib/support/stack_lockfree.c', 'src/core/lib/support/string.c', 'src/core/lib/support/string_posix.c', @@ -102,7 +99,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/endpoint_pair_windows.c', 'src/core/lib/iomgr/error.c', 'src/core/lib/iomgr/ev_epoll_linux.c', - 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c', 'src/core/lib/iomgr/ev_poll_posix.c', 'src/core/lib/iomgr/ev_posix.c', 'src/core/lib/iomgr/exec_ctx.c', @@ -124,6 +120,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/resolve_address_windows.c', 'src/core/lib/iomgr/resource_quota.c', 'src/core/lib/iomgr/sockaddr_utils.c', + 'src/core/lib/iomgr/socket_mutator.c', 'src/core/lib/iomgr/socket_utils_common_posix.c', 'src/core/lib/iomgr/socket_utils_linux.c', 'src/core/lib/iomgr/socket_utils_posix.c', @@ -157,6 +154,10 @@ CORE_SOURCE_FILES = [ 'src/core/lib/json/json_reader.c', 'src/core/lib/json/json_string.c', 'src/core/lib/json/json_writer.c', + 'src/core/lib/slice/percent_encoding.c', + 'src/core/lib/slice/slice.c', + 'src/core/lib/slice/slice_buffer.c', + 'src/core/lib/slice/slice_string_helpers.c', 'src/core/lib/surface/alarm.c', 'src/core/lib/surface/api_trace.c', 'src/core/lib/surface/byte_buffer.c', @@ -180,7 +181,8 @@ CORE_SOURCE_FILES = [ 'src/core/lib/transport/mdstr_hash_table.c', 'src/core/lib/transport/metadata.c', 'src/core/lib/transport/metadata_batch.c', - 'src/core/lib/transport/method_config.c', + 'src/core/lib/transport/pid_controller.c', + 'src/core/lib/transport/service_config.c', 'src/core/lib/transport/static_metadata.c', 'src/core/lib/transport/timeout_encoding.c', 'src/core/lib/transport/transport.c', @@ -224,9 +226,9 @@ CORE_SOURCE_FILES = [ 'src/core/lib/security/credentials/plugin/plugin_credentials.c', 'src/core/lib/security/credentials/ssl/ssl_credentials.c', 'src/core/lib/security/transport/client_auth_filter.c', - 'src/core/lib/security/transport/handshake.c', 'src/core/lib/security/transport/secure_endpoint.c', 'src/core/lib/security/transport/security_connector.c', + 'src/core/lib/security/transport/security_handshaker.c', 'src/core/lib/security/transport/server_auth_filter.c', 'src/core/lib/security/transport/tsi_error.c', 'src/core/lib/security/util/b64.c', @@ -235,6 +237,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/tsi/fake_transport_security.c', 'src/core/lib/tsi/ssl_transport_security.c', 'src/core/lib/tsi/transport_security.c', + 'src/core/ext/transport/chttp2/server/chttp2_server.c', 'src/core/ext/transport/chttp2/client/secure/secure_channel_create.c', 'src/core/ext/client_channel/channel_connectivity.c', 'src/core/ext/client_channel/client_channel.c', @@ -254,6 +257,7 @@ CORE_SOURCE_FILES = [ 'src/core/ext/client_channel/subchannel.c', 'src/core/ext/client_channel/subchannel_index.c', 'src/core/ext/client_channel/uri_parser.c', + 'src/core/ext/transport/chttp2/client/chttp2_connector.c', 'src/core/ext/transport/chttp2/server/insecure/server_chttp2.c', 'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c', 'src/core/ext/transport/chttp2/client/insecure/channel_create.c', |