diff options
Diffstat (limited to 'src/python/grpcio_tests/tests/unit')
5 files changed, 375 insertions, 58 deletions
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py new file mode 100644 index 0000000000..ac66d1db3d --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py @@ -0,0 +1,118 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Common utilities for tests of the Cython layer of gRPC Python.""" + +import collections +import threading + +from grpc._cython import cygrpc + +RPC_COUNT = 4000 + +INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) +EMPTY_FLAGS = 0 + +INVOCATION_METADATA = cygrpc.Metadata( + (cygrpc.Metadatum(b'client-md-key', b'client-md-key'), + cygrpc.Metadatum(b'client-md-key-bin', b'\x00\x01' * 3000),)) + +INITIAL_METADATA = cygrpc.Metadata( + (cygrpc.Metadatum(b'server-initial-md-key', b'server-initial-md-value'), + cygrpc.Metadatum(b'server-initial-md-key-bin', b'\x00\x02' * 3000),)) + +TRAILING_METADATA = cygrpc.Metadata( + (cygrpc.Metadatum(b'server-trailing-md-key', b'server-trailing-md-value'), + cygrpc.Metadatum(b'server-trailing-md-key-bin', b'\x00\x03' * 3000),)) + + +class QueueDriver(object): + + def __init__(self, condition, completion_queue): + self._condition = condition + self._completion_queue = completion_queue + self._due = collections.defaultdict(int) + self._events = collections.defaultdict(list) + + def add_due(self, tags): + if not self._due: + + def in_thread(): + while True: + event = self._completion_queue.poll() + with self._condition: + self._events[event.tag].append(event) + self._due[event.tag] -= 1 + self._condition.notify_all() + if self._due[event.tag] <= 0: + self._due.pop(event.tag) + if not self._due: + return + + thread = threading.Thread(target=in_thread) + thread.start() + for tag in tags: + self._due[tag] += 1 + + def event_with_tag(self, tag): + with self._condition: + while True: + if self._events[tag]: + return self._events[tag].pop(0) + else: + self._condition.wait() + + +def execute_many_times(behavior): + return tuple(behavior() for _ in range(RPC_COUNT)) + + +class OperationResult( + collections.namedtuple('OperationResult', ( + 'start_batch_result', 'completion_type', 'success',))): + pass + + +SUCCESSFUL_OPERATION_RESULT = OperationResult( + cygrpc.CallError.ok, cygrpc.CompletionType.operation_complete, True) + + +class RpcTest(object): + + def setUp(self): + self.server_completion_queue = cygrpc.CompletionQueue() + self.server = cygrpc.Server(cygrpc.ChannelArgs([])) + self.server.register_completion_queue(self.server_completion_queue) + port = self.server.add_http2_port(b'[::]:0') + self.server.start() + self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), + cygrpc.ChannelArgs([])) + + self._server_shutdown_tag = 'server_shutdown_tag' + self.server_condition = threading.Condition() + self.server_driver = QueueDriver(self.server_condition, + self.server_completion_queue) + with self.server_condition: + self.server_driver.add_due({ + self._server_shutdown_tag, + }) + + self.client_condition = threading.Condition() + self.client_completion_queue = cygrpc.CompletionQueue() + self.client_driver = QueueDriver(self.client_condition, + self.client_completion_queue) + + def tearDown(self): + self.server.shutdown(self.server_completion_queue, + self._server_shutdown_tag) + self.server.cancel_all_calls() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py new file mode 100644 index 0000000000..14cc66675c --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py @@ -0,0 +1,131 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test a corner-case at the level of the Cython API.""" + +import threading +import unittest + +from grpc._cython import cygrpc + +from tests.unit._cython import _common + + +class Test(_common.RpcTest, unittest.TestCase): + + def _do_rpcs(self): + server_call_condition = threading.Condition() + server_call_completion_queue = cygrpc.CompletionQueue() + server_call_driver = _common.QueueDriver(server_call_condition, + server_call_completion_queue) + + server_request_call_tag = 'server_request_call_tag' + server_send_initial_metadata_tag = 'server_send_initial_metadata_tag' + server_complete_rpc_tag = 'server_complete_rpc_tag' + + with self.server_condition: + server_request_call_start_batch_result = self.server.request_call( + server_call_completion_queue, self.server_completion_queue, + server_request_call_tag) + self.server_driver.add_due({ + server_request_call_tag, + }) + + client_call = self.channel.create_call( + None, _common.EMPTY_FLAGS, self.client_completion_queue, + b'/twinkies', None, _common.INFINITE_FUTURE) + client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' + client_complete_rpc_tag = 'client_complete_rpc_tag' + with self.client_condition: + client_receive_initial_metadata_start_batch_result = ( + client_call.start_client_batch( + cygrpc.Operations([ + cygrpc.operation_receive_initial_metadata( + _common.EMPTY_FLAGS), + ]), client_receive_initial_metadata_tag)) + client_complete_rpc_start_batch_result = client_call.start_client_batch( + cygrpc.Operations([ + cygrpc.operation_send_initial_metadata( + _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), + cygrpc.operation_send_close_from_client( + _common.EMPTY_FLAGS), + cygrpc.operation_receive_status_on_client( + _common.EMPTY_FLAGS), + ]), client_complete_rpc_tag) + self.client_driver.add_due({ + client_receive_initial_metadata_tag, + client_complete_rpc_tag, + }) + + server_request_call_event = self.server_driver.event_with_tag( + server_request_call_tag) + + with server_call_condition: + server_send_initial_metadata_start_batch_result = ( + server_request_call_event.operation_call.start_server_batch([ + cygrpc.operation_send_initial_metadata( + _common.INITIAL_METADATA, _common.EMPTY_FLAGS), + ], server_send_initial_metadata_tag)) + server_call_driver.add_due({ + server_send_initial_metadata_tag, + }) + server_send_initial_metadata_event = server_call_driver.event_with_tag( + server_send_initial_metadata_tag) + + with server_call_condition: + server_complete_rpc_start_batch_result = ( + server_request_call_event.operation_call.start_server_batch([ + cygrpc.operation_receive_close_on_server( + _common.EMPTY_FLAGS), + cygrpc.operation_send_status_from_server( + _common.TRAILING_METADATA, cygrpc.StatusCode.ok, + b'test details', _common.EMPTY_FLAGS), + ], server_complete_rpc_tag)) + server_call_driver.add_due({ + server_complete_rpc_tag, + }) + server_complete_rpc_event = server_call_driver.event_with_tag( + server_complete_rpc_tag) + + client_receive_initial_metadata_event = self.client_driver.event_with_tag( + client_receive_initial_metadata_tag) + client_complete_rpc_event = self.client_driver.event_with_tag( + client_complete_rpc_tag) + + return (_common.OperationResult(server_request_call_start_batch_result, + server_request_call_event.type, + server_request_call_event.success), + _common.OperationResult( + client_receive_initial_metadata_start_batch_result, + client_receive_initial_metadata_event.type, + client_receive_initial_metadata_event.success), + _common.OperationResult(client_complete_rpc_start_batch_result, + client_complete_rpc_event.type, + client_complete_rpc_event.success), + _common.OperationResult( + server_send_initial_metadata_start_batch_result, + server_send_initial_metadata_event.type, + server_send_initial_metadata_event.success), + _common.OperationResult(server_complete_rpc_start_batch_result, + server_complete_rpc_event.type, + server_complete_rpc_event.success),) + + def test_rpcs(self): + expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * + 5] * _common.RPC_COUNT + actuallys = _common.execute_many_times(self._do_rpcs) + self.assertSequenceEqual(expecteds, actuallys) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py new file mode 100644 index 0000000000..1e44bcc4dc --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py @@ -0,0 +1,126 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test a corner-case at the level of the Cython API.""" + +import threading +import unittest + +from grpc._cython import cygrpc + +from tests.unit._cython import _common + + +class Test(_common.RpcTest, unittest.TestCase): + + def _do_rpcs(self): + server_request_call_tag = 'server_request_call_tag' + server_send_initial_metadata_tag = 'server_send_initial_metadata_tag' + server_complete_rpc_tag = 'server_complete_rpc_tag' + + with self.server_condition: + server_request_call_start_batch_result = self.server.request_call( + self.server_completion_queue, self.server_completion_queue, + server_request_call_tag) + self.server_driver.add_due({ + server_request_call_tag, + }) + + client_call = self.channel.create_call( + None, _common.EMPTY_FLAGS, self.client_completion_queue, + b'/twinkies', None, _common.INFINITE_FUTURE) + client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' + client_complete_rpc_tag = 'client_complete_rpc_tag' + with self.client_condition: + client_receive_initial_metadata_start_batch_result = ( + client_call.start_client_batch( + cygrpc.Operations([ + cygrpc.operation_receive_initial_metadata( + _common.EMPTY_FLAGS), + ]), client_receive_initial_metadata_tag)) + client_complete_rpc_start_batch_result = client_call.start_client_batch( + cygrpc.Operations([ + cygrpc.operation_send_initial_metadata( + _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), + cygrpc.operation_send_close_from_client( + _common.EMPTY_FLAGS), + cygrpc.operation_receive_status_on_client( + _common.EMPTY_FLAGS), + ]), client_complete_rpc_tag) + self.client_driver.add_due({ + client_receive_initial_metadata_tag, + client_complete_rpc_tag, + }) + + server_request_call_event = self.server_driver.event_with_tag( + server_request_call_tag) + + with self.server_condition: + server_send_initial_metadata_start_batch_result = ( + server_request_call_event.operation_call.start_server_batch([ + cygrpc.operation_send_initial_metadata( + _common.INITIAL_METADATA, _common.EMPTY_FLAGS), + ], server_send_initial_metadata_tag)) + self.server_driver.add_due({ + server_send_initial_metadata_tag, + }) + server_send_initial_metadata_event = self.server_driver.event_with_tag( + server_send_initial_metadata_tag) + + with self.server_condition: + server_complete_rpc_start_batch_result = ( + server_request_call_event.operation_call.start_server_batch([ + cygrpc.operation_receive_close_on_server( + _common.EMPTY_FLAGS), + cygrpc.operation_send_status_from_server( + _common.TRAILING_METADATA, cygrpc.StatusCode.ok, + b'test details', _common.EMPTY_FLAGS), + ], server_complete_rpc_tag)) + self.server_driver.add_due({ + server_complete_rpc_tag, + }) + server_complete_rpc_event = self.server_driver.event_with_tag( + server_complete_rpc_tag) + + client_receive_initial_metadata_event = self.client_driver.event_with_tag( + client_receive_initial_metadata_tag) + client_complete_rpc_event = self.client_driver.event_with_tag( + client_complete_rpc_tag) + + return (_common.OperationResult(server_request_call_start_batch_result, + server_request_call_event.type, + server_request_call_event.success), + _common.OperationResult( + client_receive_initial_metadata_start_batch_result, + client_receive_initial_metadata_event.type, + client_receive_initial_metadata_event.success), + _common.OperationResult(client_complete_rpc_start_batch_result, + client_complete_rpc_event.type, + client_complete_rpc_event.success), + _common.OperationResult( + server_send_initial_metadata_start_batch_result, + server_send_initial_metadata_event.type, + server_send_initial_metadata_event.success), + _common.OperationResult(server_complete_rpc_start_batch_result, + server_complete_rpc_event.type, + server_complete_rpc_event.success),) + + def test_rpcs(self): + expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * + 5] * _common.RPC_COUNT + actuallys = _common.execute_many_times(self._do_rpcs) + self.assertSequenceEqual(expecteds, actuallys) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_sanity/__init__.py b/src/python/grpcio_tests/tests/unit/_sanity/__init__.py deleted file mode 100644 index 5772620b60..0000000000 --- a/src/python/grpcio_tests/tests/unit/_sanity/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2016 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/python/grpcio_tests/tests/unit/_sanity/_sanity_test.py b/src/python/grpcio_tests/tests/unit/_sanity/_sanity_test.py deleted file mode 100644 index 19bc8801eb..0000000000 --- a/src/python/grpcio_tests/tests/unit/_sanity/_sanity_test.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright 2016 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json -import unittest - -import pkg_resources -import six - -import tests - - -class Sanity(unittest.TestCase): - - def testTestsJsonUpToDate(self): - """Autodiscovers all test suites and checks that tests.json is up to date""" - loader = tests.Loader() - loader.loadTestsFromNames(['tests']) - test_suite_names = [ - test_case_class.id().rsplit('.', 1)[0] - for test_case_class in tests._loader.iterate_suite_cases( - loader.suite) - ] - test_suite_names = sorted(set(test_suite_names)) - - tests_json_string = pkg_resources.resource_string('tests', 'tests.json') - if six.PY3: - tests_json_string = tests_json_string.decode() - tests_json = json.loads(tests_json_string) - self.assertListEqual(test_suite_names, tests_json) - - -if __name__ == '__main__': - unittest.main(verbosity=2) |