From 71e7e6ddc73175df0793748e290e29321934fd7c Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 26 Dec 2018 12:29:52 -0800 Subject: Add Watch method to health check service --- .../grpc_health/v1/health.py | 76 ++++++++- .../tests/health_check/_health_servicer_test.py | 174 ++++++++++++++++++--- 2 files changed, 223 insertions(+), 27 deletions(-) (limited to 'src') diff --git a/src/python/grpcio_health_checking/grpc_health/v1/health.py b/src/python/grpcio_health_checking/grpc_health/v1/health.py index 0583659428..75c480b0a7 100644 --- a/src/python/grpcio_health_checking/grpc_health/v1/health.py +++ b/src/python/grpcio_health_checking/grpc_health/v1/health.py @@ -23,15 +23,61 @@ from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name +class _Watcher(): + + def __init__(self): + self._condition = threading.Condition() + self._responses = list() + self._open = True + + def __iter__(self): + return self + + def _next(self): + with self._condition: + while not self._responses and self._open: + self._condition.wait() + if self._responses: + return self._responses.pop(0) + else: + raise StopIteration() + + def next(self): + return self._next() + + def __next__(self): + return self._next() + + def add(self, response): + with self._condition: + self._responses.append(response) + self._condition.notify() + + def close(self): + with self._condition: + self._open = False + self._condition.notify() + + class HealthServicer(_health_pb2_grpc.HealthServicer): """Servicer handling RPCs for service statuses.""" def __init__(self): - self._server_status_lock = threading.Lock() + self._lock = threading.RLock() self._server_status = {} + self._watchers = {} + + def _on_close_callback(self, watcher, service): + + def callback(): + with self._lock: + self._watchers[service].remove(watcher) + watcher.close() + + return callback def Check(self, request, context): - with self._server_status_lock: + with self._lock: status = self._server_status.get(request.service) if status is None: context.set_code(grpc.StatusCode.NOT_FOUND) @@ -39,14 +85,30 @@ class HealthServicer(_health_pb2_grpc.HealthServicer): else: return _health_pb2.HealthCheckResponse(status=status) + def Watch(self, request, context): + service = request.service + with self._lock: + status = self._server_status.get(service) + if status is None: + status = _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN # pylint: disable=no-member + watcher = _Watcher() + watcher.add(_health_pb2.HealthCheckResponse(status=status)) + if service not in self._watchers: + self._watchers[service] = set() + self._watchers[service].add(watcher) + context.add_callback(self._on_close_callback(watcher, service)) + return watcher + def set(self, service, status): """Sets the status of a service. Args: - service: string, the name of the service. - NOTE, '' must be set. - status: HealthCheckResponse.status enum value indicating - the status of the service + service: string, the name of the service. NOTE, '' must be set. + status: HealthCheckResponse.status enum value indicating the status of + the service """ - with self._server_status_lock: + with self._lock: self._server_status[service] = status + if service in self._watchers: + for watcher in self._watchers[service]: + watcher.add(_health_pb2.HealthCheckResponse(status=status)) diff --git a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py index c1d9436c2f..657ceef1e4 100644 --- a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py +++ b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py @@ -13,6 +13,7 @@ # limitations under the License. """Tests of grpc_health.v1.health.""" +import threading import unittest import grpc @@ -22,21 +23,36 @@ from grpc_health.v1 import health_pb2_grpc from tests.unit import test_common +from six.moves import queue + +_QUEUE_TIMEOUT_S = 5 + +_SERVING_SERVICE = 'grpc.test.TestServiceServing' +_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown' +_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing' +_WATCH_SERVICE = 'grpc.test.WatchService' + + +def _consume_responses(response_iterator, response_queue): + for response in response_iterator: + response_queue.put(response) + class HealthServicerTest(unittest.TestCase): def setUp(self): - servicer = health.HealthServicer() - servicer.set('', health_pb2.HealthCheckResponse.SERVING) - servicer.set('grpc.test.TestServiceServing', - health_pb2.HealthCheckResponse.SERVING) - servicer.set('grpc.test.TestServiceUnknown', - health_pb2.HealthCheckResponse.UNKNOWN) - servicer.set('grpc.test.TestServiceNotServing', - health_pb2.HealthCheckResponse.NOT_SERVING) + self._servicer = health.HealthServicer() + self._servicer.set('', health_pb2.HealthCheckResponse.SERVING) + self._servicer.set(_SERVING_SERVICE, + health_pb2.HealthCheckResponse.SERVING) + self._servicer.set(_UNKNOWN_SERVICE, + health_pb2.HealthCheckResponse.UNKNOWN) + self._servicer.set(_NOT_SERVING_SERVICE, + health_pb2.HealthCheckResponse.NOT_SERVING) self._server = test_common.test_server() port = self._server.add_insecure_port('[::]:0') - health_pb2_grpc.add_HealthServicer_to_server(servicer, self._server) + health_pb2_grpc.add_HealthServicer_to_server(self._servicer, + self._server) self._server.start() self._channel = grpc.insecure_channel('localhost:%d' % port) @@ -46,37 +62,155 @@ class HealthServicerTest(unittest.TestCase): self._server.stop(None) self._channel.close() - def test_empty_service(self): + def test_check_empty_service(self): request = health_pb2.HealthCheckRequest() resp = self._stub.Check(request) self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status) - def test_serving_service(self): - request = health_pb2.HealthCheckRequest( - service='grpc.test.TestServiceServing') + def test_check_serving_service(self): + request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE) resp = self._stub.Check(request) self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status) - def test_unknown_serivce(self): - request = health_pb2.HealthCheckRequest( - service='grpc.test.TestServiceUnknown') + def test_check_unknown_serivce(self): + request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE) resp = self._stub.Check(request) self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status) - def test_not_serving_service(self): - request = health_pb2.HealthCheckRequest( - service='grpc.test.TestServiceNotServing') + def test_check_not_serving_service(self): + request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE) resp = self._stub.Check(request) self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, resp.status) - def test_not_found_service(self): + def test_check_not_found_service(self): request = health_pb2.HealthCheckRequest(service='not-found') with self.assertRaises(grpc.RpcError) as context: resp = self._stub.Check(request) self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code()) + def test_watch_empty_service(self): + request = health_pb2.HealthCheckRequest(service='') + response_queue = queue.Queue() + rendezvous = self._stub.Watch(request) + thread = threading.Thread( + target=_consume_responses, args=(rendezvous, response_queue)) + thread.start() + + response = response_queue.get(timeout=_QUEUE_TIMEOUT_S) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + response.status) + + rendezvous.cancel() + thread.join() + self.assertTrue(response_queue.empty()) + + def test_watch_new_service(self): + request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) + response_queue = queue.Queue() + rendezvous = self._stub.Watch(request) + thread = threading.Thread( + target=_consume_responses, args=(rendezvous, response_queue)) + thread.start() + + response = response_queue.get(timeout=_QUEUE_TIMEOUT_S) + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + response.status) + + self._servicer.set(_WATCH_SERVICE, + health_pb2.HealthCheckResponse.SERVING) + response = response_queue.get(timeout=_QUEUE_TIMEOUT_S) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + response.status) + + self._servicer.set(_WATCH_SERVICE, + health_pb2.HealthCheckResponse.NOT_SERVING) + response = response_queue.get(timeout=_QUEUE_TIMEOUT_S) + self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, + response.status) + + rendezvous.cancel() + thread.join() + self.assertTrue(response_queue.empty()) + + def test_watch_service_isolation(self): + request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) + response_queue = queue.Queue() + rendezvous = self._stub.Watch(request) + thread = threading.Thread( + target=_consume_responses, args=(rendezvous, response_queue)) + thread.start() + + response = response_queue.get(timeout=_QUEUE_TIMEOUT_S) + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + response.status) + + self._servicer.set('some-other-service', + health_pb2.HealthCheckResponse.SERVING) + with self.assertRaises(queue.Empty) as context: + response_queue.get(timeout=_QUEUE_TIMEOUT_S) + + rendezvous.cancel() + thread.join() + self.assertTrue(response_queue.empty()) + + def test_two_watchers(self): + request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) + response_queue1 = queue.Queue() + response_queue2 = queue.Queue() + rendezvous1 = self._stub.Watch(request) + rendezvous2 = self._stub.Watch(request) + thread1 = threading.Thread( + target=_consume_responses, args=(rendezvous1, response_queue1)) + thread2 = threading.Thread( + target=_consume_responses, args=(rendezvous2, response_queue2)) + thread1.start() + thread2.start() + + response1 = response_queue1.get(timeout=_QUEUE_TIMEOUT_S) + response2 = response_queue2.get(timeout=_QUEUE_TIMEOUT_S) + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + response1.status) + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + response2.status) + + self._servicer.set(_WATCH_SERVICE, + health_pb2.HealthCheckResponse.SERVING) + response1 = response_queue1.get(timeout=_QUEUE_TIMEOUT_S) + response2 = response_queue2.get(timeout=_QUEUE_TIMEOUT_S) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + response1.status) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + response2.status) + + rendezvous1.cancel() + rendezvous2.cancel() + thread1.join() + thread2.join() + self.assertTrue(response_queue1.empty()) + self.assertTrue(response_queue2.empty()) + + def test_cancelled_watch_removed_from_watch_list(self): + request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) + response_queue = queue.Queue() + rendezvous = self._stub.Watch(request) + thread = threading.Thread( + target=_consume_responses, args=(rendezvous, response_queue)) + thread.start() + + response = response_queue.get(timeout=_QUEUE_TIMEOUT_S) + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + response.status) + + rendezvous.cancel() + self._servicer.set(_WATCH_SERVICE, + health_pb2.HealthCheckResponse.SERVING) + thread.join() + self.assertFalse(self._servicer._watchers[_WATCH_SERVICE], + 'watch set should be empty') + self.assertTrue(response_queue.empty()) + def test_health_service_name(self): self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health') -- cgit v1.2.3 From b74af8c70bad274c4955a0e242c725409872342d Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 26 Dec 2018 15:04:38 -0800 Subject: skip test with gevent --- src/python/grpcio_tests/commands.py | 1 + 1 file changed, 1 insertion(+) (limited to 'src') diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index d5327711d3..582ce898de 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -141,6 +141,7 @@ class TestGevent(setuptools.Command): 'unit._exit_test.ExitTest.test_in_flight_partial_unary_stream_call', 'unit._exit_test.ExitTest.test_in_flight_partial_stream_unary_call', 'unit._exit_test.ExitTest.test_in_flight_partial_stream_stream_call', + 'health_check._health_servicer_test.HealthServicerTest.test_cancelled_watch_removed_from_watch_list', # TODO(https://github.com/grpc/grpc/issues/17330) enable these three tests 'channelz._channelz_servicer_test.ChannelzServicerTest.test_many_subchannels', 'channelz._channelz_servicer_test.ChannelzServicerTest.test_many_subchannels_and_sockets', -- cgit v1.2.3 From e678187996bb7239315f30d9e50734c50ee4027b Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Thu, 27 Dec 2018 09:39:53 -0800 Subject: use test constants, fix formatting --- .../grpc_health/v1/health.py | 10 ++++---- .../tests/health_check/_health_servicer_test.py | 27 +++++++++++----------- 2 files changed, 18 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/python/grpcio_health_checking/grpc_health/v1/health.py b/src/python/grpcio_health_checking/grpc_health/v1/health.py index 75c480b0a7..0a5bbb5504 100644 --- a/src/python/grpcio_health_checking/grpc_health/v1/health.py +++ b/src/python/grpcio_health_checking/grpc_health/v1/health.py @@ -102,11 +102,11 @@ class HealthServicer(_health_pb2_grpc.HealthServicer): def set(self, service, status): """Sets the status of a service. - Args: - service: string, the name of the service. NOTE, '' must be set. - status: HealthCheckResponse.status enum value indicating the status of - the service - """ + Args: + service: string, the name of the service. NOTE, '' must be set. + status: HealthCheckResponse.status enum value indicating the status of + the service + """ with self._lock: self._server_status[service] = status if service in self._watchers: diff --git a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py index 657ceef1e4..bf90fa15c0 100644 --- a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py +++ b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py @@ -22,11 +22,10 @@ from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2_grpc from tests.unit import test_common +from tests.unit.framework.common import test_constants from six.moves import queue -_QUEUE_TIMEOUT_S = 5 - _SERVING_SERVICE = 'grpc.test.TestServiceServing' _UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown' _NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing' @@ -98,7 +97,7 @@ class HealthServicerTest(unittest.TestCase): target=_consume_responses, args=(rendezvous, response_queue)) thread.start() - response = response_queue.get(timeout=_QUEUE_TIMEOUT_S) + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) self.assertEqual(health_pb2.HealthCheckResponse.SERVING, response.status) @@ -114,19 +113,19 @@ class HealthServicerTest(unittest.TestCase): target=_consume_responses, args=(rendezvous, response_queue)) thread.start() - response = response_queue.get(timeout=_QUEUE_TIMEOUT_S) + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, response.status) self._servicer.set(_WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING) - response = response_queue.get(timeout=_QUEUE_TIMEOUT_S) + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) self.assertEqual(health_pb2.HealthCheckResponse.SERVING, response.status) self._servicer.set(_WATCH_SERVICE, health_pb2.HealthCheckResponse.NOT_SERVING) - response = response_queue.get(timeout=_QUEUE_TIMEOUT_S) + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, response.status) @@ -142,14 +141,14 @@ class HealthServicerTest(unittest.TestCase): target=_consume_responses, args=(rendezvous, response_queue)) thread.start() - response = response_queue.get(timeout=_QUEUE_TIMEOUT_S) + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, response.status) self._servicer.set('some-other-service', health_pb2.HealthCheckResponse.SERVING) - with self.assertRaises(queue.Empty) as context: - response_queue.get(timeout=_QUEUE_TIMEOUT_S) + with self.assertRaises(queue.Empty): + response_queue.get(timeout=test_constants.SHORT_TIMEOUT) rendezvous.cancel() thread.join() @@ -168,8 +167,8 @@ class HealthServicerTest(unittest.TestCase): thread1.start() thread2.start() - response1 = response_queue1.get(timeout=_QUEUE_TIMEOUT_S) - response2 = response_queue2.get(timeout=_QUEUE_TIMEOUT_S) + response1 = response_queue1.get(timeout=test_constants.SHORT_TIMEOUT) + response2 = response_queue2.get(timeout=test_constants.SHORT_TIMEOUT) self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, response1.status) self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, @@ -177,8 +176,8 @@ class HealthServicerTest(unittest.TestCase): self._servicer.set(_WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING) - response1 = response_queue1.get(timeout=_QUEUE_TIMEOUT_S) - response2 = response_queue2.get(timeout=_QUEUE_TIMEOUT_S) + response1 = response_queue1.get(timeout=test_constants.SHORT_TIMEOUT) + response2 = response_queue2.get(timeout=test_constants.SHORT_TIMEOUT) self.assertEqual(health_pb2.HealthCheckResponse.SERVING, response1.status) self.assertEqual(health_pb2.HealthCheckResponse.SERVING, @@ -199,7 +198,7 @@ class HealthServicerTest(unittest.TestCase): target=_consume_responses, args=(rendezvous, response_queue)) thread.start() - response = response_queue.get(timeout=_QUEUE_TIMEOUT_S) + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, response.status) -- cgit v1.2.3 From 4e3e46df2249bbd6ba8f3330c0a44ca508d0f35c Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Thu, 27 Dec 2018 11:53:54 -0800 Subject: fix test --- src/python/grpcio_tests/tests/health_check/BUILD.bazel | 1 + src/python/grpcio_tests/tests/health_check/_health_servicer_test.py | 6 ++++++ 2 files changed, 7 insertions(+) (limited to 'src') diff --git a/src/python/grpcio_tests/tests/health_check/BUILD.bazel b/src/python/grpcio_tests/tests/health_check/BUILD.bazel index 19e1e1b2e1..77bc61aa30 100644 --- a/src/python/grpcio_tests/tests/health_check/BUILD.bazel +++ b/src/python/grpcio_tests/tests/health_check/BUILD.bazel @@ -9,6 +9,7 @@ py_test( "//src/python/grpcio/grpc:grpcio", "//src/python/grpcio_health_checking/grpc_health/v1:grpc_health", "//src/python/grpcio_tests/tests/unit:test_common", + "//src/python/grpcio_tests/tests/unit/framework/common:common", ], imports = ["../../",], ) diff --git a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py index bf90fa15c0..35794987bc 100644 --- a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py +++ b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py @@ -14,6 +14,7 @@ """Tests of grpc_health.v1.health.""" import threading +import time import unittest import grpc @@ -206,6 +207,11 @@ class HealthServicerTest(unittest.TestCase): self._servicer.set(_WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING) thread.join() + + # Wait, if necessary, for serving thread to process client cancellation + timeout = time.time() + test_constants.SHORT_TIMEOUT + while time.time() < timeout and self._servicer._watchers[_WATCH_SERVICE]: + time.sleep(1) self.assertFalse(self._servicer._watchers[_WATCH_SERVICE], 'watch set should be empty') self.assertTrue(response_queue.empty()) -- cgit v1.2.3