aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio')
-rw-r--r--src/python/grpcio/grpc/__init__.py42
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi26
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi14
-rw-r--r--src/python/grpcio/grpc/_server.py88
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py16
5 files changed, 118 insertions, 68 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index 526bd9e14f..6087276d51 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -905,6 +905,21 @@ class Server(six.with_metaclass(abc.ABCMeta)):
raise NotImplementedError()
@abc.abstractmethod
+ def add_shutdown_handler(self, shutdown_handler):
+ """Adds a handler to be called on server shutdown.
+
+ Shutdown handlers are run on server stop() or in the event that a running
+ server is destroyed unexpectedly. The handlers are run in series before
+ the stop grace period.
+
+ Args:
+ shutdown_handler: A function taking a single arg, a time in seconds
+ within which the handler should complete. None indicates the handler can
+ run for any duration.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
def start(self):
"""Starts this Server's service of RPCs.
@@ -914,7 +929,7 @@ class Server(six.with_metaclass(abc.ABCMeta)):
raise NotImplementedError()
@abc.abstractmethod
- def stop(self, grace):
+ def stop(self, grace, shutdown_handler_grace=None):
"""Stops this Server's service of RPCs.
All calls to this method immediately stop service of new RPCs. When existing
@@ -927,10 +942,18 @@ class Server(six.with_metaclass(abc.ABCMeta)):
passed in a previous call will not have the effect of stopping the server
later.
+ This method does not block for any significant length of time. If None is
+ passed as the grace value, existing RPCs are immediately aborted and this
+ method blocks until this Server is completely stopped.
+
Args:
- grace: A duration of time in seconds to allow existing RPCs to complete
- before being aborted by this Server's stopping. If None, this method
- will block until the server is completely stopped.
+ grace: A duration of time in seconds or None. If a duration of time in
+ seconds, the time to allow existing RPCs to complete before being
+ aborted by this Server's stopping. If None, all RPCs will be aborted
+ immediately and this method will block until this Server is completely
+ stopped.
+ shutdown_handler_grace: A duration of time in seconds or None. This
+ value is passed to all shutdown handlers.
Returns:
A threading.Event that will be set when this Server has completely
@@ -1225,7 +1248,8 @@ def secure_channel(target, credentials, options=None):
credentials._credentials)
-def server(thread_pool, handlers=None, options=None):
+def server(thread_pool, handlers=None, options=None, exit_grace=None,
+ exit_shutdown_handler_grace=None):
"""Creates a Server with which RPCs can be serviced.
Args:
@@ -1238,13 +1262,19 @@ def server(thread_pool, handlers=None, options=None):
returned Server is started.
options: A sequence of string-value pairs according to which to configure
the created server.
+ exit_grace: The grace period to use when terminating
+ running servers at interpreter exit. None indicates unspecified.
+ exit_shutdown_handler_grace: The shutdown handler grace to use when
+ terminating running servers at interpreter exit. None indicates
+ unspecified.
Returns:
A Server with which RPCs can be serviced.
"""
from grpc import _server
return _server.Server(thread_pool, () if handlers is None else handlers,
- () if options is None else options)
+ () if options is None else options, exit_grace,
+ exit_shutdown_handler_grace)
################################### __all__ #################################
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 9560fad137..ad766186bd 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -53,23 +53,23 @@ cdef extern from "grpc/byte_buffer_reader.h":
cdef extern from "grpc/grpc.h":
- ctypedef struct gpr_slice:
- # don't worry about writing out the members of gpr_slice; we never access
+ ctypedef struct grpc_slice:
+ # don't worry about writing out the members of grpc_slice; we never access
# them directly.
pass
- gpr_slice gpr_slice_ref(gpr_slice s) nogil
- void gpr_slice_unref(gpr_slice s) nogil
- gpr_slice gpr_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil
- gpr_slice gpr_slice_new_with_len(
+ grpc_slice grpc_slice_ref(grpc_slice s) nogil
+ void grpc_slice_unref(grpc_slice s) nogil
+ grpc_slice grpc_slice_new(void *p, size_t len, void (*destroy)(void *)) nogil
+ grpc_slice grpc_slice_new_with_len(
void *p, size_t len, void (*destroy)(void *, size_t)) nogil
- gpr_slice gpr_slice_malloc(size_t length) nogil
- gpr_slice gpr_slice_from_copied_string(const char *source) nogil
- gpr_slice gpr_slice_from_copied_buffer(const char *source, size_t len) nogil
+ grpc_slice grpc_slice_malloc(size_t length) nogil
+ grpc_slice grpc_slice_from_copied_string(const char *source) nogil
+ grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len) nogil
# Declare functions for function-like macros (because Cython)...
- void *gpr_slice_start_ptr "GPR_SLICE_START_PTR" (gpr_slice s) nogil
- size_t gpr_slice_length "GPR_SLICE_LENGTH" (gpr_slice s) nogil
+ void *grpc_slice_start_ptr "GRPC_SLICE_START_PTR" (grpc_slice s) nogil
+ size_t grpc_slice_length "GRPC_SLICE_LENGTH" (grpc_slice s) nogil
ctypedef enum gpr_clock_type:
GPR_CLOCK_MONOTONIC
@@ -101,7 +101,7 @@ cdef extern from "grpc/grpc.h":
# We don't care about the internals.
pass
- grpc_byte_buffer *grpc_raw_byte_buffer_create(gpr_slice *slices,
+ grpc_byte_buffer *grpc_raw_byte_buffer_create(grpc_slice *slices,
size_t nslices) nogil
size_t grpc_byte_buffer_length(grpc_byte_buffer *bb) nogil
void grpc_byte_buffer_destroy(grpc_byte_buffer *byte_buffer) nogil
@@ -109,7 +109,7 @@ cdef extern from "grpc/grpc.h":
int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
grpc_byte_buffer *buffer) nogil
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
- gpr_slice *slice) nogil
+ grpc_slice *slice) nogil
void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) nogil
ctypedef enum grpc_status_code:
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 8a4eef4d2e..cadfce6ee6 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -242,19 +242,19 @@ cdef class ByteBuffer:
return
cdef char *c_data = data
- cdef gpr_slice data_slice
+ cdef grpc_slice data_slice
cdef size_t data_length = len(data)
with nogil:
- data_slice = gpr_slice_from_copied_buffer(c_data, data_length)
+ data_slice = grpc_slice_from_copied_buffer(c_data, data_length)
with nogil:
self.c_byte_buffer = grpc_raw_byte_buffer_create(
&data_slice, 1)
with nogil:
- gpr_slice_unref(data_slice)
+ grpc_slice_unref(data_slice)
def bytes(self):
cdef grpc_byte_buffer_reader reader
- cdef gpr_slice data_slice
+ cdef grpc_slice data_slice
cdef size_t data_slice_length
cdef void *data_slice_pointer
cdef bint reader_status
@@ -267,11 +267,11 @@ cdef class ByteBuffer:
result = bytearray()
with nogil:
while grpc_byte_buffer_reader_next(&reader, &data_slice):
- data_slice_pointer = gpr_slice_start_ptr(data_slice)
- data_slice_length = gpr_slice_length(data_slice)
+ data_slice_pointer = grpc_slice_start_ptr(data_slice)
+ data_slice_length = grpc_slice_length(data_slice)
with gil:
result += (<char *>data_slice_pointer)[:data_slice_length]
- gpr_slice_unref(data_slice)
+ grpc_slice_unref(data_slice)
with nogil:
grpc_byte_buffer_reader_destroy(&reader)
return bytes(result)
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 5223712dfa..d83a2e6ded 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -60,7 +60,8 @@ _CANCELLED = 'cancelled'
_EMPTY_FLAGS = 0
_EMPTY_METADATA = cygrpc.Metadata(())
-_UNEXPECTED_EXIT_SERVER_GRACE = 1.0
+_DEFAULT_EXIT_GRACE = 1.0
+_DEFAULT_EXIT_SHUTDOWN_HANDLER_GRACE = 5.0
def _serialized_request(request_event):
@@ -595,14 +596,18 @@ class _ServerStage(enum.Enum):
class _ServerState(object):
- def __init__(self, completion_queue, server, generic_handlers, thread_pool):
+ def __init__(self, completion_queue, server, generic_handlers, thread_pool,
+ exit_grace, exit_shutdown_handler_grace):
self.lock = threading.Lock()
self.completion_queue = completion_queue
self.server = server
self.generic_handlers = list(generic_handlers)
self.thread_pool = thread_pool
+ self.exit_grace = exit_grace
+ self.exit_shutdown_handler_grace = exit_shutdown_handler_grace
self.stage = _ServerStage.STOPPED
self.shutdown_events = None
+ self.shutdown_handlers = []
# TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.
self.rpc_states = set()
@@ -672,41 +677,45 @@ def _serve(state):
return
-def _stop(state, grace):
- with state.lock:
- if state.stage is _ServerStage.STOPPED:
- shutdown_event = threading.Event()
- shutdown_event.set()
- return shutdown_event
- else:
- if state.stage is _ServerStage.STARTED:
- state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
+def _stop(state, grace, shutdown_handler_grace):
+ shutdown_event = threading.Event()
+
+ def cancel_all_calls_after_grace():
+ with state.lock:
+ if state.stage is _ServerStage.STOPPED:
+ shutdown_event.set()
+ return
+ elif state.stage is _ServerStage.STARTED:
+ do_shutdown = True
state.stage = _ServerStage.GRACE
state.shutdown_events = []
- state.due.add(_SHUTDOWN_TAG)
- shutdown_event = threading.Event()
+ else:
+ do_shutdown = False
state.shutdown_events.append(shutdown_event)
- if grace is None:
+
+ if do_shutdown:
+ # Run Shutdown Handlers without the lock
+ for handler in state.shutdown_handlers:
+ handler(shutdown_handler_grace)
+ with state.lock:
+ state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
+ state.stage = _ServerStage.GRACE
+ state.due.add(_SHUTDOWN_TAG)
+
+ if not shutdown_event.wait(timeout=grace):
+ with state.lock:
state.server.cancel_all_calls()
# TODO(https://github.com/grpc/grpc/issues/6597): delete this loop.
for rpc_state in state.rpc_states:
with rpc_state.condition:
rpc_state.client = _CANCELLED
rpc_state.condition.notify_all()
- else:
- def cancel_all_calls_after_grace():
- shutdown_event.wait(timeout=grace)
- with state.lock:
- state.server.cancel_all_calls()
- # TODO(https://github.com/grpc/grpc/issues/6597): delete this loop.
- for rpc_state in state.rpc_states:
- with rpc_state.condition:
- rpc_state.client = _CANCELLED
- rpc_state.condition.notify_all()
- thread = threading.Thread(target=cancel_all_calls_after_grace)
- thread.start()
- return shutdown_event
- shutdown_event.wait()
+
+ if grace is None:
+ cancel_all_calls_after_grace()
+ else:
+ threading.Thread(target=cancel_all_calls_after_grace).start()
+
return shutdown_event
@@ -716,12 +725,12 @@ def _start(state):
raise ValueError('Cannot start already-started server!')
state.server.start()
state.stage = _ServerStage.STARTED
- _request_call(state)
+ _request_call(state)
def cleanup_server(timeout):
if timeout is None:
- _stop(state, _UNEXPECTED_EXIT_SERVER_GRACE).wait()
+ _stop(state, state.exit_grace, state.exit_shutdown_handler_grace).wait()
else:
- _stop(state, timeout).wait()
+ _stop(state, timeout, 0).wait()
thread = _common.CleanupThread(
cleanup_server, target=_serve, args=(state,))
@@ -729,12 +738,16 @@ def _start(state):
class Server(grpc.Server):
- def __init__(self, thread_pool, generic_handlers, options):
+ def __init__(self, thread_pool, generic_handlers, options, exit_grace,
+ exit_shutdown_handler_grace):
completion_queue = cygrpc.CompletionQueue()
server = cygrpc.Server(_common.channel_args(options))
server.register_completion_queue(completion_queue)
self._state = _ServerState(
- completion_queue, server, generic_handlers, thread_pool)
+ completion_queue, server, generic_handlers, thread_pool,
+ _DEFAULT_EXIT_GRACE if exit_grace is None else exit_grace,
+ _DEFAULT_EXIT_SHUTDOWN_HANDLER_GRACE if exit_shutdown_handler_grace
+ is None else exit_shutdown_handler_grace)
def add_generic_rpc_handlers(self, generic_rpc_handlers):
_add_generic_handlers(self._state, generic_rpc_handlers)
@@ -745,11 +758,14 @@ class Server(grpc.Server):
def add_secure_port(self, address, server_credentials):
return _add_secure_port(self._state, _common.encode(address), server_credentials)
+ def add_shutdown_handler(self, handler):
+ self._state.shutdown_handlers.append(handler)
+
def start(self):
_start(self._state)
- def stop(self, grace):
- return _stop(self._state, grace)
+ def stop(self, grace, shutdown_handler_grace=None):
+ return _stop(self._state, grace, shutdown_handler_grace)
def __del__(self):
- _stop(self._state, None)
+ _stop(self._state, None, None)
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 1ad423909f..d43f93b94f 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -52,9 +52,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/support/log_windows.c',
'src/core/lib/support/mpscq.c',
'src/core/lib/support/murmur_hash.c',
- 'src/core/lib/support/percent_encoding.c',
- 'src/core/lib/support/slice.c',
- 'src/core/lib/support/slice_buffer.c',
'src/core/lib/support/stack_lockfree.c',
'src/core/lib/support/string.c',
'src/core/lib/support/string_posix.c',
@@ -102,7 +99,6 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/endpoint_pair_windows.c',
'src/core/lib/iomgr/error.c',
'src/core/lib/iomgr/ev_epoll_linux.c',
- 'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
'src/core/lib/iomgr/ev_poll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
'src/core/lib/iomgr/exec_ctx.c',
@@ -124,6 +120,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/resolve_address_windows.c',
'src/core/lib/iomgr/resource_quota.c',
'src/core/lib/iomgr/sockaddr_utils.c',
+ 'src/core/lib/iomgr/socket_mutator.c',
'src/core/lib/iomgr/socket_utils_common_posix.c',
'src/core/lib/iomgr/socket_utils_linux.c',
'src/core/lib/iomgr/socket_utils_posix.c',
@@ -157,6 +154,10 @@ CORE_SOURCE_FILES = [
'src/core/lib/json/json_reader.c',
'src/core/lib/json/json_string.c',
'src/core/lib/json/json_writer.c',
+ 'src/core/lib/slice/percent_encoding.c',
+ 'src/core/lib/slice/slice.c',
+ 'src/core/lib/slice/slice_buffer.c',
+ 'src/core/lib/slice/slice_string_helpers.c',
'src/core/lib/surface/alarm.c',
'src/core/lib/surface/api_trace.c',
'src/core/lib/surface/byte_buffer.c',
@@ -180,7 +181,8 @@ CORE_SOURCE_FILES = [
'src/core/lib/transport/mdstr_hash_table.c',
'src/core/lib/transport/metadata.c',
'src/core/lib/transport/metadata_batch.c',
- 'src/core/lib/transport/method_config.c',
+ 'src/core/lib/transport/pid_controller.c',
+ 'src/core/lib/transport/service_config.c',
'src/core/lib/transport/static_metadata.c',
'src/core/lib/transport/timeout_encoding.c',
'src/core/lib/transport/transport.c',
@@ -224,9 +226,9 @@ CORE_SOURCE_FILES = [
'src/core/lib/security/credentials/plugin/plugin_credentials.c',
'src/core/lib/security/credentials/ssl/ssl_credentials.c',
'src/core/lib/security/transport/client_auth_filter.c',
- 'src/core/lib/security/transport/handshake.c',
'src/core/lib/security/transport/secure_endpoint.c',
'src/core/lib/security/transport/security_connector.c',
+ 'src/core/lib/security/transport/security_handshaker.c',
'src/core/lib/security/transport/server_auth_filter.c',
'src/core/lib/security/transport/tsi_error.c',
'src/core/lib/security/util/b64.c',
@@ -235,6 +237,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/tsi/fake_transport_security.c',
'src/core/lib/tsi/ssl_transport_security.c',
'src/core/lib/tsi/transport_security.c',
+ 'src/core/ext/transport/chttp2/server/chttp2_server.c',
'src/core/ext/transport/chttp2/client/secure/secure_channel_create.c',
'src/core/ext/client_channel/channel_connectivity.c',
'src/core/ext/client_channel/client_channel.c',
@@ -254,6 +257,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/client_channel/subchannel.c',
'src/core/ext/client_channel/subchannel_index.c',
'src/core/ext/client_channel/uri_parser.c',
+ 'src/core/ext/transport/chttp2/client/chttp2_connector.c',
'src/core/ext/transport/chttp2/server/insecure/server_chttp2.c',
'src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create.c',