aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Nathaniel Manista <nathaniel@google.com>2015-02-20 18:14:00 +0000
committerGravatar Nathaniel Manista <nathaniel@google.com>2015-02-20 18:14:00 +0000
commit97993b65b31c0885597ee70c343a4e97710a18ef (patch)
treeee4e646c1522a869c424e2477e75b41e96e9beb7
parentaad032532cebc98eb3053b53a20ff8b91cf64d29 (diff)
Fix a thread leak bug.
Successful operations were leaking the thread used for expiration monitoring. This change ensures that the ExpirationManager for the operation always has its abort() method called when the TerminationManager for the operation judges the operation to have terminated.
-rw-r--r--src/python/src/grpc/framework/base/packets/_ends.py2
-rw-r--r--src/python/src/grpc/framework/base/packets/_interfaces.py6
-rw-r--r--src/python/src/grpc/framework/base/packets/_termination.py5
-rw-r--r--src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py9
4 files changed, 18 insertions, 4 deletions
diff --git a/src/python/src/grpc/framework/base/packets/_ends.py b/src/python/src/grpc/framework/base/packets/_ends.py
index 15bf3bf330..ac369c4fbd 100644
--- a/src/python/src/grpc/framework/base/packets/_ends.py
+++ b/src/python/src/grpc/framework/base/packets/_ends.py
@@ -215,6 +215,7 @@ def _front_operate(
lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager)
+ termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_ingestion_and_expiration_managers(
ingestion_manager, expiration_manager)
operation_context.set_ingestion_and_expiration_managers(
@@ -340,6 +341,7 @@ def _back_operate(
lock, termination_manager, transmission_manager, ingestion_manager,
expiration_manager)
+ termination_manager.set_expiration_manager(expiration_manager)
transmission_manager.set_ingestion_and_expiration_managers(
ingestion_manager, expiration_manager)
operation_context.set_ingestion_and_expiration_managers(
diff --git a/src/python/src/grpc/framework/base/packets/_interfaces.py b/src/python/src/grpc/framework/base/packets/_interfaces.py
index 70d9572391..64dc33e8d5 100644
--- a/src/python/src/grpc/framework/base/packets/_interfaces.py
+++ b/src/python/src/grpc/framework/base/packets/_interfaces.py
@@ -42,6 +42,11 @@ class TerminationManager(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
+ def set_expiration_manager(self, expiration_manager):
+ """Sets the ExpirationManager with which this object will cooperate."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
def is_active(self):
"""Reports whether or not the operation is active.
@@ -169,6 +174,7 @@ class IngestionManager(stream.Consumer):
@abc.abstractmethod
def set_expiration_manager(self, expiration_manager):
"""Sets the ExpirationManager with which this object will cooperate."""
+ raise NotImplementedError()
@abc.abstractmethod
def start(self, requirement):
diff --git a/src/python/src/grpc/framework/base/packets/_termination.py b/src/python/src/grpc/framework/base/packets/_termination.py
index 5c10da7aa8..575eee65a8 100644
--- a/src/python/src/grpc/framework/base/packets/_termination.py
+++ b/src/python/src/grpc/framework/base/packets/_termination.py
@@ -86,11 +86,15 @@ class _TerminationManager(_interfaces.TerminationManager):
self._action = action
self._local_failure = local_failure
self._has_locally_failed = False
+ self._expiration_manager = None
self._outstanding_requirements = set(requirements)
self._kind = None
self._callbacks = []
+ def set_expiration_manager(self, expiration_manager):
+ self._expiration_manager = expiration_manager
+
def _terminate(self, kind):
"""Terminates the operation.
@@ -100,6 +104,7 @@ class _TerminationManager(_interfaces.TerminationManager):
packets.Kind.TRANSMISSION_FAILURE, packets.Kind.SERVICER_FAILURE, or
packets.Kind.SERVICED_FAILURE.
"""
+ self._expiration_manager.abort()
self._outstanding_requirements = None
callbacks = list(self._callbacks)
self._callbacks = None
diff --git a/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py
index 993098f4ae..4cb0baf517 100644
--- a/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py
+++ b/src/python/src/grpc/framework/face/testing/blocking_invocation_inline_service_test_case.py
@@ -41,6 +41,7 @@ from grpc.framework.face.testing import stock_service
from grpc.framework.face.testing import test_case
_TIMEOUT = 3
+_LONG_TIMEOUT = 45
class BlockingInvocationInlineServiceTestCase(
@@ -82,7 +83,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
response = self.stub.blocking_value_in_value_out(
- name, request, _TIMEOUT)
+ name, request, _LONG_TIMEOUT)
test_messages.verify(request, response, self)
@@ -93,7 +94,7 @@ class BlockingInvocationInlineServiceTestCase(
request = test_messages.request()
response_iterator = self.stub.inline_value_in_stream_out(
- name, request, _TIMEOUT)
+ name, request, _LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(request, responses, self)
@@ -105,7 +106,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
response = self.stub.blocking_stream_in_value_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), _LONG_TIMEOUT)
test_messages.verify(requests, response, self)
@@ -116,7 +117,7 @@ class BlockingInvocationInlineServiceTestCase(
requests = test_messages.requests()
response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), _TIMEOUT)
+ name, iter(requests), _LONG_TIMEOUT)
responses = list(response_iterator)
test_messages.verify(requests, responses, self)