aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2015-02-20 10:30:59 -0800
committerGravatar Nathaniel Manista <nathaniel@google.com>2015-02-20 10:30:59 -0800
commitb472e2364af6ba2a33c7321cfa9cbbfd6cbb4eb7 (patch)
tree1d22521593e3a7b270cbb8378118f485b81d75b9
parentc9a53f9568b418ab26b60c44b06ea0cc7c6f3425 (diff)
parent97993b65b31c0885597ee70c343a4e97710a18ef (diff)
Merge pull request #657 from nathanielmanistaatgoogle/thread-leak-fix
-rw-r--r--src/python/src/grpc/framework/base/packets/_ends.py2
-rw-r--r--src/python/src/grpc/framework/base/packets/_interfaces.py6
-rw-r--r--src/python/src/grpc/framework/base/packets/_termination.py5
-rw-r--r--src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py9
4 files changed, 18 insertions, 4 deletions
diff --git a/src/python/src/grpc/framework/base/packets/_ends.py b/src/python/src/grpc/framework/base/packets/_ends.py
index 15bf3bf330..ac369c4fbd 100644
--- a/src/python/src/grpc/framework/base/packets/_ends.py
+++ b/src/python/src/grpc/framework/base/packets/_ends.py
@@ -215,6 +215,7 @@ def _front_operate(
lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager)
+ termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_ingestion_and_expiration_managers(
ingestion_manager, expiration_manager)
operation_context.set_ingestion_and_expiration_managers(
@@ -340,6 +341,7 @@ def _back_operate(
lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager)
+ termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_ingestion_and_expiration_managers(
ingestion_manager, expiration_manager)
operation_context.set_ingestion_and_expiration_managers(
diff --git a/src/python/src/grpc/framework/base/packets/_interfaces.py b/src/python/src/grpc/framework/base/packets/_interfaces.py
index 70d9572391..64dc33e8d5 100644
--- a/src/python/src/grpc/framework/base/packets/_interfaces.py
+++ b/src/python/src/grpc/framework/base/packets/_interfaces.py
@@ -42,6 +42,11 @@ class TerminationManager(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
+ def set_expiration_manager(self, expiration_manager):
+ """Sets the ExpirationManager with which this object will cooperate."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
def is_active(self):
"""Reports whether or not the operation is active.
@@ -169,6 +174,7 @@ class IngestionManager(stream.Consumer):
@abc.abstractmethod
def set_expiration_manager(self, expiration_manager):
"""Sets the ExpirationManager with which this object will cooperate."""
+ raise NotImplementedError()
@abc.abstractmethod
def start(self, requirement):
diff --git a/src/python/src/grpc/framework/base/packets/_termination.py b/src/python/src/grpc/framework/base/packets/_termination.py
index 5c10da7aa8..575eee65a8 100644
--- a/src/python/src/grpc/framework/base/packets/_termination.py
+++ b/src/python/src/grpc/framework/base/packets/_termination.py
@@ -86,11 +86,15 @@ class _TerminationManager(_interfaces.TerminationManager):
self._action = action
self._local_failure = local_failure
self._has_locally_failed = False
+ self._expiration_manager = None
self._outstanding_requirements = set(requirements)
self._kind = None
self._callbacks = []
+ def set_expiration_manager(self, expiration_manager):
+ self._expiration_manager = expiration_manager
+
def _terminate(self, kind):
"""Terminates the operation.
@@ -100,6 +104,7 @@ class _TerminationManager(_interfaces.TerminationManager):
packets.Kind.TRANSMISSION_FAILURE, packets.Kind.SERVICER_FAILURE, or
packets.Kind.SERVICED_FAILURE.
"""
+ self._expiration_manager.abort()
self._outstanding_requirements = None
callbacks = list(self._callbacks)
self._callbacks = None
diff --git a/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py
index 993098f4ae..4cb0baf517 100644
--- a/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py
+++ b/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py
@@ -41,6 +41,7 @@ from grpc.framework.face.testing import stock_service
from grpc.framework.face.testing import test_case
_TIMEOUT = 3
+_LONG_TIMEOUT = 45
class BlockingInvocationInlineServiceTestCase(
@@ -82,7 +83,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
response = self.stub.blocking_value_in_value_out(
- name, request, _TIMEOUT)
+ name, request, _LONG_TIMEOUT)
test_messages.verify(request, response, self)
@@ -93,7 +94,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, _LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(request, responses, self)
@@ -105,7 +106,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
response = self.stub.blocking_stream_in_value_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), _LONG_TIMEOUT)
test_messages.verify(requests, response, self)
@@ -116,7 +117,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), _LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(requests, responses, self)