diff options
Diffstat (limited to 'src/python')
24 files changed, 382 insertions, 110 deletions
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index daf869b156..70d7618e05 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -23,6 +23,11 @@ from grpc._cython import cygrpc as _cygrpc logging.getLogger(__name__).addHandler(logging.NullHandler()) +try: + from ._grpcio_metadata import __version__ +except ImportError: + __version__ = "dev0" + ############################## Future Interface ############################### diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi index e0e068e452..01b8237484 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pxd.pxi @@ -28,19 +28,22 @@ cdef tuple _wrap_grpc_arg(grpc_arg arg) cdef grpc_arg _unwrap_grpc_arg(tuple wrapped_arg) -cdef class _ArgumentProcessor: +cdef class _ChannelArg: cdef grpc_arg c_argument cdef void c(self, argument, grpc_arg_pointer_vtable *vtable, references) except * -cdef class _ArgumentsProcessor: +cdef class _ChannelArgs: cdef readonly tuple _arguments - cdef list _argument_processors + cdef list _channel_args cdef readonly list _references cdef grpc_channel_args _c_arguments - cdef grpc_channel_args *c(self, grpc_arg_pointer_vtable *vtable) except * - cdef un_c(self) + cdef void _c(self, grpc_arg_pointer_vtable *vtable) except * + cdef grpc_channel_args *c_args(self) except * + + @staticmethod + cdef _ChannelArgs from_args(object arguments, grpc_arg_pointer_vtable * vtable) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi index b7a4277ff6..bf12871015 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/arguments.pyx.pxi @@ -50,7 +50,7 @@ cdef grpc_arg _unwrap_grpc_arg(tuple wrapped_arg): return wrapped.arg -cdef class _ArgumentProcessor: +cdef class _ChannelArg: cdef void c(self, argument, grpc_arg_pointer_vtable *vtable, references) except *: key, value = argument @@ -82,27 +82,34 @@ cdef class _ArgumentProcessor: 'Expected int, bytes, or behavior, got {}'.format(type(value))) -cdef class _ArgumentsProcessor: +cdef class _ChannelArgs: def __cinit__(self, arguments): self._arguments = () if arguments is None else tuple(arguments) - self._argument_processors = [] + self._channel_args = [] self._references = [] + self._c_arguments.arguments = NULL - cdef grpc_channel_args *c(self, grpc_arg_pointer_vtable *vtable) except *: + cdef void _c(self, grpc_arg_pointer_vtable *vtable) except *: self._c_arguments.arguments_length = len(self._arguments) - if self._c_arguments.arguments_length == 0: - return NULL - else: + if self._c_arguments.arguments_length != 0: self._c_arguments.arguments = <grpc_arg *>gpr_malloc( self._c_arguments.arguments_length * sizeof(grpc_arg)) for index, argument in enumerate(self._arguments): - argument_processor = _ArgumentProcessor() - argument_processor.c(argument, vtable, self._references) - self._c_arguments.arguments[index] = argument_processor.c_argument - self._argument_processors.append(argument_processor) - return &self._c_arguments - - cdef un_c(self): - if self._arguments: + channel_arg = _ChannelArg() + channel_arg.c(argument, vtable, self._references) + self._c_arguments.arguments[index] = channel_arg.c_argument + self._channel_args.append(channel_arg) + + cdef grpc_channel_args *c_args(self) except *: + return &self._c_arguments + + def __dealloc__(self): + if self._c_arguments.arguments != NULL: gpr_free(self._c_arguments.arguments) + + @staticmethod + cdef _ChannelArgs from_args(object arguments, grpc_arg_pointer_vtable * vtable): + cdef _ChannelArgs channel_args = _ChannelArgs(arguments) + channel_args._c(vtable) + return channel_args diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi index 135d224095..70d4abb730 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi @@ -423,16 +423,15 @@ cdef class Channel: self._vtable.copy = &_copy_pointer self._vtable.destroy = &_destroy_pointer self._vtable.cmp = &_compare_pointer - cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor( - arguments) - cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable) + cdef _ChannelArgs channel_args = _ChannelArgs.from_args( + arguments, &self._vtable) if channel_credentials is None: self._state.c_channel = grpc_insecure_channel_create( - <char *>target, c_arguments, NULL) + <char *>target, channel_args.c_args(), NULL) else: c_channel_credentials = channel_credentials.c() self._state.c_channel = grpc_secure_channel_create( - c_channel_credentials, <char *>target, c_arguments, NULL) + c_channel_credentials, <char *>target, channel_args.c_args(), NULL) grpc_channel_credentials_release(c_channel_credentials) self._state.c_call_completion_queue = ( grpc_completion_queue_create_for_next(NULL)) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi index 52cfccb677..4a6fbe0f96 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pxd.pxi @@ -16,7 +16,6 @@ cdef class Server: cdef grpc_arg_pointer_vtable _vtable - cdef readonly _ArgumentsProcessor _arguments_processor cdef grpc_server *c_server cdef bint is_started # start has been called cdef bint is_shutting_down # shutdown has been called diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index e89e02b171..d72648a35d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -29,11 +29,9 @@ cdef class Server: self._vtable.copy = &_copy_pointer self._vtable.destroy = &_destroy_pointer self._vtable.cmp = &_compare_pointer - cdef _ArgumentsProcessor arguments_processor = _ArgumentsProcessor( - arguments) - cdef grpc_channel_args *c_arguments = arguments_processor.c(&self._vtable) - self.c_server = grpc_server_create(c_arguments, NULL) - arguments_processor.un_c() + cdef _ChannelArgs channel_args = _ChannelArgs.from_args( + arguments, &self._vtable) + self.c_server = grpc_server_create(channel_args.c_args(), NULL) self.references.append(arguments) self.is_started = False self.is_shutting_down = False diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py index 7a9f173947..dd9d436c3f 100644 --- a/src/python/grpcio/grpc/_grpcio_metadata.py +++ b/src/python/grpcio/grpc/_grpcio_metadata.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!! -__version__ = """1.18.0.dev0""" +__version__ = """1.19.0.dev0""" diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 6a1fd676ca..06de23903c 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -102,6 +102,8 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', + 'src/core/lib/iomgr/grpc_if_nametoindex_posix.cc', + 'src/core/lib/iomgr/grpc_if_nametoindex_unsupported.cc', 'src/core/lib/iomgr/internal_errqueue.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py index 2e91818d2c..8e2f4d30bb 100644 --- a/src/python/grpcio/grpc_version.py +++ b/src/python/grpcio/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!! -VERSION = '1.18.0.dev0' +VERSION = '1.19.0.dev0' diff --git a/src/python/grpcio_channelz/grpc_version.py b/src/python/grpcio_channelz/grpc_version.py index 16356ea402..5f3a894a2a 100644 --- a/src/python/grpcio_channelz/grpc_version.py +++ b/src/python/grpcio_channelz/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_channelz/grpc_version.py.template`!!! -VERSION = '1.18.0.dev0' +VERSION = '1.19.0.dev0' diff --git a/src/python/grpcio_health_checking/grpc_health/v1/health.py b/src/python/grpcio_health_checking/grpc_health/v1/health.py index 0583659428..0a5bbb5504 100644 --- a/src/python/grpcio_health_checking/grpc_health/v1/health.py +++ b/src/python/grpcio_health_checking/grpc_health/v1/health.py @@ -23,15 +23,61 @@ from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name +class _Watcher(): + + def __init__(self): + self._condition = threading.Condition() + self._responses = list() + self._open = True + + def __iter__(self): + return self + + def _next(self): + with self._condition: + while not self._responses and self._open: + self._condition.wait() + if self._responses: + return self._responses.pop(0) + else: + raise StopIteration() + + def next(self): + return self._next() + + def __next__(self): + return self._next() + + def add(self, response): + with self._condition: + self._responses.append(response) + self._condition.notify() + + def close(self): + with self._condition: + self._open = False + self._condition.notify() + + class HealthServicer(_health_pb2_grpc.HealthServicer): """Servicer handling RPCs for service statuses.""" def __init__(self): - self._server_status_lock = threading.Lock() + self._lock = threading.RLock() self._server_status = {} + self._watchers = {} + + def _on_close_callback(self, watcher, service): + + def callback(): + with self._lock: + self._watchers[service].remove(watcher) + watcher.close() + + return callback def Check(self, request, context): - with self._server_status_lock: + with self._lock: status = self._server_status.get(request.service) if status is None: context.set_code(grpc.StatusCode.NOT_FOUND) @@ -39,14 +85,30 @@ class HealthServicer(_health_pb2_grpc.HealthServicer): else: return _health_pb2.HealthCheckResponse(status=status) + def Watch(self, request, context): + service = request.service + with self._lock: + status = self._server_status.get(service) + if status is None: + status = _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN # pylint: disable=no-member + watcher = _Watcher() + watcher.add(_health_pb2.HealthCheckResponse(status=status)) + if service not in self._watchers: + self._watchers[service] = set() + self._watchers[service].add(watcher) + context.add_callback(self._on_close_callback(watcher, service)) + return watcher + def set(self, service, status): """Sets the status of a service. - Args: - service: string, the name of the service. - NOTE, '' must be set. - status: HealthCheckResponse.status enum value indicating - the status of the service - """ - with self._server_status_lock: + Args: + service: string, the name of the service. NOTE, '' must be set. + status: HealthCheckResponse.status enum value indicating the status of + the service + """ + with self._lock: self._server_status[service] = status + if service in self._watchers: + for watcher in self._watchers[service]: + watcher.add(_health_pb2.HealthCheckResponse(status=status)) diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py index 85fa762f7e..4c2d434066 100644 --- a/src/python/grpcio_health_checking/grpc_version.py +++ b/src/python/grpcio_health_checking/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!! -VERSION = '1.18.0.dev0' +VERSION = '1.19.0.dev0' diff --git a/src/python/grpcio_reflection/grpc_version.py b/src/python/grpcio_reflection/grpc_version.py index e62ab169a2..6b88b2dfc5 100644 --- a/src/python/grpcio_reflection/grpc_version.py +++ b/src/python/grpcio_reflection/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!! -VERSION = '1.18.0.dev0' +VERSION = '1.19.0.dev0' diff --git a/src/python/grpcio_status/grpc_version.py b/src/python/grpcio_status/grpc_version.py index e009843b94..2e58eb3b26 100644 --- a/src/python/grpcio_status/grpc_version.py +++ b/src/python/grpcio_status/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_status/grpc_version.py.template`!!! -VERSION = '1.18.0.dev0' +VERSION = '1.19.0.dev0' diff --git a/src/python/grpcio_testing/grpc_version.py b/src/python/grpcio_testing/grpc_version.py index 7b4c1695fa..d4c5d94ecb 100644 --- a/src/python/grpcio_testing/grpc_version.py +++ b/src/python/grpcio_testing/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_testing/grpc_version.py.template`!!! -VERSION = '1.18.0.dev0' +VERSION = '1.19.0.dev0' diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index d5327711d3..582ce898de 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -141,6 +141,7 @@ class TestGevent(setuptools.Command): 'unit._exit_test.ExitTest.test_in_flight_partial_unary_stream_call', 'unit._exit_test.ExitTest.test_in_flight_partial_stream_unary_call', 'unit._exit_test.ExitTest.test_in_flight_partial_stream_stream_call', + 'health_check._health_servicer_test.HealthServicerTest.test_cancelled_watch_removed_from_watch_list', # TODO(https://github.com/grpc/grpc/issues/17330) enable these three tests 'channelz._channelz_servicer_test.ChannelzServicerTest.test_many_subchannels', 'channelz._channelz_servicer_test.ChannelzServicerTest.test_many_subchannels_and_sockets', diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py index 2fcd1ad617..e1645ab1b8 100644 --- a/src/python/grpcio_tests/grpc_version.py +++ b/src/python/grpcio_tests/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!! -VERSION = '1.18.0.dev0' +VERSION = '1.19.0.dev0' diff --git a/src/python/grpcio_tests/tests/health_check/BUILD.bazel b/src/python/grpcio_tests/tests/health_check/BUILD.bazel index 19e1e1b2e1..77bc61aa30 100644 --- a/src/python/grpcio_tests/tests/health_check/BUILD.bazel +++ b/src/python/grpcio_tests/tests/health_check/BUILD.bazel @@ -9,6 +9,7 @@ py_test( "//src/python/grpcio/grpc:grpcio", "//src/python/grpcio_health_checking/grpc_health/v1:grpc_health", "//src/python/grpcio_tests/tests/unit:test_common", + "//src/python/grpcio_tests/tests/unit/framework/common:common", ], imports = ["../../",], ) diff --git a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py index c1d9436c2f..35794987bc 100644 --- a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py +++ b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py @@ -13,6 +13,8 @@ # limitations under the License. """Tests of grpc_health.v1.health.""" +import threading +import time import unittest import grpc @@ -21,22 +23,36 @@ from grpc_health.v1 import health_pb2 from grpc_health.v1 import health_pb2_grpc from tests.unit import test_common +from tests.unit.framework.common import test_constants + +from six.moves import queue + +_SERVING_SERVICE = 'grpc.test.TestServiceServing' +_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown' +_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing' +_WATCH_SERVICE = 'grpc.test.WatchService' + + +def _consume_responses(response_iterator, response_queue): + for response in response_iterator: + response_queue.put(response) class HealthServicerTest(unittest.TestCase): def setUp(self): - servicer = health.HealthServicer() - servicer.set('', health_pb2.HealthCheckResponse.SERVING) - servicer.set('grpc.test.TestServiceServing', - health_pb2.HealthCheckResponse.SERVING) - servicer.set('grpc.test.TestServiceUnknown', - health_pb2.HealthCheckResponse.UNKNOWN) - servicer.set('grpc.test.TestServiceNotServing', - health_pb2.HealthCheckResponse.NOT_SERVING) + self._servicer = health.HealthServicer() + self._servicer.set('', health_pb2.HealthCheckResponse.SERVING) + self._servicer.set(_SERVING_SERVICE, + health_pb2.HealthCheckResponse.SERVING) + self._servicer.set(_UNKNOWN_SERVICE, + health_pb2.HealthCheckResponse.UNKNOWN) + self._servicer.set(_NOT_SERVING_SERVICE, + health_pb2.HealthCheckResponse.NOT_SERVING) self._server = test_common.test_server() port = self._server.add_insecure_port('[::]:0') - health_pb2_grpc.add_HealthServicer_to_server(servicer, self._server) + health_pb2_grpc.add_HealthServicer_to_server(self._servicer, + self._server) self._server.start() self._channel = grpc.insecure_channel('localhost:%d' % port) @@ -46,37 +62,160 @@ class HealthServicerTest(unittest.TestCase): self._server.stop(None) self._channel.close() - def test_empty_service(self): + def test_check_empty_service(self): request = health_pb2.HealthCheckRequest() resp = self._stub.Check(request) self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status) - def test_serving_service(self): - request = health_pb2.HealthCheckRequest( - service='grpc.test.TestServiceServing') + def test_check_serving_service(self): + request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE) resp = self._stub.Check(request) self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status) - def test_unknown_serivce(self): - request = health_pb2.HealthCheckRequest( - service='grpc.test.TestServiceUnknown') + def test_check_unknown_serivce(self): + request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE) resp = self._stub.Check(request) self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status) - def test_not_serving_service(self): - request = health_pb2.HealthCheckRequest( - service='grpc.test.TestServiceNotServing') + def test_check_not_serving_service(self): + request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE) resp = self._stub.Check(request) self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, resp.status) - def test_not_found_service(self): + def test_check_not_found_service(self): request = health_pb2.HealthCheckRequest(service='not-found') with self.assertRaises(grpc.RpcError) as context: resp = self._stub.Check(request) self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code()) + def test_watch_empty_service(self): + request = health_pb2.HealthCheckRequest(service='') + response_queue = queue.Queue() + rendezvous = self._stub.Watch(request) + thread = threading.Thread( + target=_consume_responses, args=(rendezvous, response_queue)) + thread.start() + + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + response.status) + + rendezvous.cancel() + thread.join() + self.assertTrue(response_queue.empty()) + + def test_watch_new_service(self): + request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) + response_queue = queue.Queue() + rendezvous = self._stub.Watch(request) + thread = threading.Thread( + target=_consume_responses, args=(rendezvous, response_queue)) + thread.start() + + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + response.status) + + self._servicer.set(_WATCH_SERVICE, + health_pb2.HealthCheckResponse.SERVING) + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + response.status) + + self._servicer.set(_WATCH_SERVICE, + health_pb2.HealthCheckResponse.NOT_SERVING) + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) + self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, + response.status) + + rendezvous.cancel() + thread.join() + self.assertTrue(response_queue.empty()) + + def test_watch_service_isolation(self): + request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) + response_queue = queue.Queue() + rendezvous = self._stub.Watch(request) + thread = threading.Thread( + target=_consume_responses, args=(rendezvous, response_queue)) + thread.start() + + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + response.status) + + self._servicer.set('some-other-service', + health_pb2.HealthCheckResponse.SERVING) + with self.assertRaises(queue.Empty): + response_queue.get(timeout=test_constants.SHORT_TIMEOUT) + + rendezvous.cancel() + thread.join() + self.assertTrue(response_queue.empty()) + + def test_two_watchers(self): + request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) + response_queue1 = queue.Queue() + response_queue2 = queue.Queue() + rendezvous1 = self._stub.Watch(request) + rendezvous2 = self._stub.Watch(request) + thread1 = threading.Thread( + target=_consume_responses, args=(rendezvous1, response_queue1)) + thread2 = threading.Thread( + target=_consume_responses, args=(rendezvous2, response_queue2)) + thread1.start() + thread2.start() + + response1 = response_queue1.get(timeout=test_constants.SHORT_TIMEOUT) + response2 = response_queue2.get(timeout=test_constants.SHORT_TIMEOUT) + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + response1.status) + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + response2.status) + + self._servicer.set(_WATCH_SERVICE, + health_pb2.HealthCheckResponse.SERVING) + response1 = response_queue1.get(timeout=test_constants.SHORT_TIMEOUT) + response2 = response_queue2.get(timeout=test_constants.SHORT_TIMEOUT) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + response1.status) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, + response2.status) + + rendezvous1.cancel() + rendezvous2.cancel() + thread1.join() + thread2.join() + self.assertTrue(response_queue1.empty()) + self.assertTrue(response_queue2.empty()) + + def test_cancelled_watch_removed_from_watch_list(self): + request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) + response_queue = queue.Queue() + rendezvous = self._stub.Watch(request) + thread = threading.Thread( + target=_consume_responses, args=(rendezvous, response_queue)) + thread.start() + + response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) + self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, + response.status) + + rendezvous.cancel() + self._servicer.set(_WATCH_SERVICE, + health_pb2.HealthCheckResponse.SERVING) + thread.join() + + # Wait, if necessary, for serving thread to process client cancellation + timeout = time.time() + test_constants.SHORT_TIMEOUT + while time.time() < timeout and self._servicer._watchers[_WATCH_SERVICE]: + time.sleep(1) + self.assertFalse(self._servicer._watchers[_WATCH_SERVICE], + 'watch set should be empty') + self.assertTrue(response_queue.empty()) + def test_health_service_name(self): self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health') diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index f202a3f932..de4c2c1fdd 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -64,6 +64,7 @@ "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth", "unit._server_test.ServerTest", "unit._session_cache_test.SSLSessionCacheTest", + "unit._version_test.VersionTest", "unit.beta._beta_features_test.BetaFeaturesTest", "unit.beta._beta_features_test.ContextManagementAndLifecycleTest", "unit.beta._connectivity_channel_test.ConnectivityStatesTest", diff --git a/src/python/grpcio_tests/tests/unit/BUILD.bazel b/src/python/grpcio_tests/tests/unit/BUILD.bazel index 1b462ec67a..a9bcd9f304 100644 --- a/src/python/grpcio_tests/tests/unit/BUILD.bazel +++ b/src/python/grpcio_tests/tests/unit/BUILD.bazel @@ -7,6 +7,7 @@ GRPCIO_TESTS_UNIT = [ "_api_test.py", "_auth_context_test.py", "_auth_test.py", + "_version_test.py", "_channel_args_test.py", "_channel_close_test.py", "_channel_connectivity_test.py", diff --git a/src/python/grpcio_tests/tests/unit/_cython/_fork_test.py b/src/python/grpcio_tests/tests/unit/_cython/_fork_test.py index aeb02458a7..5a5dedd5f2 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_fork_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_fork_test.py @@ -27,6 +27,7 @@ def _get_number_active_threads(): class ForkPosixTester(unittest.TestCase): def setUp(self): + self._saved_fork_support_flag = cygrpc._GRPC_ENABLE_FORK_SUPPORT cygrpc._GRPC_ENABLE_FORK_SUPPORT = True def testForkManagedThread(self): @@ -50,6 +51,9 @@ class ForkPosixTester(unittest.TestCase): thread.join() self.assertEqual(0, _get_number_active_threads()) + def tearDown(self): + cygrpc._GRPC_ENABLE_FORK_SUPPORT = self._saved_fork_support_flag + @unittest.skipUnless(os.name == 'nt', 'Windows-specific tests') class ForkWindowsTester(unittest.TestCase): diff --git a/src/python/grpcio_tests/tests/unit/_logging_test.py b/src/python/grpcio_tests/tests/unit/_logging_test.py index 631b9de9db..8ff127f506 100644 --- a/src/python/grpcio_tests/tests/unit/_logging_test.py +++ b/src/python/grpcio_tests/tests/unit/_logging_test.py @@ -14,66 +14,86 @@ """Test of gRPC Python's interaction with the python logging module""" import unittest -import six -from six.moves import reload_module import logging import grpc -import functools +import subprocess import sys +INTERPRETER = sys.executable -def patch_stderr(f): - @functools.wraps(f) - def _impl(*args, **kwargs): - old_stderr = sys.stderr - sys.stderr = six.StringIO() - try: - f(*args, **kwargs) - finally: - sys.stderr = old_stderr +class LoggingTest(unittest.TestCase): - return _impl + def test_logger_not_occupied(self): + script = """if True: + import logging + import grpc -def isolated_logging(f): + if len(logging.getLogger().handlers) != 0: + raise Exception('expected 0 logging handlers') - @functools.wraps(f) - def _impl(*args, **kwargs): - reload_module(logging) - reload_module(grpc) - try: - f(*args, **kwargs) - finally: - reload_module(logging) + """ + self._verifyScriptSucceeds(script) - return _impl + def test_handler_found(self): + script = """if True: + import logging + import grpc + """ + out, err = self._verifyScriptSucceeds(script) + self.assertEqual(0, len(err), 'unexpected output to stderr') -class LoggingTest(unittest.TestCase): + def test_can_configure_logger(self): + script = """if True: + import logging + import six - @isolated_logging - def test_logger_not_occupied(self): - self.assertEqual(0, len(logging.getLogger().handlers)) + import grpc - @patch_stderr - @isolated_logging - def test_handler_found(self): - self.assertEqual(0, len(sys.stderr.getvalue())) - @isolated_logging - def test_can_configure_logger(self): - intended_stream = six.StringIO() - logging.basicConfig(stream=intended_stream) - self.assertEqual(1, len(logging.getLogger().handlers)) - self.assertIs(logging.getLogger().handlers[0].stream, intended_stream) + intended_stream = six.StringIO() + logging.basicConfig(stream=intended_stream) + + if len(logging.getLogger().handlers) != 1: + raise Exception('expected 1 logging handler') + + if logging.getLogger().handlers[0].stream is not intended_stream: + raise Exception('wrong handler stream') + + """ + self._verifyScriptSucceeds(script) - @isolated_logging def test_grpc_logger(self): - self.assertIn("grpc", logging.Logger.manager.loggerDict) - root_logger = logging.getLogger("grpc") - self.assertEqual(1, len(root_logger.handlers)) - self.assertIsInstance(root_logger.handlers[0], logging.NullHandler) + script = """if True: + import logging + + import grpc + + if "grpc" not in logging.Logger.manager.loggerDict: + raise Exception('grpc logger not found') + + root_logger = logging.getLogger("grpc") + if len(root_logger.handlers) != 1: + raise Exception('expected 1 root logger handler') + if not isinstance(root_logger.handlers[0], logging.NullHandler): + raise Exception('expected logging.NullHandler') + + """ + self._verifyScriptSucceeds(script) + + def _verifyScriptSucceeds(self, script): + process = subprocess.Popen( + [INTERPRETER, '-c', script], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = process.communicate() + self.assertEqual( + 0, process.returncode, + 'process failed with exit code %d (stdout: %s, stderr: %s)' % + (process.returncode, out, err)) + return out, err if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_version_test.py b/src/python/grpcio_tests/tests/unit/_version_test.py new file mode 100644 index 0000000000..3d37b319e5 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_version_test.py @@ -0,0 +1,30 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test for grpc.__version__""" + +import unittest +import grpc +import logging +from grpc import _grpcio_metadata + + +class VersionTest(unittest.TestCase): + + def test_get_version(self): + self.assertEqual(grpc.__version__, _grpcio_metadata.__version__) + + +if __name__ == '__main__': + logging.basicConfig() + unittest.main(verbosity=2) |