aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python
diff options
context:
space:
mode:
authorGravatar Eric Gribkoff <ericgribkoff@google.com>2018-12-07 09:23:54 -0800
committerGravatar Eric Gribkoff <ericgribkoff@google.com>2018-12-17 08:53:09 -0800
commit082b63e09592502b0900a5dff214dc70efb9de56 (patch)
treeb5cf6239a84cabbd08e831f63468f3854d580ff5 /src/python
parentb250f34b1225cde1bb19496c5cc5d66e40111052 (diff)
Refactor server deallocation
Diffstat (limited to 'src/python')
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi2
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi10
-rw-r--r--src/python/grpcio/grpc/_server.py106
-rw-r--r--src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py2
4 files changed, 73 insertions, 47 deletions
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 141116df5d..3c33b46dbb 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -49,7 +49,7 @@ cdef grpc_event _next(grpc_completion_queue *c_completion_queue, deadline):
cdef _interpret_event(grpc_event c_event):
cdef _Tag tag
if c_event.type == GRPC_QUEUE_TIMEOUT:
- # NOTE(nathaniel): For now we coopt ConnectivityEvent here.
+ # TODO(ericgribkoff) Do not coopt ConnectivityEvent here.
return None, ConnectivityEvent(GRPC_QUEUE_TIMEOUT, False, None)
elif c_event.type == GRPC_QUEUE_SHUTDOWN:
# NOTE(nathaniel): For now we coopt ConnectivityEvent here.
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index ce701724fd..1b8d6790fb 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -128,7 +128,9 @@ cdef class Server:
with nogil:
grpc_server_cancel_all_calls(self.c_server)
- def __dealloc__(self):
+ # TODO(ericgribkoff) Determine what, if any, portion of this is safe to call
+ # from __dealloc__, and potentially remove backup_shutdown_queue.
+ def destroy(self):
if self.c_server != NULL:
if not self.is_started:
pass
@@ -146,4 +148,8 @@ cdef class Server:
while not self.is_shutdown:
time.sleep(0)
grpc_server_destroy(self.c_server)
- grpc_shutdown()
+ self.c_server = NULL
+
+ def __dealloc(self):
+ if self.c_server == NULL:
+ grpc_shutdown()
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 3bbfa47da5..eb750ef1a8 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -48,7 +48,7 @@ _CANCELLED = 'cancelled'
_EMPTY_FLAGS = 0
-_UNEXPECTED_EXIT_SERVER_GRACE = 1.0
+_DEALLOCATED_SERVER_CHECK_PERIOD_S = 1.0
def _serialized_request(request_event):
@@ -676,6 +676,9 @@ class _ServerState(object):
self.rpc_states = set()
self.due = set()
+ # A "volatile" flag to interrupt the daemon serving thread
+ self.server_deallocated = False
+
def _add_generic_handlers(state, generic_handlers):
with state.lock:
@@ -702,6 +705,7 @@ def _request_call(state):
# TODO(https://github.com/grpc/grpc/issues/6597): delete this function.
def _stop_serving(state):
if not state.rpc_states and not state.due:
+ state.server.destroy()
for shutdown_event in state.shutdown_events:
shutdown_event.set()
state.stage = _ServerStage.STOPPED
@@ -715,49 +719,69 @@ def _on_call_completed(state):
state.active_rpc_count -= 1
-def _serve(state):
- while True:
- event = state.completion_queue.poll()
- if event.tag is _SHUTDOWN_TAG:
+def _process_event_and_continue(state, event):
+ should_continue = True
+ if event.tag is _SHUTDOWN_TAG:
+ with state.lock:
+ state.due.remove(_SHUTDOWN_TAG)
+ if _stop_serving(state):
+ should_continue = False
+ elif event.tag is _REQUEST_CALL_TAG:
+ with state.lock:
+ state.due.remove(_REQUEST_CALL_TAG)
+ concurrency_exceeded = (
+ state.maximum_concurrent_rpcs is not None and
+ state.active_rpc_count >= state.maximum_concurrent_rpcs)
+ rpc_state, rpc_future = _handle_call(
+ event, state.generic_handlers, state.interceptor_pipeline,
+ state.thread_pool, concurrency_exceeded)
+ if rpc_state is not None:
+ state.rpc_states.add(rpc_state)
+ if rpc_future is not None:
+ state.active_rpc_count += 1
+ rpc_future.add_done_callback(
+ lambda unused_future: _on_call_completed(state))
+ if state.stage is _ServerStage.STARTED:
+ _request_call(state)
+ elif _stop_serving(state):
+ should_continue = False
+ else:
+ rpc_state, callbacks = event.tag(event)
+ for callback in callbacks:
+ callable_util.call_logging_exceptions(callback,
+ 'Exception calling callback!')
+ if rpc_state is not None:
with state.lock:
- state.due.remove(_SHUTDOWN_TAG)
+ state.rpc_states.remove(rpc_state)
if _stop_serving(state):
- return
- elif event.tag is _REQUEST_CALL_TAG:
- with state.lock:
- state.due.remove(_REQUEST_CALL_TAG)
- concurrency_exceeded = (
- state.maximum_concurrent_rpcs is not None and
- state.active_rpc_count >= state.maximum_concurrent_rpcs)
- rpc_state, rpc_future = _handle_call(
- event, state.generic_handlers, state.interceptor_pipeline,
- state.thread_pool, concurrency_exceeded)
- if rpc_state is not None:
- state.rpc_states.add(rpc_state)
- if rpc_future is not None:
- state.active_rpc_count += 1
- rpc_future.add_done_callback(
- lambda unused_future: _on_call_completed(state))
- if state.stage is _ServerStage.STARTED:
- _request_call(state)
- elif _stop_serving(state):
- return
- else:
- rpc_state, callbacks = event.tag(event)
- for callback in callbacks:
- callable_util.call_logging_exceptions(
- callback, 'Exception calling callback!')
- if rpc_state is not None:
- with state.lock:
- state.rpc_states.remove(rpc_state)
- if _stop_serving(state):
- return
+ should_continue = False
+ return should_continue
+
+
+def _serve(state):
+ while True:
+ timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S
+ event = state.completion_queue.poll(timeout)
+ if state.server_deallocated:
+ _begin_shutdown_once(state)
+ if event.completion_type != cygrpc.CompletionType.queue_timeout:
+ if not _process_event_and_continue(state, event):
+ return
# We want to force the deletion of the previous event
# ~before~ we poll again; if the event has a reference
# to a shutdown Call object, this can induce spinlock.
event = None
+def _begin_shutdown_once(state):
+ with state.lock:
+ if state.stage is _ServerStage.STARTED:
+ state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
+ state.stage = _ServerStage.GRACE
+ state.shutdown_events = []
+ state.due.add(_SHUTDOWN_TAG)
+
+
def _stop(state, grace):
with state.lock:
if state.stage is _ServerStage.STOPPED:
@@ -765,11 +789,7 @@ def _stop(state, grace):
shutdown_event.set()
return shutdown_event
else:
- if state.stage is _ServerStage.STARTED:
- state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
- state.stage = _ServerStage.GRACE
- state.shutdown_events = []
- state.due.add(_SHUTDOWN_TAG)
+ _begin_shutdown_once(state)
shutdown_event = threading.Event()
state.shutdown_events.append(shutdown_event)
if grace is None:
@@ -840,7 +860,9 @@ class _Server(grpc.Server):
return _stop(self._state, grace)
def __del__(self):
- _stop(self._state, None)
+ # We can not grab a lock in __del__(), so set a flag to signal the
+ # serving daemon thread (if it exists) to initiate shutdown.
+ self._state.server_deallocated = True
def create_server(thread_pool, generic_rpc_handlers, interceptors, options,
diff --git a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py
index 8ca5189522..5265911a0b 100644
--- a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py
+++ b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py
@@ -88,8 +88,6 @@ def _generate_channel_server_pairs(n):
def _close_channel_server_pairs(pairs):
for pair in pairs:
pair.server.stop(None)
- # TODO(ericgribkoff) This del should not be required
- del pair.server
pair.channel.close()