aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/grpc/_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/grpc/_server.py')
-rw-r--r--src/python/grpcio/grpc/_server.py106
1 files changed, 64 insertions, 42 deletions
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 3bbfa47da5..eb750ef1a8 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -48,7 +48,7 @@ _CANCELLED = 'cancelled'
_EMPTY_FLAGS = 0
-_UNEXPECTED_EXIT_SERVER_GRACE = 1.0
+_DEALLOCATED_SERVER_CHECK_PERIOD_S = 1.0
def _serialized_request(request_event):
@@ -676,6 +676,9 @@ class _ServerState(object):
self.rpc_states = set()
self.due = set()
+ # A "volatile" flag to interrupt the daemon serving thread
+ self.server_deallocated = False
+
def _add_generic_handlers(state, generic_handlers):
with state.lock:
@@ -702,6 +705,7 @@ def _request_call(state):
# TODO(https://github.com/grpc/grpc/issues/6597): delete this function.
def _stop_serving(state):
if not state.rpc_states and not state.due:
+ state.server.destroy()
for shutdown_event in state.shutdown_events:
shutdown_event.set()
state.stage = _ServerStage.STOPPED
@@ -715,49 +719,69 @@ def _on_call_completed(state):
state.active_rpc_count -= 1
-def _serve(state):
- while True:
- event = state.completion_queue.poll()
- if event.tag is _SHUTDOWN_TAG:
+def _process_event_and_continue(state, event):
+ should_continue = True
+ if event.tag is _SHUTDOWN_TAG:
+ with state.lock:
+ state.due.remove(_SHUTDOWN_TAG)
+ if _stop_serving(state):
+ should_continue = False
+ elif event.tag is _REQUEST_CALL_TAG:
+ with state.lock:
+ state.due.remove(_REQUEST_CALL_TAG)
+ concurrency_exceeded = (
+ state.maximum_concurrent_rpcs is not None and
+ state.active_rpc_count >= state.maximum_concurrent_rpcs)
+ rpc_state, rpc_future = _handle_call(
+ event, state.generic_handlers, state.interceptor_pipeline,
+ state.thread_pool, concurrency_exceeded)
+ if rpc_state is not None:
+ state.rpc_states.add(rpc_state)
+ if rpc_future is not None:
+ state.active_rpc_count += 1
+ rpc_future.add_done_callback(
+ lambda unused_future: _on_call_completed(state))
+ if state.stage is _ServerStage.STARTED:
+ _request_call(state)
+ elif _stop_serving(state):
+ should_continue = False
+ else:
+ rpc_state, callbacks = event.tag(event)
+ for callback in callbacks:
+ callable_util.call_logging_exceptions(callback,
+ 'Exception calling callback!')
+ if rpc_state is not None:
with state.lock:
- state.due.remove(_SHUTDOWN_TAG)
+ state.rpc_states.remove(rpc_state)
if _stop_serving(state):
- return
- elif event.tag is _REQUEST_CALL_TAG:
- with state.lock:
- state.due.remove(_REQUEST_CALL_TAG)
- concurrency_exceeded = (
- state.maximum_concurrent_rpcs is not None and
- state.active_rpc_count >= state.maximum_concurrent_rpcs)
- rpc_state, rpc_future = _handle_call(
- event, state.generic_handlers, state.interceptor_pipeline,
- state.thread_pool, concurrency_exceeded)
- if rpc_state is not None:
- state.rpc_states.add(rpc_state)
- if rpc_future is not None:
- state.active_rpc_count += 1
- rpc_future.add_done_callback(
- lambda unused_future: _on_call_completed(state))
- if state.stage is _ServerStage.STARTED:
- _request_call(state)
- elif _stop_serving(state):
- return
- else:
- rpc_state, callbacks = event.tag(event)
- for callback in callbacks:
- callable_util.call_logging_exceptions(
- callback, 'Exception calling callback!')
- if rpc_state is not None:
- with state.lock:
- state.rpc_states.remove(rpc_state)
- if _stop_serving(state):
- return
+ should_continue = False
+ return should_continue
+
+
+def _serve(state):
+ while True:
+ timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S
+ event = state.completion_queue.poll(timeout)
+ if state.server_deallocated:
+ _begin_shutdown_once(state)
+ if event.completion_type != cygrpc.CompletionType.queue_timeout:
+ if not _process_event_and_continue(state, event):
+ return
# We want to force the deletion of the previous event
# ~before~ we poll again; if the event has a reference
# to a shutdown Call object, this can induce spinlock.
event = None
+def _begin_shutdown_once(state):
+ with state.lock:
+ if state.stage is _ServerStage.STARTED:
+ state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
+ state.stage = _ServerStage.GRACE
+ state.shutdown_events = []
+ state.due.add(_SHUTDOWN_TAG)
+
+
def _stop(state, grace):
with state.lock:
if state.stage is _ServerStage.STOPPED:
@@ -765,11 +789,7 @@ def _stop(state, grace):
shutdown_event.set()
return shutdown_event
else:
- if state.stage is _ServerStage.STARTED:
- state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
- state.stage = _ServerStage.GRACE
- state.shutdown_events = []
- state.due.add(_SHUTDOWN_TAG)
+ _begin_shutdown_once(state)
shutdown_event = threading.Event()
state.shutdown_events.append(shutdown_event)
if grace is None:
@@ -840,7 +860,9 @@ class _Server(grpc.Server):
return _stop(self._state, grace)
def __del__(self):
- _stop(self._state, None)
+ # We can not grab a lock in __del__(), so set a flag to signal the
+ # serving daemon thread (if it exists) to initiate shutdown.
+ self._state.server_deallocated = True
def create_server(thread_pool, generic_rpc_handlers, interceptors, options,