diff options
Diffstat (limited to 'src/python/grpcio_tests/tests/channelz')
3 files changed, 522 insertions, 0 deletions
diff --git a/src/python/grpcio_tests/tests/channelz/BUILD.bazel b/src/python/grpcio_tests/tests/channelz/BUILD.bazel new file mode 100644 index 0000000000..63513616e7 --- /dev/null +++ b/src/python/grpcio_tests/tests/channelz/BUILD.bazel @@ -0,0 +1,15 @@ +package(default_visibility = ["//visibility:public"]) + +py_test( + name = "channelz_servicer_test", + srcs = ["_channelz_servicer_test.py"], + main = "_channelz_servicer_test.py", + size = "small", + deps = [ + "//src/python/grpcio/grpc:grpcio", + "//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz", + "//src/python/grpcio_tests/tests/unit:test_common", + "//src/python/grpcio_tests/tests/unit/framework/common:common", + ], + imports = ["../../",], +) diff --git a/src/python/grpcio_tests/tests/channelz/__init__.py b/src/python/grpcio_tests/tests/channelz/__init__.py new file mode 100644 index 0000000000..38fdfc9c5c --- /dev/null +++ b/src/python/grpcio_tests/tests/channelz/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py new file mode 100644 index 0000000000..84f8594689 --- /dev/null +++ b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py @@ -0,0 +1,494 @@ +# Copyright 2018 The gRPC Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests of grpc_channelz.v1.channelz.""" + +import unittest + +from concurrent import futures + +import grpc +from grpc_channelz.v1 import channelz +from grpc_channelz.v1 import channelz_pb2 +from grpc_channelz.v1 import channelz_pb2_grpc + +from tests.unit import test_common +from tests.unit.framework.common import test_constants + +_SUCCESSFUL_UNARY_UNARY = '/test/SuccessfulUnaryUnary' +_FAILED_UNARY_UNARY = '/test/FailedUnaryUnary' +_SUCCESSFUL_STREAM_STREAM = '/test/SuccessfulStreamStream' + +_REQUEST = b'\x00\x00\x00' +_RESPONSE = b'\x01\x01\x01' + +_DISABLE_REUSE_PORT = (('grpc.so_reuseport', 0),) +_ENABLE_CHANNELZ = (('grpc.enable_channelz', 1),) +_DISABLE_CHANNELZ = (('grpc.enable_channelz', 0),) + + +def _successful_unary_unary(request, servicer_context): + return _RESPONSE + + +def _failed_unary_unary(request, servicer_context): + servicer_context.set_code(grpc.StatusCode.INTERNAL) + servicer_context.set_details("Channelz Test Intended Failure") + + +def _successful_stream_stream(request_iterator, servicer_context): + for _ in request_iterator: + yield _RESPONSE + + +class _GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + if handler_call_details.method == _SUCCESSFUL_UNARY_UNARY: + return grpc.unary_unary_rpc_method_handler(_successful_unary_unary) + elif handler_call_details.method == _FAILED_UNARY_UNARY: + return grpc.unary_unary_rpc_method_handler(_failed_unary_unary) + elif handler_call_details.method == _SUCCESSFUL_STREAM_STREAM: + return grpc.stream_stream_rpc_method_handler( + _successful_stream_stream) + else: + return None + + +class _ChannelServerPair(object): + + def __init__(self): + # Server will enable channelz service + # Bind as attribute, so its `del` can be called explicitly, during + # the destruction process. Otherwise, if the removal of server + # rely on gc cycle, the test will become non-deterministic. + self._server = grpc.server( + futures.ThreadPoolExecutor(max_workers=3), + options=_DISABLE_REUSE_PORT + _ENABLE_CHANNELZ) + port = self._server.add_insecure_port('[::]:0') + self._server.add_generic_rpc_handlers((_GenericHandler(),)) + self._server.start() + + # Channel will enable channelz service... + self.channel = grpc.insecure_channel('localhost:%d' % port, + _ENABLE_CHANNELZ) + + def __del__(self): + self._server.__del__() + self.channel.close() + + +def _generate_channel_server_pairs(n): + return [_ChannelServerPair() for i in range(n)] + + +def _clean_channel_server_pairs(pairs): + for pair in pairs: + pair.__del__() + + +class ChannelzServicerTest(unittest.TestCase): + + def _send_successful_unary_unary(self, idx): + _, r = self._pairs[idx].channel.unary_unary( + _SUCCESSFUL_UNARY_UNARY).with_call(_REQUEST) + self.assertEqual(r.code(), grpc.StatusCode.OK) + + def _send_failed_unary_unary(self, idx): + try: + self._pairs[idx].channel.unary_unary(_FAILED_UNARY_UNARY).with_call( + _REQUEST) + except grpc.RpcError: + return + else: + self.fail("This call supposed to fail") + + def _send_successful_stream_stream(self, idx): + response_iterator = self._pairs[idx].channel.stream_stream( + _SUCCESSFUL_STREAM_STREAM).__call__( + iter([_REQUEST] * test_constants.STREAM_LENGTH)) + cnt = 0 + for _ in response_iterator: + cnt += 1 + self.assertEqual(cnt, test_constants.STREAM_LENGTH) + + def _get_channel_id(self, idx): + """Channel id may not be consecutive""" + resp = self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) + self.assertGreater(len(resp.channel), idx) + return resp.channel[idx].ref.channel_id + + def setUp(self): + self._pairs = [] + # This server is for Channelz info fetching only + # It self should not enable Channelz + self._server = grpc.server( + futures.ThreadPoolExecutor(max_workers=3), + options=_DISABLE_REUSE_PORT + _DISABLE_CHANNELZ) + port = self._server.add_insecure_port('[::]:0') + channelz.add_channelz_servicer(self._server) + self._server.start() + + # This channel is used to fetch Channelz info only + # Channelz should not be enabled + self._channel = grpc.insecure_channel('localhost:%d' % port, + _DISABLE_CHANNELZ) + self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel) + + def tearDown(self): + self._server.__del__() + self._channel.close() + _clean_channel_server_pairs(self._pairs) + + def test_get_top_channels_basic(self): + self._pairs = _generate_channel_server_pairs(1) + resp = self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) + self.assertEqual(len(resp.channel), 1) + self.assertEqual(resp.end, True) + + def test_get_top_channels_high_start_id(self): + self._pairs = _generate_channel_server_pairs(1) + resp = self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=10000)) + self.assertEqual(len(resp.channel), 0) + self.assertEqual(resp.end, True) + + def test_successful_request(self): + self._pairs = _generate_channel_server_pairs(1) + self._send_successful_unary_unary(0) + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) + self.assertEqual(resp.channel.data.calls_started, 1) + self.assertEqual(resp.channel.data.calls_succeeded, 1) + self.assertEqual(resp.channel.data.calls_failed, 0) + + def test_failed_request(self): + self._pairs = _generate_channel_server_pairs(1) + self._send_failed_unary_unary(0) + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) + self.assertEqual(resp.channel.data.calls_started, 1) + self.assertEqual(resp.channel.data.calls_succeeded, 0) + self.assertEqual(resp.channel.data.calls_failed, 1) + + def test_many_requests(self): + self._pairs = _generate_channel_server_pairs(1) + k_success = 7 + k_failed = 9 + for i in range(k_success): + self._send_successful_unary_unary(0) + for i in range(k_failed): + self._send_failed_unary_unary(0) + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) + self.assertEqual(resp.channel.data.calls_started, k_success + k_failed) + self.assertEqual(resp.channel.data.calls_succeeded, k_success) + self.assertEqual(resp.channel.data.calls_failed, k_failed) + + def test_many_channel(self): + k_channels = 4 + self._pairs = _generate_channel_server_pairs(k_channels) + resp = self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) + self.assertEqual(len(resp.channel), k_channels) + + def test_many_requests_many_channel(self): + k_channels = 4 + self._pairs = _generate_channel_server_pairs(k_channels) + k_success = 11 + k_failed = 13 + for i in range(k_success): + self._send_successful_unary_unary(0) + self._send_successful_unary_unary(2) + for i in range(k_failed): + self._send_failed_unary_unary(1) + self._send_failed_unary_unary(2) + + # The first channel saw only successes + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) + self.assertEqual(resp.channel.data.calls_started, k_success) + self.assertEqual(resp.channel.data.calls_succeeded, k_success) + self.assertEqual(resp.channel.data.calls_failed, 0) + + # The second channel saw only failures + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(1))) + self.assertEqual(resp.channel.data.calls_started, k_failed) + self.assertEqual(resp.channel.data.calls_succeeded, 0) + self.assertEqual(resp.channel.data.calls_failed, k_failed) + + # The third channel saw both successes and failures + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(2))) + self.assertEqual(resp.channel.data.calls_started, k_success + k_failed) + self.assertEqual(resp.channel.data.calls_succeeded, k_success) + self.assertEqual(resp.channel.data.calls_failed, k_failed) + + # The fourth channel saw nothing + resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(3))) + self.assertEqual(resp.channel.data.calls_started, 0) + self.assertEqual(resp.channel.data.calls_succeeded, 0) + self.assertEqual(resp.channel.data.calls_failed, 0) + + def test_many_subchannels(self): + k_channels = 4 + self._pairs = _generate_channel_server_pairs(k_channels) + k_success = 17 + k_failed = 19 + for i in range(k_success): + self._send_successful_unary_unary(0) + self._send_successful_unary_unary(2) + for i in range(k_failed): + self._send_failed_unary_unary(1) + self._send_failed_unary_unary(2) + + gtc_resp = self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) + self.assertEqual(len(gtc_resp.channel), k_channels) + for i in range(k_channels): + # If no call performed in the channel, there shouldn't be any subchannel + if gtc_resp.channel[i].data.calls_started == 0: + self.assertEqual(len(gtc_resp.channel[i].subchannel_ref), 0) + continue + + # Otherwise, the subchannel should exist + self.assertGreater(len(gtc_resp.channel[i].subchannel_ref), 0) + gsc_resp = self._channelz_stub.GetSubchannel( + channelz_pb2.GetSubchannelRequest( + subchannel_id=gtc_resp.channel[i].subchannel_ref[ + 0].subchannel_id)) + self.assertEqual(gtc_resp.channel[i].data.calls_started, + gsc_resp.subchannel.data.calls_started) + self.assertEqual(gtc_resp.channel[i].data.calls_succeeded, + gsc_resp.subchannel.data.calls_succeeded) + self.assertEqual(gtc_resp.channel[i].data.calls_failed, + gsc_resp.subchannel.data.calls_failed) + + @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ + 'immediately when the reference goes out of scope, so ' \ + 'servers from multiple test cases are not hermetic. ' \ + 'TODO(https://github.com/grpc/grpc/issues/17258)') + def test_server_basic(self): + self._pairs = _generate_channel_server_pairs(1) + resp = self._channelz_stub.GetServers( + channelz_pb2.GetServersRequest(start_server_id=0)) + self.assertEqual(len(resp.server), 1) + + @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ + 'immediately when the reference goes out of scope, so ' \ + 'servers from multiple test cases are not hermetic. ' \ + 'TODO(https://github.com/grpc/grpc/issues/17258)') + def test_get_one_server(self): + self._pairs = _generate_channel_server_pairs(1) + gss_resp = self._channelz_stub.GetServers( + channelz_pb2.GetServersRequest(start_server_id=0)) + self.assertEqual(len(gss_resp.server), 1) + gs_resp = self._channelz_stub.GetServer( + channelz_pb2.GetServerRequest( + server_id=gss_resp.server[0].ref.server_id)) + self.assertEqual(gss_resp.server[0].ref.server_id, + gs_resp.server.ref.server_id) + + @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ + 'immediately when the reference goes out of scope, so ' \ + 'servers from multiple test cases are not hermetic. ' \ + 'TODO(https://github.com/grpc/grpc/issues/17258)') + def test_server_call(self): + self._pairs = _generate_channel_server_pairs(1) + k_success = 23 + k_failed = 29 + for i in range(k_success): + self._send_successful_unary_unary(0) + for i in range(k_failed): + self._send_failed_unary_unary(0) + + resp = self._channelz_stub.GetServers( + channelz_pb2.GetServersRequest(start_server_id=0)) + self.assertEqual(len(resp.server), 1) + self.assertEqual(resp.server[0].data.calls_started, + k_success + k_failed) + self.assertEqual(resp.server[0].data.calls_succeeded, k_success) + self.assertEqual(resp.server[0].data.calls_failed, k_failed) + + def test_many_subchannels_and_sockets(self): + k_channels = 4 + self._pairs = _generate_channel_server_pairs(k_channels) + k_success = 3 + k_failed = 5 + for i in range(k_success): + self._send_successful_unary_unary(0) + self._send_successful_unary_unary(2) + for i in range(k_failed): + self._send_failed_unary_unary(1) + self._send_failed_unary_unary(2) + + gtc_resp = self._channelz_stub.GetTopChannels( + channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) + self.assertEqual(len(gtc_resp.channel), k_channels) + for i in range(k_channels): + # If no call performed in the channel, there shouldn't be any subchannel + if gtc_resp.channel[i].data.calls_started == 0: + self.assertEqual(len(gtc_resp.channel[i].subchannel_ref), 0) + continue + + # Otherwise, the subchannel should exist + self.assertGreater(len(gtc_resp.channel[i].subchannel_ref), 0) + gsc_resp = self._channelz_stub.GetSubchannel( + channelz_pb2.GetSubchannelRequest( + subchannel_id=gtc_resp.channel[i].subchannel_ref[ + 0].subchannel_id)) + self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1) + + gs_resp = self._channelz_stub.GetSocket( + channelz_pb2.GetSocketRequest( + socket_id=gsc_resp.subchannel.socket_ref[0].socket_id)) + self.assertEqual(gsc_resp.subchannel.data.calls_started, + gs_resp.socket.data.streams_started) + self.assertEqual(gsc_resp.subchannel.data.calls_started, + gs_resp.socket.data.streams_succeeded) + # Calls started == messages sent, only valid for unary calls + self.assertEqual(gsc_resp.subchannel.data.calls_started, + gs_resp.socket.data.messages_sent) + # Only receive responses when the RPC was successful + self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, + gs_resp.socket.data.messages_received) + + def test_streaming_rpc(self): + self._pairs = _generate_channel_server_pairs(1) + # In C++, the argument for _send_successful_stream_stream is message length. + # Here the argument is still channel idx, to be consistent with the other two. + self._send_successful_stream_stream(0) + + gc_resp = self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) + self.assertEqual(gc_resp.channel.data.calls_started, 1) + self.assertEqual(gc_resp.channel.data.calls_succeeded, 1) + self.assertEqual(gc_resp.channel.data.calls_failed, 0) + # Subchannel exists + self.assertGreater(len(gc_resp.channel.subchannel_ref), 0) + + gsc_resp = self._channelz_stub.GetSubchannel( + channelz_pb2.GetSubchannelRequest( + subchannel_id=gc_resp.channel.subchannel_ref[0].subchannel_id)) + self.assertEqual(gsc_resp.subchannel.data.calls_started, 1) + self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1) + self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0) + # Socket exists + self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1) + + gs_resp = self._channelz_stub.GetSocket( + channelz_pb2.GetSocketRequest( + socket_id=gsc_resp.subchannel.socket_ref[0].socket_id)) + self.assertEqual(gs_resp.socket.data.streams_started, 1) + self.assertEqual(gs_resp.socket.data.streams_succeeded, 1) + self.assertEqual(gs_resp.socket.data.streams_failed, 0) + self.assertEqual(gs_resp.socket.data.messages_sent, + test_constants.STREAM_LENGTH) + self.assertEqual(gs_resp.socket.data.messages_received, + test_constants.STREAM_LENGTH) + + @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ + 'immediately when the reference goes out of scope, so ' \ + 'servers from multiple test cases are not hermetic. ' \ + 'TODO(https://github.com/grpc/grpc/issues/17258)') + def test_server_sockets(self): + self._pairs = _generate_channel_server_pairs(1) + self._send_successful_unary_unary(0) + self._send_failed_unary_unary(0) + + gs_resp = self._channelz_stub.GetServers( + channelz_pb2.GetServersRequest(start_server_id=0)) + self.assertEqual(len(gs_resp.server), 1) + self.assertEqual(gs_resp.server[0].data.calls_started, 2) + self.assertEqual(gs_resp.server[0].data.calls_succeeded, 1) + self.assertEqual(gs_resp.server[0].data.calls_failed, 1) + + gss_resp = self._channelz_stub.GetServerSockets( + channelz_pb2.GetServerSocketsRequest( + server_id=gs_resp.server[0].ref.server_id, start_socket_id=0)) + # If the RPC call failed, it will raise a grpc.RpcError + # So, if there is no exception raised, considered pass + + @unittest.skip('Servers in core are not guaranteed to be destroyed ' \ + 'immediately when the reference goes out of scope, so ' \ + 'servers from multiple test cases are not hermetic. ' \ + 'TODO(https://github.com/grpc/grpc/issues/17258)') + def test_server_listen_sockets(self): + self._pairs = _generate_channel_server_pairs(1) + + gss_resp = self._channelz_stub.GetServers( + channelz_pb2.GetServersRequest(start_server_id=0)) + self.assertEqual(len(gss_resp.server), 1) + self.assertEqual(len(gss_resp.server[0].listen_socket), 1) + + gs_resp = self._channelz_stub.GetSocket( + channelz_pb2.GetSocketRequest( + socket_id=gss_resp.server[0].listen_socket[0].socket_id)) + # If the RPC call failed, it will raise a grpc.RpcError + # So, if there is no exception raised, considered pass + + def test_invalid_query_get_server(self): + try: + self._channelz_stub.GetServer( + channelz_pb2.GetServerRequest(server_id=10000)) + except BaseException as e: + self.assertIn('StatusCode.NOT_FOUND', str(e)) + else: + self.fail('Invalid query not detected') + + def test_invalid_query_get_channel(self): + try: + self._channelz_stub.GetChannel( + channelz_pb2.GetChannelRequest(channel_id=10000)) + except BaseException as e: + self.assertIn('StatusCode.NOT_FOUND', str(e)) + else: + self.fail('Invalid query not detected') + + def test_invalid_query_get_subchannel(self): + try: + self._channelz_stub.GetSubchannel( + channelz_pb2.GetSubchannelRequest(subchannel_id=10000)) + except BaseException as e: + self.assertIn('StatusCode.NOT_FOUND', str(e)) + else: + self.fail('Invalid query not detected') + + def test_invalid_query_get_socket(self): + try: + self._channelz_stub.GetSocket( + channelz_pb2.GetSocketRequest(socket_id=10000)) + except BaseException as e: + self.assertIn('StatusCode.NOT_FOUND', str(e)) + else: + self.fail('Invalid query not detected') + + def test_invalid_query_get_server_sockets(self): + try: + self._channelz_stub.GetServerSockets( + channelz_pb2.GetServerSocketsRequest( + server_id=10000, + start_socket_id=0, + )) + except BaseException as e: + self.assertIn('StatusCode.NOT_FOUND', str(e)) + else: + self.fail('Invalid query not detected') + + +if __name__ == '__main__': + unittest.main(verbosity=2) |