From e0abeeaa6d360a35d6a66da16d047fd2abc2a5d7 Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Thu, 3 Dec 2015 00:45:56 +0000 Subject: Migrate _termination_action into _End For a termination action to properly mark an _End as having stopped it must clear the _End's _cycle attribute. To be able to do that the termination action must hold a reference to the _End instance. Migrating the _termination_action behavior that creates termination actions into the scope of the _End instance is the best way to afford the needed instance reference. --- src/python/grpcio/grpc/framework/core/_end.py | 60 ++++++++++++--------------- 1 file changed, 27 insertions(+), 33 deletions(-) (limited to 'src') diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py index 8e07d9061e..9c615672aa 100644 --- a/src/python/grpcio/grpc/framework/core/_end.py +++ b/src/python/grpcio/grpc/framework/core/_end.py @@ -85,35 +85,6 @@ def _future_shutdown(lock, cycle, event): return in_future -def _termination_action(lock, stats, operation_id, cycle): - """Constructs the termination action for a single operation. - - Args: - lock: A lock to hold during the termination action. - stats: A mapping from base.Outcome.Kind values to integers to increment - with the outcome kind given to the termination action. - operation_id: The operation ID for the termination action. - cycle: A _Cycle value to be updated during the termination action. - - Returns: - A callable that takes an operation outcome kind as its sole parameter and - that should be used as the termination action for the operation - associated with the given operation ID. - """ - def termination_action(outcome_kind): - with lock: - stats[outcome_kind] += 1 - cycle.operations.pop(operation_id, None) - if not cycle.operations: - for action in cycle.idle_actions: - cycle.pool.submit(action) - cycle.idle_actions = [] - if cycle.grace: - _cancel_futures(cycle.futures) - cycle.pool.shutdown(wait=False) - return termination_action - - class _End(End): """An End implementation.""" @@ -133,6 +104,31 @@ class _End(End): self._cycle = None + def _termination_action(self, operation_id): + """Constructs the termination action for a single operation. + + Args: + operation_id: The operation ID for the termination action. + + Returns: + A callable that takes an operation outcome kind as its sole parameter and + that should be used as the termination action for the operation + associated with the given operation ID. + """ + def termination_action(outcome_kind): + with self._lock: + self._stats[outcome_kind] += 1 + self._cycle.operations.pop(operation_id, None) + if not self._cycle.operations: + for action in self._cycle.idle_actions: + self._cycle.pool.submit(action) + self._cycle.idle_actions = [] + if self._cycle.grace: + _cancel_futures(self._cycle.futures) + self._cycle.pool.shutdown(wait=False) + self._cycle = None + return termination_action + def start(self): """See base.End.start for specification.""" with self._lock: @@ -174,8 +170,7 @@ class _End(End): with self._lock: if self._cycle is None or self._cycle.grace: raise ValueError('Can\'t operate on stopped or stopping End!') - termination_action = _termination_action( - self._lock, self._stats, operation_id, self._cycle) + termination_action = self._termination_action(operation_id) operation = _operation.invocation_operate( operation_id, group, method, subscription, timeout, protocol_options, initial_metadata, payload, completion, self._mate.accept_ticket, @@ -208,8 +203,7 @@ class _End(End): if operation is not None: operation.handle_ticket(ticket) elif self._servicer_package is not None and not self._cycle.grace: - termination_action = _termination_action( - self._lock, self._stats, ticket.operation_id, self._cycle) + termination_action = self._termination_action(ticket.operation_id) operation = _operation.service_operate( self._servicer_package, ticket, self._mate.accept_ticket, termination_action, self._cycle.pool) -- cgit v1.2.3 From 2533365bf41fb6c65053e110aa1e1030b1d64c46 Mon Sep 17 00:00:00 2001 From: Nathaniel Manista Date: Thu, 3 Dec 2015 00:55:25 +0000 Subject: Fixes to stub and server lifecycle Context management is implemented. Stub deletion now cancels all RPCs immediately. --- src/python/grpcio/grpc/beta/_server.py | 175 ++++++++++++++------- src/python/grpcio/grpc/beta/_stub.py | 124 ++++++++++----- .../grpc_test/beta/_beta_features_test.py | 73 +++++++++ 3 files changed, 273 insertions(+), 99 deletions(-) (limited to 'src') diff --git a/src/python/grpcio/grpc/beta/_server.py b/src/python/grpcio/grpc/beta/_server.py index 05b954d186..4f454437c0 100644 --- a/src/python/grpcio/grpc/beta/_server.py +++ b/src/python/grpcio/grpc/beta/_server.py @@ -44,6 +44,12 @@ _DEFAULT_TIMEOUT = 300 _MAXIMUM_TIMEOUT = 24 * 60 * 60 +def _set_event(): + event = threading.Event() + event.set() + return event + + class _GRPCServicer(base.Servicer): def __init__(self, delegate): @@ -61,86 +67,143 @@ class _GRPCServicer(base.Servicer): raise -def _disassemble(grpc_link, end_link, pool, event, grace): - grpc_link.begin_stop() - end_link.stop(grace).wait() - grpc_link.end_stop() - grpc_link.join_link(utilities.NULL_LINK) - end_link.join_link(utilities.NULL_LINK) - if pool is not None: - pool.shutdown(wait=True) - event.set() +class _Server(interfaces.Server): + def __init__( + self, implementations, multi_implementation, pool, pool_size, + default_timeout, maximum_timeout, grpc_link): + self._lock = threading.Lock() + self._implementations = implementations + self._multi_implementation = multi_implementation + self._customer_pool = pool + self._pool_size = pool_size + self._default_timeout = default_timeout + self._maximum_timeout = maximum_timeout + self._grpc_link = grpc_link -class Server(interfaces.Server): + self._end_link = None + self._stop_events = None + self._pool = None - def __init__(self, grpc_link, end_link, pool): - self._grpc_link = grpc_link - self._end_link = end_link - self._pool = pool + def _start(self): + with self._lock: + if self._end_link is not None: + raise ValueError('Cannot start already-started server!') + + if self._customer_pool is None: + self._pool = logging_pool.pool(self._pool_size) + assembly_pool = self._pool + else: + assembly_pool = self._customer_pool + + servicer = _GRPCServicer( + _crust_implementations.servicer( + self._implementations, self._multi_implementation, assembly_pool)) + + self._end_link = _core_implementations.service_end_link( + servicer, self._default_timeout, self._maximum_timeout) + + self._grpc_link.join_link(self._end_link) + self._end_link.join_link(self._grpc_link) + self._grpc_link.start() + self._end_link.start() + + def _dissociate_links_and_shut_down_pool(self): + self._grpc_link.end_stop() + self._grpc_link.join_link(utilities.NULL_LINK) + self._end_link.join_link(utilities.NULL_LINK) + self._end_link = None + if self._pool is not None: + self._pool.shutdown(wait=True) + self._pool = None + + def _stop_stopping(self): + self._dissociate_links_and_shut_down_pool() + for stop_event in self._stop_events: + stop_event.set() + self._stop_events = None + + def _stop_started(self): + self._grpc_link.begin_stop() + self._end_link.stop(0).wait() + self._dissociate_links_and_shut_down_pool() + + def _foreign_thread_stop(self, end_stop_event, stop_events): + end_stop_event.wait() + with self._lock: + if self._stop_events is stop_events: + self._stop_stopping() + + def _schedule_stop(self, grace): + with self._lock: + if self._end_link is None: + return _set_event() + server_stop_event = threading.Event() + if self._stop_events is None: + self._stop_events = [server_stop_event] + self._grpc_link.begin_stop() + else: + self._stop_events.append(server_stop_event) + end_stop_event = self._end_link.stop(grace) + end_stop_thread = threading.Thread( + target=self._foreign_thread_stop, + args=(end_stop_event, self._stop_events)) + end_stop_thread.start() + return server_stop_event + + def _stop_now(self): + with self._lock: + if self._end_link is not None: + if self._stop_events is None: + self._stop_started() + else: + self._stop_stopping() def add_insecure_port(self, address): - return self._grpc_link.add_port(address, None) + with self._lock: + if self._end_link is None: + return self._grpc_link.add_port(address, None) + else: + raise ValueError('Can\'t add port to serving server!') def add_secure_port(self, address, server_credentials): - return self._grpc_link.add_port( - address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access - - def _start(self): - self._grpc_link.join_link(self._end_link) - self._end_link.join_link(self._grpc_link) - self._grpc_link.start() - self._end_link.start() - - def _stop(self, grace): - stop_event = threading.Event() - if 0 < grace: - disassembly_thread = threading.Thread( - target=_disassemble, - args=( - self._grpc_link, self._end_link, self._pool, stop_event, grace,)) - disassembly_thread.start() - return stop_event - else: - _disassemble(self._grpc_link, self._end_link, self._pool, stop_event, 0) - return stop_event + with self._lock: + if self._end_link is None: + return self._grpc_link.add_port( + address, server_credentials._intermediary_low_credentials) # pylint: disable=protected-access + else: + raise ValueError('Can\'t add port to serving server!') def start(self): self._start() def stop(self, grace): - return self._stop(grace) + if 0 < grace: + return self._schedule_stop(grace) + else: + self._stop_now() + return _set_event() def __enter__(self): self._start() return self def __exit__(self, exc_type, exc_val, exc_tb): - self._stop(0).wait() + self._stop_now() return False + def __del__(self): + self._stop_now() + def server( implementations, multi_implementation, request_deserializers, response_serializers, thread_pool, thread_pool_size, default_timeout, maximum_timeout): - if thread_pool is None: - service_thread_pool = logging_pool.pool( - _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size) - assembly_thread_pool = service_thread_pool - else: - service_thread_pool = thread_pool - assembly_thread_pool = None - - servicer = _GRPCServicer( - _crust_implementations.servicer( - implementations, multi_implementation, service_thread_pool)) - grpc_link = service.service_link(request_deserializers, response_serializers) - - end_link = _core_implementations.service_end_link( - servicer, + return _Server( + implementations, multi_implementation, thread_pool, + _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size, _DEFAULT_TIMEOUT if default_timeout is None else default_timeout, - _MAXIMUM_TIMEOUT if maximum_timeout is None else maximum_timeout) - - return Server(grpc_link, end_link, assembly_thread_pool) + _MAXIMUM_TIMEOUT if maximum_timeout is None else maximum_timeout, + grpc_link) diff --git a/src/python/grpcio/grpc/beta/_stub.py b/src/python/grpcio/grpc/beta/_stub.py index 11dab889cd..2af019309a 100644 --- a/src/python/grpcio/grpc/beta/_stub.py +++ b/src/python/grpcio/grpc/beta/_stub.py @@ -42,76 +42,114 @@ _DEFAULT_POOL_SIZE = 6 class _AutoIntermediary(object): - def __init__(self, delegate, on_deletion): + def __init__(self, up, down, delegate): + self._lock = threading.Lock() + self._up = up + self._down = down + self._in_context = False self._delegate = delegate - self._on_deletion = on_deletion def __getattr__(self, attr): - return getattr(self._delegate, attr) + with self._lock: + if self._delegate is None: + raise AttributeError('No useful attributes out of context!') + else: + return getattr(self._delegate, attr) def __enter__(self): - return self + with self._lock: + if self._in_context: + raise ValueError('Already in context!') + elif self._delegate is None: + self._delegate = self._up() + self._in_context = True + return self def __exit__(self, exc_type, exc_val, exc_tb): - return False + with self._lock: + if not self._in_context: + raise ValueError('Not in context!') + self._down() + self._in_context = False + self._delegate = None + return False def __del__(self): - self._on_deletion() + with self._lock: + if self._delegate is not None: + self._down() + self._delegate = None + + +class _StubAssemblyManager(object): + + def __init__( + self, thread_pool, thread_pool_size, end_link, grpc_link, stub_creator): + self._thread_pool = thread_pool + self._pool_size = thread_pool_size + self._end_link = end_link + self._grpc_link = grpc_link + self._stub_creator = stub_creator + self._own_pool = None + + def up(self): + if self._thread_pool is None: + self._own_pool = logging_pool.pool( + _DEFAULT_POOL_SIZE if self._pool_size is None else self._pool_size) + assembly_pool = self._own_pool + else: + assembly_pool = self._thread_pool + self._end_link.join_link(self._grpc_link) + self._grpc_link.join_link(self._end_link) + self._end_link.start() + self._grpc_link.start() + return self._stub_creator(self._end_link, assembly_pool) + + def down(self): + self._end_link.stop(0).wait() + self._grpc_link.stop() + self._end_link.join_link(utilities.NULL_LINK) + self._grpc_link.join_link(utilities.NULL_LINK) + if self._own_pool is not None: + self._own_pool.shutdown(wait=True) + self._own_pool = None def _assemble( channel, host, metadata_transformer, request_serializers, - response_deserializers, thread_pool, thread_pool_size): + response_deserializers, thread_pool, thread_pool_size, stub_creator): end_link = _core_implementations.invocation_end_link() grpc_link = invocation.invocation_link( channel, host, metadata_transformer, request_serializers, response_deserializers) - if thread_pool is None: - invocation_pool = logging_pool.pool( - _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size) - assembly_pool = invocation_pool - else: - invocation_pool = thread_pool - assembly_pool = None - end_link.join_link(grpc_link) - grpc_link.join_link(end_link) - end_link.start() - grpc_link.start() - return end_link, grpc_link, invocation_pool, assembly_pool - - -def _disassemble(end_link, grpc_link, pool): - end_link.stop(24 * 60 * 60).wait() - grpc_link.stop() - end_link.join_link(utilities.NULL_LINK) - grpc_link.join_link(utilities.NULL_LINK) - if pool is not None: - pool.shutdown(wait=True) - - -def _wrap_assembly(stub, end_link, grpc_link, assembly_pool): - disassembly_thread = threading.Thread( - target=_disassemble, args=(end_link, grpc_link, assembly_pool)) - return _AutoIntermediary(stub, disassembly_thread.start) + stub_assembly_manager = _StubAssemblyManager( + thread_pool, thread_pool_size, end_link, grpc_link, stub_creator) + stub = stub_assembly_manager.up() + return _AutoIntermediary( + stub_assembly_manager.up, stub_assembly_manager.down, stub) + + +def _dynamic_stub_creator(service, cardinalities): + def create_dynamic_stub(end_link, invocation_pool): + return _crust_implementations.dynamic_stub( + end_link, service, cardinalities, invocation_pool) + return create_dynamic_stub def generic_stub( channel, host, metadata_transformer, request_serializers, response_deserializers, thread_pool, thread_pool_size): - end_link, grpc_link, invocation_pool, assembly_pool = _assemble( + return _assemble( channel, host, metadata_transformer, request_serializers, - response_deserializers, thread_pool, thread_pool_size) - stub = _crust_implementations.generic_stub(end_link, invocation_pool) - return _wrap_assembly(stub, end_link, grpc_link, assembly_pool) + response_deserializers, thread_pool, thread_pool_size, + _crust_implementations.generic_stub) def dynamic_stub( channel, host, service, cardinalities, metadata_transformer, request_serializers, response_deserializers, thread_pool, thread_pool_size): - end_link, grpc_link, invocation_pool, assembly_pool = _assemble( + return _assemble( channel, host, metadata_transformer, request_serializers, - response_deserializers, thread_pool, thread_pool_size) - stub = _crust_implementations.dynamic_stub( - end_link, service, cardinalities, invocation_pool) - return _wrap_assembly(stub, end_link, grpc_link, assembly_pool) + response_deserializers, thread_pool, thread_pool_size, + _dynamic_stub_creator(service, cardinalities)) diff --git a/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py index 5916a9e3ea..6b5090f8bb 100644 --- a/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py +++ b/src/python/grpcio_test/grpc_test/beta/_beta_features_test.py @@ -224,5 +224,78 @@ class BetaFeaturesTest(unittest.TestCase): self.assertEqual(_RESPONSE, response) +class ContextManagementAndLifecycleTest(unittest.TestCase): + + def setUp(self): + self._servicer = _Servicer() + self._method_implementations = { + (_GROUP, _UNARY_UNARY): + utilities.unary_unary_inline(self._servicer.unary_unary), + (_GROUP, _UNARY_STREAM): + utilities.unary_stream_inline(self._servicer.unary_stream), + (_GROUP, _STREAM_UNARY): + utilities.stream_unary_inline(self._servicer.stream_unary), + (_GROUP, _STREAM_STREAM): + utilities.stream_stream_inline(self._servicer.stream_stream), + } + + self._cardinalities = { + _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY, + _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM, + _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY, + _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM, + } + + self._server_options = implementations.server_options( + thread_pool_size=test_constants.POOL_SIZE) + self._server_credentials = implementations.ssl_server_credentials( + [(resources.private_key(), resources.certificate_chain(),),]) + self._client_credentials = implementations.ssl_client_credentials( + resources.test_root_certificates(), None, None) + self._stub_options = implementations.stub_options( + thread_pool_size=test_constants.POOL_SIZE) + + def test_stub_context(self): + server = implementations.server( + self._method_implementations, options=self._server_options) + port = server.add_secure_port('[::]:0', self._server_credentials) + server.start() + + channel = test_utilities.not_really_secure_channel( + 'localhost', port, self._client_credentials, _SERVER_HOST_OVERRIDE) + dynamic_stub = implementations.dynamic_stub( + channel, _GROUP, self._cardinalities, options=self._stub_options) + for _ in range(100): + with dynamic_stub: + pass + for _ in range(10): + with dynamic_stub: + call_options = interfaces.grpc_call_options( + disable_compression=True) + response = getattr(dynamic_stub, _UNARY_UNARY)( + _REQUEST, test_constants.LONG_TIMEOUT, + protocol_options=call_options) + self.assertEqual(_RESPONSE, response) + self.assertIsNotNone(self._servicer.peer()) + + server.stop(test_constants.SHORT_TIMEOUT).wait() + + def test_server_lifecycle(self): + for _ in range(100): + server = implementations.server( + self._method_implementations, options=self._server_options) + port = server.add_secure_port('[::]:0', self._server_credentials) + server.start() + server.stop(test_constants.SHORT_TIMEOUT).wait() + for _ in range(100): + server = implementations.server( + self._method_implementations, options=self._server_options) + server.add_secure_port('[::]:0', self._server_credentials) + server.add_insecure_port('[::]:0') + with server: + server.stop(test_constants.SHORT_TIMEOUT) + server.stop(test_constants.SHORT_TIMEOUT) + + if __name__ == '__main__': unittest.main(verbosity=2) -- cgit v1.2.3