# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc_health.v1.health."""

import threading
import time
import unittest

import grpc
from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc

from tests.unit import test_common
from tests.unit.framework.common import test_constants

from six.moves import queue

_SERVING_SERVICE = 'grpc.test.TestServiceServing'
_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
_WATCH_SERVICE = 'grpc.test.WatchService'


def _consume_responses(response_iterator, response_queue):
    """Put every item yielded by response_iterator onto response_queue.

    Used as a thread target so a test can read streamed Watch responses
    with a timeout. Returns once the iterator is exhausted; an exception
    raised by the iterator is not caught here.
    """
    for response in response_iterator:
        response_queue.put(response)


class HealthServicerTest(unittest.TestCase):
    """Tests Check and Watch of HealthServicer through a HealthStub."""

    def setUp(self):
        """Register the fixture statuses, start a server and build a stub."""
        self._servicer = health.HealthServicer()
        self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
        self._servicer.set(_SERVING_SERVICE,
                           health_pb2.HealthCheckResponse.SERVING)
        self._servicer.set(_UNKNOWN_SERVICE,
                           health_pb2.HealthCheckResponse.UNKNOWN)
        self._servicer.set(_NOT_SERVING_SERVICE,
                           health_pb2.HealthCheckResponse.NOT_SERVING)
        self._server = test_common.test_server()
        # The client connects to whatever port add_insecure_port returns.
        port = self._server.add_insecure_port('[::]:0')
        health_pb2_grpc.add_HealthServicer_to_server(self._servicer,
                                                     self._server)
        self._server.start()

        self._channel = grpc.insecure_channel('localhost:%d' % port)
        self._stub = health_pb2_grpc.HealthStub(self._channel)

    def tearDown(self):
        """Stop the server (grace argument None) and close the channel."""
        self._server.stop(None)
        self._channel.close()

    def _start_watch(self, service):
        """Open a Watch stream for service and drain it on a new thread.

        Args:
          service: Service name placed in the HealthCheckRequest.

        Returns:
          A (rendezvous, thread, response_queue) tuple: the object returned
          by the stub's Watch call, the already-started thread running
          _consume_responses over it, and the queue that thread fills.
        """
        request = health_pb2.HealthCheckRequest(service=service)
        response_queue = queue.Queue()
        rendezvous = self._stub.Watch(request)
        thread = threading.Thread(
            target=_consume_responses, args=(rendezvous, response_queue))
        thread.start()
        return rendezvous, thread, response_queue

    def _finish_watch(self, rendezvous, thread, response_queue):
        """Cancel a watch, join its thread and assert nothing is unread."""
        rendezvous.cancel()
        thread.join()
        self.assertTrue(response_queue.empty())

    def test_check_empty_service(self):
        """Check with a default request reports SERVING."""
        request = health_pb2.HealthCheckRequest()
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    def test_check_serving_service(self):
        """Check reports SERVING for the service registered as SERVING."""
        request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    # NOTE: the misspelling in this method name ("serivce") is kept so that
    # existing test selectors referring to it keep matching.
    def test_check_unknown_serivce(self):
        """Check reports UNKNOWN for the service registered as UNKNOWN."""
        request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)

    def test_check_not_serving_service(self):
        """Check reports NOT_SERVING for the NOT_SERVING service."""
        request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         resp.status)

    def test_check_not_found_service(self):
        """Check for a service that was never set fails with NOT_FOUND."""
        request = health_pb2.HealthCheckRequest(service='not-found')
        with self.assertRaises(grpc.RpcError) as context:
            self._stub.Check(request)

        self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())

    def test_watch_empty_service(self):
        """Watch on the '' service first delivers SERVING."""
        rendezvous, thread, response_queue = self._start_watch('')

        response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         response.status)

        self._finish_watch(rendezvous, thread, response_queue)

    def test_watch_new_service(self):
        """Watch delivers SERVICE_UNKNOWN, then each status set afterwards."""
        rendezvous, thread, response_queue = self._start_watch(_WATCH_SERVICE)

        response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         response.status)

        self._servicer.set(_WATCH_SERVICE,
                           health_pb2.HealthCheckResponse.SERVING)
        response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         response.status)

        self._servicer.set(_WATCH_SERVICE,
                           health_pb2.HealthCheckResponse.NOT_SERVING)
        response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         response.status)

        self._finish_watch(rendezvous, thread, response_queue)

    def test_watch_service_isolation(self):
        """Setting a different service's status sends nothing to a watcher."""
        rendezvous, thread, response_queue = self._start_watch(_WATCH_SERVICE)

        response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         response.status)

        self._servicer.set('some-other-service',
                           health_pb2.HealthCheckResponse.SERVING)
        # No update is expected, so the timed get must run out empty.
        with self.assertRaises(queue.Empty):
            response_queue.get(timeout=test_constants.SHORT_TIMEOUT)

        self._finish_watch(rendezvous, thread, response_queue)

    def test_two_watchers(self):
        """Two watchers of one service both receive every status."""
        rendezvous1, thread1, response_queue1 = self._start_watch(
            _WATCH_SERVICE)
        rendezvous2, thread2, response_queue2 = self._start_watch(
            _WATCH_SERVICE)

        response1 = response_queue1.get(timeout=test_constants.SHORT_TIMEOUT)
        response2 = response_queue2.get(timeout=test_constants.SHORT_TIMEOUT)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         response1.status)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         response2.status)

        self._servicer.set(_WATCH_SERVICE,
                           health_pb2.HealthCheckResponse.SERVING)
        response1 = response_queue1.get(timeout=test_constants.SHORT_TIMEOUT)
        response2 = response_queue2.get(timeout=test_constants.SHORT_TIMEOUT)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         response1.status)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         response2.status)

        # Cancel both streams before joining either consumer thread.
        rendezvous1.cancel()
        rendezvous2.cancel()
        thread1.join()
        thread2.join()
        self.assertTrue(response_queue1.empty())
        self.assertTrue(response_queue2.empty())

    def test_cancelled_watch_removed_from_watch_list(self):
        """A cancelled watch gets no update and leaves the watcher set."""
        rendezvous, thread, response_queue = self._start_watch(_WATCH_SERVICE)

        response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         response.status)

        rendezvous.cancel()
        self._servicer.set(_WATCH_SERVICE,
                           health_pb2.HealthCheckResponse.SERVING)
        thread.join()

        # Wait, if necessary, for serving thread to process client cancellation
        timeout = time.time() + test_constants.SHORT_TIMEOUT
        while (time.time() < timeout and
               self._servicer._watchers[_WATCH_SERVICE]):
            time.sleep(1)
        self.assertFalse(self._servicer._watchers[_WATCH_SERVICE],
                         'watch set should be empty')
        self.assertTrue(response_queue.empty())

    def test_health_service_name(self):
        """health.SERVICE_NAME equals 'grpc.health.v1.Health'."""
        self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health')


if __name__ == '__main__':
    unittest.main(verbosity=2)