aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Eric Gribkoff <ericgribkoff@google.com>2018-12-27 14:36:01 -0800
committerGravatar GitHub <noreply@github.com>2018-12-27 14:36:01 -0800
commitc4d5b7807780bdd6de7234d131ffe5935707d710 (patch)
treeaba80a3d58bc6f9e0b9593101ea5dd3fc896baca /src
parentfc7d0911a3a44d7bc926d3db99b7300a0c0f33dc (diff)
parent4e3e46df2249bbd6ba8f3330c0a44ca508d0f35c (diff)
Merge pull request #17597 from ericgribkoff/watch_method
Add Watch method to health check service
Diffstat (limited to 'src')
-rw-r--r--src/python/grpcio_health_checking/grpc_health/v1/health.py80
-rw-r--r--src/python/grpcio_tests/commands.py1
-rw-r--r--src/python/grpcio_tests/tests/health_check/BUILD.bazel1
-rw-r--r--src/python/grpcio_tests/tests/health_check/_health_servicer_test.py179
4 files changed, 232 insertions, 29 deletions
diff --git a/src/python/grpcio_health_checking/grpc_health/v1/health.py b/src/python/grpcio_health_checking/grpc_health/v1/health.py
index 0583659428..0a5bbb5504 100644
--- a/src/python/grpcio_health_checking/grpc_health/v1/health.py
+++ b/src/python/grpcio_health_checking/grpc_health/v1/health.py
@@ -23,15 +23,61 @@ from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc
SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name
+class _Watcher():
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._responses = list()
+ self._open = True
+
+ def __iter__(self):
+ return self
+
+ def _next(self):
+ with self._condition:
+ while not self._responses and self._open:
+ self._condition.wait()
+ if self._responses:
+ return self._responses.pop(0)
+ else:
+ raise StopIteration()
+
+ def next(self):
+ return self._next()
+
+ def __next__(self):
+ return self._next()
+
+ def add(self, response):
+ with self._condition:
+ self._responses.append(response)
+ self._condition.notify()
+
+ def close(self):
+ with self._condition:
+ self._open = False
+ self._condition.notify()
+
+
class HealthServicer(_health_pb2_grpc.HealthServicer):
"""Servicer handling RPCs for service statuses."""
def __init__(self):
- self._server_status_lock = threading.Lock()
+ self._lock = threading.RLock()
self._server_status = {}
+ self._watchers = {}
+
+ def _on_close_callback(self, watcher, service):
+
+ def callback():
+ with self._lock:
+ self._watchers[service].remove(watcher)
+ watcher.close()
+
+ return callback
def Check(self, request, context):
- with self._server_status_lock:
+ with self._lock:
status = self._server_status.get(request.service)
if status is None:
context.set_code(grpc.StatusCode.NOT_FOUND)
@@ -39,14 +85,30 @@ class HealthServicer(_health_pb2_grpc.HealthServicer):
else:
return _health_pb2.HealthCheckResponse(status=status)
+ def Watch(self, request, context):
+ service = request.service
+ with self._lock:
+ status = self._server_status.get(service)
+ if status is None:
+ status = _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN # pylint: disable=no-member
+ watcher = _Watcher()
+ watcher.add(_health_pb2.HealthCheckResponse(status=status))
+ if service not in self._watchers:
+ self._watchers[service] = set()
+ self._watchers[service].add(watcher)
+ context.add_callback(self._on_close_callback(watcher, service))
+ return watcher
+
def set(self, service, status):
"""Sets the status of a service.
- Args:
- service: string, the name of the service.
- NOTE, '' must be set.
- status: HealthCheckResponse.status enum value indicating
- the status of the service
- """
- with self._server_status_lock:
+ Args:
+ service: string, the name of the service. NOTE, '' must be set.
+ status: HealthCheckResponse.status enum value indicating the status of
+ the service
+ """
+ with self._lock:
self._server_status[service] = status
+ if service in self._watchers:
+ for watcher in self._watchers[service]:
+ watcher.add(_health_pb2.HealthCheckResponse(status=status))
diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py
index d5327711d3..582ce898de 100644
--- a/src/python/grpcio_tests/commands.py
+++ b/src/python/grpcio_tests/commands.py
@@ -141,6 +141,7 @@ class TestGevent(setuptools.Command):
'unit._exit_test.ExitTest.test_in_flight_partial_unary_stream_call',
'unit._exit_test.ExitTest.test_in_flight_partial_stream_unary_call',
'unit._exit_test.ExitTest.test_in_flight_partial_stream_stream_call',
+ 'health_check._health_servicer_test.HealthServicerTest.test_cancelled_watch_removed_from_watch_list',
# TODO(https://github.com/grpc/grpc/issues/17330) enable these three tests
'channelz._channelz_servicer_test.ChannelzServicerTest.test_many_subchannels',
'channelz._channelz_servicer_test.ChannelzServicerTest.test_many_subchannels_and_sockets',
diff --git a/src/python/grpcio_tests/tests/health_check/BUILD.bazel b/src/python/grpcio_tests/tests/health_check/BUILD.bazel
index 19e1e1b2e1..77bc61aa30 100644
--- a/src/python/grpcio_tests/tests/health_check/BUILD.bazel
+++ b/src/python/grpcio_tests/tests/health_check/BUILD.bazel
@@ -9,6 +9,7 @@ py_test(
"//src/python/grpcio/grpc:grpcio",
"//src/python/grpcio_health_checking/grpc_health/v1:grpc_health",
"//src/python/grpcio_tests/tests/unit:test_common",
+ "//src/python/grpcio_tests/tests/unit/framework/common:common",
],
imports = ["../../",],
)
diff --git a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py
index c1d9436c2f..35794987bc 100644
--- a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py
+++ b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py
@@ -13,6 +13,8 @@
# limitations under the License.
"""Tests of grpc_health.v1.health."""
+import threading
+import time
import unittest
import grpc
@@ -21,22 +23,36 @@ from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
from tests.unit import test_common
+from tests.unit.framework.common import test_constants
+
+from six.moves import queue
+
+_SERVING_SERVICE = 'grpc.test.TestServiceServing'
+_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
+_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
+_WATCH_SERVICE = 'grpc.test.WatchService'
+
+
+def _consume_responses(response_iterator, response_queue):
+ for response in response_iterator:
+ response_queue.put(response)
class HealthServicerTest(unittest.TestCase):
def setUp(self):
- servicer = health.HealthServicer()
- servicer.set('', health_pb2.HealthCheckResponse.SERVING)
- servicer.set('grpc.test.TestServiceServing',
- health_pb2.HealthCheckResponse.SERVING)
- servicer.set('grpc.test.TestServiceUnknown',
- health_pb2.HealthCheckResponse.UNKNOWN)
- servicer.set('grpc.test.TestServiceNotServing',
- health_pb2.HealthCheckResponse.NOT_SERVING)
+ self._servicer = health.HealthServicer()
+ self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
+ self._servicer.set(_SERVING_SERVICE,
+ health_pb2.HealthCheckResponse.SERVING)
+ self._servicer.set(_UNKNOWN_SERVICE,
+ health_pb2.HealthCheckResponse.UNKNOWN)
+ self._servicer.set(_NOT_SERVING_SERVICE,
+ health_pb2.HealthCheckResponse.NOT_SERVING)
self._server = test_common.test_server()
port = self._server.add_insecure_port('[::]:0')
- health_pb2_grpc.add_HealthServicer_to_server(servicer, self._server)
+ health_pb2_grpc.add_HealthServicer_to_server(self._servicer,
+ self._server)
self._server.start()
self._channel = grpc.insecure_channel('localhost:%d' % port)
@@ -46,37 +62,160 @@ class HealthServicerTest(unittest.TestCase):
self._server.stop(None)
self._channel.close()
- def test_empty_service(self):
+ def test_check_empty_service(self):
request = health_pb2.HealthCheckRequest()
resp = self._stub.Check(request)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
- def test_serving_service(self):
- request = health_pb2.HealthCheckRequest(
- service='grpc.test.TestServiceServing')
+ def test_check_serving_service(self):
+ request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
resp = self._stub.Check(request)
self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
- def test_unknown_serivce(self):
- request = health_pb2.HealthCheckRequest(
- service='grpc.test.TestServiceUnknown')
+ def test_check_unknown_serivce(self):
+ request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
resp = self._stub.Check(request)
self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)
- def test_not_serving_service(self):
- request = health_pb2.HealthCheckRequest(
- service='grpc.test.TestServiceNotServing')
+ def test_check_not_serving_service(self):
+ request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
resp = self._stub.Check(request)
self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
resp.status)
- def test_not_found_service(self):
+ def test_check_not_found_service(self):
request = health_pb2.HealthCheckRequest(service='not-found')
with self.assertRaises(grpc.RpcError) as context:
resp = self._stub.Check(request)
self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())
+ def test_watch_empty_service(self):
+ request = health_pb2.HealthCheckRequest(service='')
+ response_queue = queue.Queue()
+ rendezvous = self._stub.Watch(request)
+ thread = threading.Thread(
+ target=_consume_responses, args=(rendezvous, response_queue))
+ thread.start()
+
+ response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
+ self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
+ response.status)
+
+ rendezvous.cancel()
+ thread.join()
+ self.assertTrue(response_queue.empty())
+
+ def test_watch_new_service(self):
+ request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
+ response_queue = queue.Queue()
+ rendezvous = self._stub.Watch(request)
+ thread = threading.Thread(
+ target=_consume_responses, args=(rendezvous, response_queue))
+ thread.start()
+
+ response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
+ self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
+ response.status)
+
+ self._servicer.set(_WATCH_SERVICE,
+ health_pb2.HealthCheckResponse.SERVING)
+ response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
+ self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
+ response.status)
+
+ self._servicer.set(_WATCH_SERVICE,
+ health_pb2.HealthCheckResponse.NOT_SERVING)
+ response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
+ self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
+ response.status)
+
+ rendezvous.cancel()
+ thread.join()
+ self.assertTrue(response_queue.empty())
+
+ def test_watch_service_isolation(self):
+ request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
+ response_queue = queue.Queue()
+ rendezvous = self._stub.Watch(request)
+ thread = threading.Thread(
+ target=_consume_responses, args=(rendezvous, response_queue))
+ thread.start()
+
+ response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
+ self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
+ response.status)
+
+ self._servicer.set('some-other-service',
+ health_pb2.HealthCheckResponse.SERVING)
+ with self.assertRaises(queue.Empty):
+ response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
+
+ rendezvous.cancel()
+ thread.join()
+ self.assertTrue(response_queue.empty())
+
+ def test_two_watchers(self):
+ request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
+ response_queue1 = queue.Queue()
+ response_queue2 = queue.Queue()
+ rendezvous1 = self._stub.Watch(request)
+ rendezvous2 = self._stub.Watch(request)
+ thread1 = threading.Thread(
+ target=_consume_responses, args=(rendezvous1, response_queue1))
+ thread2 = threading.Thread(
+ target=_consume_responses, args=(rendezvous2, response_queue2))
+ thread1.start()
+ thread2.start()
+
+ response1 = response_queue1.get(timeout=test_constants.SHORT_TIMEOUT)
+ response2 = response_queue2.get(timeout=test_constants.SHORT_TIMEOUT)
+ self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
+ response1.status)
+ self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
+ response2.status)
+
+ self._servicer.set(_WATCH_SERVICE,
+ health_pb2.HealthCheckResponse.SERVING)
+ response1 = response_queue1.get(timeout=test_constants.SHORT_TIMEOUT)
+ response2 = response_queue2.get(timeout=test_constants.SHORT_TIMEOUT)
+ self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
+ response1.status)
+ self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
+ response2.status)
+
+ rendezvous1.cancel()
+ rendezvous2.cancel()
+ thread1.join()
+ thread2.join()
+ self.assertTrue(response_queue1.empty())
+ self.assertTrue(response_queue2.empty())
+
+ def test_cancelled_watch_removed_from_watch_list(self):
+ request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
+ response_queue = queue.Queue()
+ rendezvous = self._stub.Watch(request)
+ thread = threading.Thread(
+ target=_consume_responses, args=(rendezvous, response_queue))
+ thread.start()
+
+ response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
+ self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
+ response.status)
+
+ rendezvous.cancel()
+ self._servicer.set(_WATCH_SERVICE,
+ health_pb2.HealthCheckResponse.SERVING)
+ thread.join()
+
+ # Wait, if necessary, for serving thread to process client cancellation
+ timeout = time.time() + test_constants.SHORT_TIMEOUT
+ while time.time() < timeout and self._servicer._watchers[_WATCH_SERVICE]:
+ time.sleep(1)
+ self.assertFalse(self._servicer._watchers[_WATCH_SERVICE],
+ 'watch set should be empty')
+ self.assertTrue(response_queue.empty())
+
def test_health_service_name(self):
self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health')