aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Ken Payson <kpayson@google.com>2017-12-14 11:53:22 -0800
committerGravatar Ken Payson <kpayson@google.com>2017-12-14 12:07:07 -0800
commit8179b9523a5f56c1cea63ca5060ccebf6f30c7b6 (patch)
tree0f9c6a72ade43d08db484ccb01ce610426de6c96 /src
parent04fb1c0b316ecfdda54edaf15410e54b88d57be3 (diff)
Fixes race condition in Python server shutdown
When we set the call state to "CANCELLED" after grpc_cancel_all_calls, we would block other start batch operations from happening. The rpc_state for the cancelled call would still be in the server's rpc_states set, but it would never get removed because there were no active batches for the call, and the only place we remove from rpc_states is when a batch completes. It is better to rely on c-core's cancellation. Once a call is cancelled, all subsequent ops on that call will return immediately with a cancellation error. The RLock() change is due to the possibility that _on_call_completed gets invoked immediately when the call has already completed when the rpc_future callback is created.
Diffstat (limited to 'src')
-rw-r--r--src/python/grpcio/grpc/_server.py12
1 files changed, 1 insertions, 11 deletions
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 8857bd3eb5..02d3af8706 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -634,7 +634,7 @@ class _ServerState(object):
# pylint: disable=too-many-arguments
def __init__(self, completion_queue, server, generic_handlers,
interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):
- self.lock = threading.Lock()
+ self.lock = threading.RLock()
self.completion_queue = completion_queue
self.server = server
self.generic_handlers = list(generic_handlers)
@@ -747,22 +747,12 @@ def _stop(state, grace):
state.shutdown_events.append(shutdown_event)
if grace is None:
state.server.cancel_all_calls()
- # TODO(https://github.com/grpc/grpc/issues/6597): delete this loop.
- for rpc_state in state.rpc_states:
- with rpc_state.condition:
- rpc_state.client = _CANCELLED
- rpc_state.condition.notify_all()
else:
def cancel_all_calls_after_grace():
shutdown_event.wait(timeout=grace)
with state.lock:
state.server.cancel_all_calls()
- # TODO(https://github.com/grpc/grpc/issues/6597): delete this loop.
- for rpc_state in state.rpc_states:
- with rpc_state.condition:
- rpc_state.client = _CANCELLED
- rpc_state.condition.notify_all()
thread = threading.Thread(target=cancel_all_calls_after_grace)
thread.start()