diff options
4 files changed, 18 insertions, 4 deletions
diff --git a/src/python/src/grpc/framework/base/packets/_ends.py b/src/python/src/grpc/framework/base/packets/_ends.py index 15bf3bf330..ac369c4fbd 100644 --- a/src/python/src/grpc/framework/base/packets/_ends.py +++ b/src/python/src/grpc/framework/base/packets/_ends.py @@ -215,6 +215,7 @@ def _front_operate( lock, termination_manager, transmission_manager, ingestion_manager, expiration_manager) + termination_manager.set_expiration_manager(expiration_manager) transmission_manager.set_ingestion_and_expiration_managers( ingestion_manager, expiration_manager) operation_context.set_ingestion_and_expiration_managers( @@ -340,6 +341,7 @@ def _back_operate( lock, termination_manager, transmission_manager, ingestion_manager, expiration_manager) + termination_manager.set_expiration_manager(expiration_manager) transmission_manager.set_ingestion_and_expiration_managers( ingestion_manager, expiration_manager) operation_context.set_ingestion_and_expiration_managers( diff --git a/src/python/src/grpc/framework/base/packets/_interfaces.py b/src/python/src/grpc/framework/base/packets/_interfaces.py index 70d9572391..64dc33e8d5 100644 --- a/src/python/src/grpc/framework/base/packets/_interfaces.py +++ b/src/python/src/grpc/framework/base/packets/_interfaces.py @@ -42,6 +42,11 @@ class TerminationManager(object): __metaclass__ = abc.ABCMeta @abc.abstractmethod + def set_expiration_manager(self, expiration_manager): + """Sets the ExpirationManager with which this object will cooperate.""" + raise NotImplementedError() + + @abc.abstractmethod def is_active(self): """Reports whether or not the operation is active. @@ -169,6 +174,7 @@ class IngestionManager(stream.Consumer): @abc.abstractmethod def set_expiration_manager(self, expiration_manager): """Sets the ExpirationManager with which this object will cooperate.""" + raise NotImplementedError() @abc.abstractmethod def start(self, requirement): diff --git a/src/python/src/grpc/framework/base/packets/_termination.py b/src/python/src/grpc/framework/base/packets/_termination.py index 5c10da7aa8..575eee65a8 100644 --- a/src/python/src/grpc/framework/base/packets/_termination.py +++ b/src/python/src/grpc/framework/base/packets/_termination.py @@ -86,11 +86,15 @@ class _TerminationManager(_interfaces.TerminationManager): self._action = action self._local_failure = local_failure self._has_locally_failed = False + self._expiration_manager = None self._outstanding_requirements = set(requirements) self._kind = None self._callbacks = [] + def set_expiration_manager(self, expiration_manager): + self._expiration_manager = expiration_manager + def _terminate(self, kind): """Terminates the operation. @@ -100,6 +104,7 @@ class _TerminationManager(_interfaces.TerminationManager): packets.Kind.TRANSMISSION_FAILURE, packets.Kind.SERVICER_FAILURE, or packets.Kind.SERVICED_FAILURE. """ + self._expiration_manager.abort() self._outstanding_requirements = None callbacks = list(self._callbacks) self._callbacks = None diff --git a/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py index 993098f4ae..4cb0baf517 100644 --- a/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py +++ b/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py @@ -41,6 +41,7 @@ from grpc.framework.face.testing import stock_service from grpc.framework.face.testing import test_case _TIMEOUT = 3 +_LONG_TIMEOUT = 45 class BlockingInvocationInlineServiceTestCase( @@ -82,7 +83,7 @@ class BlockingInvocationInlineServiceTestCase( request = test_messages.request() response = self.stub.blocking_value_in_value_out( - name, request, _TIMEOUT) + name, request, _LONG_TIMEOUT) test_messages.verify(request, response, self) @@ -93,7 +94,7 @@ class BlockingInvocationInlineServiceTestCase( request = test_messages.request() response_iterator = self.stub.inline_value_in_stream_out( - name, request, _TIMEOUT) + name, request, _LONG_TIMEOUT) responses = list(response_iterator) test_messages.verify(request, responses, self) @@ -105,7 +106,7 @@ class BlockingInvocationInlineServiceTestCase( requests = test_messages.requests() response = self.stub.blocking_stream_in_value_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), _LONG_TIMEOUT) test_messages.verify(requests, response, self) @@ -116,7 +117,7 @@ class BlockingInvocationInlineServiceTestCase( requests = test_messages.requests() response_iterator = self.stub.inline_stream_in_stream_out( - name, iter(requests), _TIMEOUT) + name, iter(requests), _LONG_TIMEOUT) responses = list(response_iterator) test_messages.verify(requests, responses, self) |