diff options
Diffstat (limited to 'src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py')
-rw-r--r-- | src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py | 131 |
1 files changed, 131 insertions, 0 deletions
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py new file mode 100644 index 0000000000..14cc66675c --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py @@ -0,0 +1,131 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test a corner-case at the level of the Cython API.""" + +import threading +import unittest + +from grpc._cython import cygrpc + +from tests.unit._cython import _common + + +class Test(_common.RpcTest, unittest.TestCase): + + def _do_rpcs(self): + server_call_condition = threading.Condition() + server_call_completion_queue = cygrpc.CompletionQueue() + server_call_driver = _common.QueueDriver(server_call_condition, + server_call_completion_queue) + + server_request_call_tag = 'server_request_call_tag' + server_send_initial_metadata_tag = 'server_send_initial_metadata_tag' + server_complete_rpc_tag = 'server_complete_rpc_tag' + + with self.server_condition: + server_request_call_start_batch_result = self.server.request_call( + server_call_completion_queue, self.server_completion_queue, + server_request_call_tag) + self.server_driver.add_due({ + server_request_call_tag, + }) + + client_call = self.channel.create_call( + None, _common.EMPTY_FLAGS, self.client_completion_queue, + b'/twinkies', None, _common.INFINITE_FUTURE) + client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' + client_complete_rpc_tag = 'client_complete_rpc_tag' + with self.client_condition: + client_receive_initial_metadata_start_batch_result = ( + client_call.start_client_batch( + cygrpc.Operations([ + cygrpc.operation_receive_initial_metadata( + _common.EMPTY_FLAGS), + ]), client_receive_initial_metadata_tag)) + client_complete_rpc_start_batch_result = client_call.start_client_batch( + cygrpc.Operations([ + cygrpc.operation_send_initial_metadata( + _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), + cygrpc.operation_send_close_from_client( + _common.EMPTY_FLAGS), + cygrpc.operation_receive_status_on_client( + _common.EMPTY_FLAGS), + ]), client_complete_rpc_tag) + self.client_driver.add_due({ + client_receive_initial_metadata_tag, + client_complete_rpc_tag, + }) + + server_request_call_event = self.server_driver.event_with_tag( + server_request_call_tag) + + with server_call_condition: + server_send_initial_metadata_start_batch_result = ( + server_request_call_event.operation_call.start_server_batch([ + cygrpc.operation_send_initial_metadata( + _common.INITIAL_METADATA, _common.EMPTY_FLAGS), + ], server_send_initial_metadata_tag)) + server_call_driver.add_due({ + server_send_initial_metadata_tag, + }) + server_send_initial_metadata_event = server_call_driver.event_with_tag( + server_send_initial_metadata_tag) + + with server_call_condition: + server_complete_rpc_start_batch_result = ( + server_request_call_event.operation_call.start_server_batch([ + cygrpc.operation_receive_close_on_server( + _common.EMPTY_FLAGS), + cygrpc.operation_send_status_from_server( + _common.TRAILING_METADATA, cygrpc.StatusCode.ok, + b'test details', _common.EMPTY_FLAGS), + ], server_complete_rpc_tag)) + server_call_driver.add_due({ + server_complete_rpc_tag, + }) + server_complete_rpc_event = server_call_driver.event_with_tag( + server_complete_rpc_tag) + + client_receive_initial_metadata_event = self.client_driver.event_with_tag( + client_receive_initial_metadata_tag) + client_complete_rpc_event = self.client_driver.event_with_tag( + client_complete_rpc_tag) + + return (_common.OperationResult(server_request_call_start_batch_result, + server_request_call_event.type, + server_request_call_event.success), + _common.OperationResult( + client_receive_initial_metadata_start_batch_result, + client_receive_initial_metadata_event.type, + client_receive_initial_metadata_event.success), + _common.OperationResult(client_complete_rpc_start_batch_result, + client_complete_rpc_event.type, + client_complete_rpc_event.success), + _common.OperationResult( + server_send_initial_metadata_start_batch_result, + server_send_initial_metadata_event.type, + server_send_initial_metadata_event.success), + _common.OperationResult(server_complete_rpc_start_batch_result, + server_complete_rpc_event.type, + server_complete_rpc_event.success),) + + def test_rpcs(self): + expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * + 5] * _common.RPC_COUNT + actuallys = _common.execute_many_times(self._do_rpcs) + self.assertSequenceEqual(expecteds, actuallys) + + +if __name__ == '__main__': + unittest.main(verbosity=2) |