diff options
author | Ken Payson <kpayson@google.com> | 2017-12-14 11:53:22 -0800 |
---|---|---|
committer | Ken Payson <kpayson@google.com> | 2017-12-14 12:07:07 -0800 |
commit | 8179b9523a5f56c1cea63ca5060ccebf6f30c7b6 (patch) | |
tree | 0f9c6a72ade43d08db484ccb01ce610426de6c96 /src/python | |
parent | 04fb1c0b316ecfdda54edaf15410e54b88d57be3 (diff) |
Fixes race condition in Python server shutdown
When we set the call state to "CANCELLED" after
grpc_cancel_all_calls, we would block other start batch
operations from happening. The rpc_state for the cancelled
call would still be in the server's rpc_states set, but it
would never get removed because there were no active batches
for the call, and the only place we remove from rpc_states is
when a batch completes.
It is better to rely on c-core's cancellation. Once a call
is cancelled, all subsequent ops on that call will return
immediately with a cancellation error.
The RLock() change is due to the possibility that
_on_call_completed
gets invoked immediately when the call has already completed when the
rpc_future callback is created.
Diffstat (limited to 'src/python')
-rw-r--r-- | src/python/grpcio/grpc/_server.py | 12 |
1 files changed, 1 insertions, 11 deletions
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 8857bd3eb5..02d3af8706 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -634,7 +634,7 @@ class _ServerState(object): # pylint: disable=too-many-arguments def __init__(self, completion_queue, server, generic_handlers, interceptor_pipeline, thread_pool, maximum_concurrent_rpcs): - self.lock = threading.Lock() + self.lock = threading.RLock() self.completion_queue = completion_queue self.server = server self.generic_handlers = list(generic_handlers) @@ -747,22 +747,12 @@ def _stop(state, grace): state.shutdown_events.append(shutdown_event) if grace is None: state.server.cancel_all_calls() - # TODO(https://github.com/grpc/grpc/issues/6597): delete this loop. - for rpc_state in state.rpc_states: - with rpc_state.condition: - rpc_state.client = _CANCELLED - rpc_state.condition.notify_all() else: def cancel_all_calls_after_grace(): shutdown_event.wait(timeout=grace) with state.lock: state.server.cancel_all_calls() - # TODO(https://github.com/grpc/grpc/issues/6597): delete this loop. - for rpc_state in state.rpc_states: - with rpc_state.condition: - rpc_state.client = _CANCELLED - rpc_state.condition.notify_all() thread = threading.Thread(target=cancel_all_calls_after_grace) thread.start() |