diff options
Diffstat (limited to 'src/python/grpcio_tests/tests/unit/_cython')
9 files changed, 376 insertions, 363 deletions
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py index 5b97b7b542..3765ce4fb0 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py @@ -20,9 +20,8 @@ from grpc._cython import cygrpc from grpc.framework.foundation import logging_pool from tests.unit.framework.common import test_constants -_INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) _EMPTY_FLAGS = 0 -_EMPTY_METADATA = cygrpc.Metadata(()) +_EMPTY_METADATA = () _SERVER_SHUTDOWN_TAG = 'server_shutdown' _REQUEST_CALL_TAG = 'request_call' @@ -53,7 +52,7 @@ class _Handler(object): self._state = state self._lock = threading.Lock() self._completion_queue = completion_queue - self._call = rpc_event.operation_call + self._call = rpc_event.call def __call__(self): with self._state.condition: @@ -65,12 +64,10 @@ class _Handler(object): with self._lock: self._call.start_server_batch( - cygrpc.Operations( - (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),)), + (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),), _RECEIVE_CLOSE_ON_SERVER_TAG) self._call.start_server_batch( - cygrpc.Operations( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),)), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), _RECEIVE_MESSAGE_TAG) first_event = self._completion_queue.poll() if _is_cancellation_event(first_event): @@ -78,14 +75,15 @@ class _Handler(object): else: with self._lock: operations = ( - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_message(b'\x79\x57', _EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x79\x57', _EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _EMPTY_METADATA, cygrpc.StatusCode.ok, b'test details!', - _EMPTY_FLAGS),) - self._call.start_server_batch( - cygrpc.Operations(operations), _SERVER_COMPLETE_CALL_TAG) + _EMPTY_FLAGS), + ) + self._call.start_server_batch(operations, + _SERVER_COMPLETE_CALL_TAG) self._completion_queue.poll() self._completion_queue.poll() @@ -143,17 +141,25 @@ class CancelManyCallsTest(unittest.TestCase): test_constants.THREAD_CONCURRENCY) server_completion_queue = cygrpc.CompletionQueue() - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server([ + ( + b'grpc.so_reuseport', + 0, + ), + ]) server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() - channel = cygrpc.Channel('localhost:{}'.format(port).encode(), - cygrpc.ChannelArgs([])) + channel = cygrpc.Channel('localhost:{}'.format(port).encode(), None) state = _State() - server_thread_args = (state, server, server_completion_queue, - server_thread_pool,) + server_thread_args = ( + state, + server, + server_completion_queue, + server_thread_pool, + ) server_thread = threading.Thread(target=_serve, args=server_thread_args) server_thread.start() @@ -167,20 +173,20 @@ class CancelManyCallsTest(unittest.TestCase): with client_condition: client_calls = [] for index in range(test_constants.RPC_CONCURRENCY): - client_call = channel.create_call( - None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', - None, _INFINITE_FUTURE) + client_call = channel.create_call(None, _EMPTY_FLAGS, + client_completion_queue, + b'/twinkies', None, None) operations = ( - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_message(b'\x45\x56', _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ) tag = 'client_complete_call_{0:04d}_tag'.format(index) - client_call.start_client_batch( - cygrpc.Operations(operations), tag) + client_call.start_client_batch(operations, tag) client_due.add(tag) client_calls.append(client_call) @@ -195,8 +201,8 @@ class CancelManyCallsTest(unittest.TestCase): state.condition.notify_all() break - client_driver.events(test_constants.RPC_CONCURRENCY * - _SUCCESS_CALL_FRACTION) + client_driver.events( + test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION) with client_condition: for client_call in client_calls: client_call.cancel() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py index 1d57ea7ec1..7305d0fa3f 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py @@ -22,7 +22,7 @@ from tests.unit.framework.common import test_constants def _channel_and_completion_queue(): - channel = cygrpc.Channel(b'localhost:54321', cygrpc.ChannelArgs(())) + channel = cygrpc.Channel(b'localhost:54321', ()) completion_queue = cygrpc.CompletionQueue() return channel, completion_queue @@ -31,9 +31,9 @@ def _connectivity_loop(channel, completion_queue): for _ in range(100): connectivity = channel.check_connectivity_state(True) channel.watch_connectivity_state(connectivity, - cygrpc.Timespec(time.time() + 0.2), - completion_queue, None) - completion_queue.poll(deadline=cygrpc.Timespec(float('+inf'))) + time.time() + 0.2, completion_queue, + None) + completion_queue.poll() def _create_loop_destroy(): @@ -56,7 +56,10 @@ class ChannelTest(unittest.TestCase): def test_single_channel_lonely_connectivity(self): channel, completion_queue = _channel_and_completion_queue() - _in_parallel(_connectivity_loop, (channel, completion_queue,)) + _in_parallel(_connectivity_loop, ( + channel, + completion_queue, + )) completion_queue.shutdown() def test_multiple_channels_lonely_connectivity(self): diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py index ac66d1db3d..7fd3d19b4e 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_common.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py @@ -20,20 +20,22 @@ from grpc._cython import cygrpc RPC_COUNT = 4000 -INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) EMPTY_FLAGS = 0 -INVOCATION_METADATA = cygrpc.Metadata( - (cygrpc.Metadatum(b'client-md-key', b'client-md-key'), - cygrpc.Metadatum(b'client-md-key-bin', b'\x00\x01' * 3000),)) +INVOCATION_METADATA = ( + ('client-md-key', 'client-md-key'), + ('client-md-key-bin', b'\x00\x01' * 3000), +) -INITIAL_METADATA = cygrpc.Metadata( - (cygrpc.Metadatum(b'server-initial-md-key', b'server-initial-md-value'), - cygrpc.Metadatum(b'server-initial-md-key-bin', b'\x00\x02' * 3000),)) +INITIAL_METADATA = ( + ('server-initial-md-key', 'server-initial-md-value'), + ('server-initial-md-key-bin', b'\x00\x02' * 3000), +) -TRAILING_METADATA = cygrpc.Metadata( - (cygrpc.Metadatum(b'server-trailing-md-key', b'server-trailing-md-value'), - cygrpc.Metadatum(b'server-trailing-md-key-bin', b'\x00\x03' * 3000),)) +TRAILING_METADATA = ( + ('server-trailing-md-key', 'server-trailing-md-value'), + ('server-trailing-md-key-bin', b'\x00\x03' * 3000), +) class QueueDriver(object): @@ -79,7 +81,10 @@ def execute_many_times(behavior): class OperationResult( collections.namedtuple('OperationResult', ( - 'start_batch_result', 'completion_type', 'success',))): + 'start_batch_result', + 'completion_type', + 'success', + ))): pass @@ -91,12 +96,11 @@ class RpcTest(object): def setUp(self): self.server_completion_queue = cygrpc.CompletionQueue() - self.server = cygrpc.Server(cygrpc.ChannelArgs([])) + self.server = cygrpc.Server([(b'grpc.so_reuseport', 0)]) self.server.register_completion_queue(self.server_completion_queue) port = self.server.add_http2_port(b'[::]:0') self.server.start() - self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), - cygrpc.ChannelArgs([])) + self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), []) self._server_shutdown_tag = 'server_shutdown_tag' self.server_condition = threading.Condition() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py index 14cc66675c..7caa98f72d 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py @@ -41,27 +41,27 @@ class Test(_common.RpcTest, unittest.TestCase): server_request_call_tag, }) - client_call = self.channel.create_call( - None, _common.EMPTY_FLAGS, self.client_completion_queue, - b'/twinkies', None, _common.INFINITE_FUTURE) + client_call = self.channel.create_call(None, _common.EMPTY_FLAGS, + self.client_completion_queue, + b'/twinkies', None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' with self.client_condition: client_receive_initial_metadata_start_batch_result = ( - client_call.start_client_batch( - cygrpc.Operations([ - cygrpc.operation_receive_initial_metadata( - _common.EMPTY_FLAGS), - ]), client_receive_initial_metadata_tag)) + client_call.start_client_batch([ + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), + ], client_receive_initial_metadata_tag)) + self.assertEqual(cygrpc.CallError.ok, + client_receive_initial_metadata_start_batch_result) client_complete_rpc_start_batch_result = client_call.start_client_batch( - cygrpc.Operations([ - cygrpc.operation_send_initial_metadata( + [ + cygrpc.SendInitialMetadataOperation( _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.operation_send_close_from_client( - _common.EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client( - _common.EMPTY_FLAGS), - ]), client_complete_rpc_tag) + cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), + ], client_complete_rpc_tag) + self.assertEqual(cygrpc.CallError.ok, + client_complete_rpc_start_batch_result) self.client_driver.add_due({ client_receive_initial_metadata_tag, client_complete_rpc_tag, @@ -72,8 +72,8 @@ class Test(_common.RpcTest, unittest.TestCase): with server_call_condition: server_send_initial_metadata_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( + server_request_call_event.call.start_server_batch([ + cygrpc.SendInitialMetadataOperation( _common.INITIAL_METADATA, _common.EMPTY_FLAGS), ], server_send_initial_metadata_tag)) server_call_driver.add_due({ @@ -84,10 +84,9 @@ class Test(_common.RpcTest, unittest.TestCase): with server_call_condition: server_complete_rpc_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server( - _common.EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + server_request_call_event.call.start_server_batch([ + cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _common.TRAILING_METADATA, cygrpc.StatusCode.ok, b'test details', _common.EMPTY_FLAGS), ], server_complete_rpc_tag)) @@ -102,27 +101,29 @@ class Test(_common.RpcTest, unittest.TestCase): client_complete_rpc_event = self.client_driver.event_with_tag( client_complete_rpc_tag) - return (_common.OperationResult(server_request_call_start_batch_result, - server_request_call_event.type, - server_request_call_event.success), - _common.OperationResult( - client_receive_initial_metadata_start_batch_result, - client_receive_initial_metadata_event.type, - client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, - client_complete_rpc_event.type, - client_complete_rpc_event.success), - _common.OperationResult( - server_send_initial_metadata_start_batch_result, - server_send_initial_metadata_event.type, - server_send_initial_metadata_event.success), - _common.OperationResult(server_complete_rpc_start_batch_result, - server_complete_rpc_event.type, - server_complete_rpc_event.success),) + return ( + _common.OperationResult(server_request_call_start_batch_result, + server_request_call_event.completion_type, + server_request_call_event.success), + _common.OperationResult( + client_receive_initial_metadata_start_batch_result, + client_receive_initial_metadata_event.completion_type, + client_receive_initial_metadata_event.success), + _common.OperationResult(client_complete_rpc_start_batch_result, + client_complete_rpc_event.completion_type, + client_complete_rpc_event.success), + _common.OperationResult( + server_send_initial_metadata_start_batch_result, + server_send_initial_metadata_event.completion_type, + server_send_initial_metadata_event.success), + _common.OperationResult(server_complete_rpc_start_batch_result, + server_complete_rpc_event.completion_type, + server_complete_rpc_event.success), + ) def test_rpcs(self): - expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * - 5] * _common.RPC_COUNT + expecteds = [( + _common.SUCCESSFUL_OPERATION_RESULT,) * 5] * _common.RPC_COUNT actuallys = _common.execute_many_times(self._do_rpcs) self.assertSequenceEqual(expecteds, actuallys) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py index 1e44bcc4dc..8582a39c01 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py @@ -36,27 +36,23 @@ class Test(_common.RpcTest, unittest.TestCase): server_request_call_tag, }) - client_call = self.channel.create_call( - None, _common.EMPTY_FLAGS, self.client_completion_queue, - b'/twinkies', None, _common.INFINITE_FUTURE) + client_call = self.channel.create_call(None, _common.EMPTY_FLAGS, + self.client_completion_queue, + b'/twinkies', None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' with self.client_condition: client_receive_initial_metadata_start_batch_result = ( - client_call.start_client_batch( - cygrpc.Operations([ - cygrpc.operation_receive_initial_metadata( - _common.EMPTY_FLAGS), - ]), client_receive_initial_metadata_tag)) + client_call.start_client_batch([ + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), + ], client_receive_initial_metadata_tag)) client_complete_rpc_start_batch_result = client_call.start_client_batch( - cygrpc.Operations([ - cygrpc.operation_send_initial_metadata( + [ + cygrpc.SendInitialMetadataOperation( _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.operation_send_close_from_client( - _common.EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client( - _common.EMPTY_FLAGS), - ]), client_complete_rpc_tag) + cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), + ], client_complete_rpc_tag) self.client_driver.add_due({ client_receive_initial_metadata_tag, client_complete_rpc_tag, @@ -67,8 +63,8 @@ class Test(_common.RpcTest, unittest.TestCase): with self.server_condition: server_send_initial_metadata_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( + server_request_call_event.call.start_server_batch([ + cygrpc.SendInitialMetadataOperation( _common.INITIAL_METADATA, _common.EMPTY_FLAGS), ], server_send_initial_metadata_tag)) self.server_driver.add_due({ @@ -79,12 +75,11 @@ class Test(_common.RpcTest, unittest.TestCase): with self.server_condition: server_complete_rpc_start_batch_result = ( - server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server( - _common.EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + server_request_call_event.call.start_server_batch([ + cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _common.TRAILING_METADATA, cygrpc.StatusCode.ok, - b'test details', _common.EMPTY_FLAGS), + 'test details', _common.EMPTY_FLAGS), ], server_complete_rpc_tag)) self.server_driver.add_due({ server_complete_rpc_tag, @@ -97,27 +92,29 @@ class Test(_common.RpcTest, unittest.TestCase): client_complete_rpc_event = self.client_driver.event_with_tag( client_complete_rpc_tag) - return (_common.OperationResult(server_request_call_start_batch_result, - server_request_call_event.type, - server_request_call_event.success), - _common.OperationResult( - client_receive_initial_metadata_start_batch_result, - client_receive_initial_metadata_event.type, - client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, - client_complete_rpc_event.type, - client_complete_rpc_event.success), - _common.OperationResult( - server_send_initial_metadata_start_batch_result, - server_send_initial_metadata_event.type, - server_send_initial_metadata_event.success), - _common.OperationResult(server_complete_rpc_start_batch_result, - server_complete_rpc_event.type, - server_complete_rpc_event.success),) + return ( + _common.OperationResult(server_request_call_start_batch_result, + server_request_call_event.completion_type, + server_request_call_event.success), + _common.OperationResult( + client_receive_initial_metadata_start_batch_result, + client_receive_initial_metadata_event.completion_type, + client_receive_initial_metadata_event.success), + _common.OperationResult(client_complete_rpc_start_batch_result, + client_complete_rpc_event.completion_type, + client_complete_rpc_event.success), + _common.OperationResult( + server_send_initial_metadata_start_batch_result, + server_send_initial_metadata_event.completion_type, + server_send_initial_metadata_event.success), + _common.OperationResult(server_complete_rpc_start_batch_result, + server_complete_rpc_event.completion_type, + server_complete_rpc_event.success), + ) def test_rpcs(self): - expecteds = [(_common.SUCCESSFUL_OPERATION_RESULT,) * - 5] * _common.RPC_COUNT + expecteds = [( + _common.SUCCESSFUL_OPERATION_RESULT,) * 5] * _common.RPC_COUNT actuallys = _common.execute_many_times(self._do_rpcs) self.assertSequenceEqual(expecteds, actuallys) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py index 0105612b47..bc63b54879 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py @@ -18,9 +18,8 @@ import unittest from grpc._cython import cygrpc -_INFINITE_FUTURE = cygrpc.Timespec(float('+inf')) _EMPTY_FLAGS = 0 -_EMPTY_METADATA = cygrpc.Metadata(()) +_EMPTY_METADATA = () class _ServerDriver(object): @@ -112,12 +111,14 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): def testReadSomeButNotAllResponses(self): server_completion_queue = cygrpc.CompletionQueue() - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server([( + b'grpc.so_reuseport', + 0, + )]) server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() - channel = cygrpc.Channel('localhost:{}'.format(port).encode(), - cygrpc.ChannelArgs([])) + channel = cygrpc.Channel('localhost:{}'.format(port).encode(), set()) server_shutdown_tag = 'server_shutdown_tag' server_driver = _ServerDriver(server_completion_queue, @@ -136,9 +137,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server_send_first_message_tag = 'server_send_first_message_tag' server_send_second_message_tag = 'server_send_second_message_tag' server_complete_rpc_tag = 'server_complete_rpc_tag' - server_call_due = set( - (server_send_initial_metadata_tag, server_send_first_message_tag, - server_send_second_message_tag, server_complete_rpc_tag,)) + server_call_due = set(( + server_send_initial_metadata_tag, + server_send_first_message_tag, + server_send_second_message_tag, + server_complete_rpc_tag, + )) server_call_completion_queue = cygrpc.CompletionQueue() server_call_driver = _QueueDriver(server_call_condition, server_call_completion_queue, @@ -152,37 +156,35 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): client_call = channel.create_call(None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', - None, _INFINITE_FUTURE) + None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' with client_condition: client_receive_initial_metadata_start_batch_result = ( - client_call.start_client_batch( - cygrpc.Operations([ - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - ]), client_receive_initial_metadata_tag)) + client_call.start_client_batch([ + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + ], client_receive_initial_metadata_tag)) client_due.add(client_receive_initial_metadata_tag) client_complete_rpc_start_batch_result = ( - client_call.start_client_batch( - cygrpc.Operations([ - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), - ]), client_complete_rpc_tag)) + client_call.start_client_batch([ + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ], client_complete_rpc_tag)) client_due.add(client_complete_rpc_tag) server_rpc_event = server_driver.first_event() with server_call_condition: server_send_initial_metadata_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), + server_rpc_event.call.start_server_batch([ + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), ], server_send_initial_metadata_tag)) server_send_first_message_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS), + server_rpc_event.call.start_server_batch([ + cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS), ], server_send_first_message_tag)) server_send_initial_metadata_event = server_call_driver.event_with_tag( server_send_initial_metadata_tag) @@ -190,15 +192,15 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server_send_first_message_tag) with server_call_condition: server_send_second_message_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS), + server_rpc_event.call.start_server_batch([ + cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS), ], server_send_second_message_tag)) server_complete_rpc_start_batch_result = ( - server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( - cygrpc.Metadata(()), cygrpc.StatusCode.ok, - b'test details', _EMPTY_FLAGS), + server_rpc_event.call.start_server_batch([ + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( + (), cygrpc.StatusCode.ok, b'test details', + _EMPTY_FLAGS), ], server_complete_rpc_tag)) server_send_second_message_event = server_call_driver.event_with_tag( server_send_second_message_tag) @@ -209,10 +211,9 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): with client_condition: client_receive_first_message_tag = 'client_receive_first_message_tag' client_receive_first_message_start_batch_result = ( - client_call.start_client_batch( - cygrpc.Operations([ - cygrpc.operation_receive_message(_EMPTY_FLAGS), - ]), client_receive_first_message_tag)) + client_call.start_client_batch([ + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + ], client_receive_first_message_tag)) client_due.add(client_receive_first_message_tag) client_receive_first_message_event = client_driver.event_with_tag( client_receive_first_message_tag) @@ -234,9 +235,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result) self.assertIs(server_rpc_tag, server_rpc_event.tag) self.assertEqual(cygrpc.CompletionType.operation_complete, - server_rpc_event.type) - self.assertIsInstance(server_rpc_event.operation_call, cygrpc.Call) - self.assertEqual(0, len(server_rpc_event.batch_operations)) + server_rpc_event.completion_type) + self.assertIsInstance(server_rpc_event.call, cygrpc.Call) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_cython/_server_test.py b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py new file mode 100644 index 0000000000..bbd25457b3 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py @@ -0,0 +1,49 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test servers at the level of the Cython API.""" + +import threading +import time +import unittest + +from grpc._cython import cygrpc + + +class Test(unittest.TestCase): + + def test_lonely_server(self): + server_call_completion_queue = cygrpc.CompletionQueue() + server_shutdown_completion_queue = cygrpc.CompletionQueue() + server = cygrpc.Server(None) + server.register_completion_queue(server_call_completion_queue) + server.register_completion_queue(server_shutdown_completion_queue) + port = server.add_http2_port(b'[::]:0') + server.start() + + server_request_call_tag = 'server_request_call_tag' + server_request_call_start_batch_result = server.request_call( + server_call_completion_queue, server_call_completion_queue, + server_request_call_tag) + + time.sleep(4) + + server_shutdown_tag = 'server_shutdown_tag' + server_shutdown_result = server.shutdown( + server_shutdown_completion_queue, server_shutdown_tag) + server_request_call_event = server_call_completion_queue.poll() + server_shutdown_event = server_shutdown_completion_queue.poll() + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 18d4a6df64..9045ff58a0 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -28,97 +28,43 @@ _CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value' _EMPTY_FLAGS = 0 -def _metadata_plugin_callback(context, callback): - callback( - cygrpc.Metadata([ - cygrpc.Metadatum(_CALL_CREDENTIALS_METADATA_KEY, - _CALL_CREDENTIALS_METADATA_VALUE) - ]), cygrpc.StatusCode.ok, b'') +def _metadata_plugin(context, callback): + callback((( + _CALL_CREDENTIALS_METADATA_KEY, + _CALL_CREDENTIALS_METADATA_VALUE, + ),), cygrpc.StatusCode.ok, b'') class TypeSmokeTest(unittest.TestCase): - def testStringsInUtilitiesUpDown(self): - self.assertEqual(0, cygrpc.StatusCode.ok) - metadatum = cygrpc.Metadatum(b'a', b'b') - self.assertEqual(b'a', metadatum.key) - self.assertEqual(b'b', metadatum.value) - metadata = cygrpc.Metadata([metadatum]) - self.assertEqual(1, len(metadata)) - self.assertEqual(metadatum.key, metadata[0].key) - - def testMetadataIteration(self): - metadata = cygrpc.Metadata( - [cygrpc.Metadatum(b'a', b'b'), cygrpc.Metadatum(b'c', b'd')]) - iterator = iter(metadata) - metadatum = next(iterator) - self.assertIsInstance(metadatum, cygrpc.Metadatum) - self.assertEqual(metadatum.key, b'a') - self.assertEqual(metadatum.value, b'b') - metadatum = next(iterator) - self.assertIsInstance(metadatum, cygrpc.Metadatum) - self.assertEqual(metadatum.key, b'c') - self.assertEqual(metadatum.value, b'd') - with self.assertRaises(StopIteration): - next(iterator) - - def testOperationsIteration(self): - operations = cygrpc.Operations( - [cygrpc.operation_send_message(b'asdf', _EMPTY_FLAGS)]) - iterator = iter(operations) - operation = next(iterator) - self.assertIsInstance(operation, cygrpc.Operation) - # `Operation`s are write-only structures; can't directly debug anything out - # of them. Just check that we stop iterating. - with self.assertRaises(StopIteration): - next(iterator) - - def testOperationFlags(self): - operation = cygrpc.operation_send_message(b'asdf', - cygrpc.WriteFlag.no_compress) - self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags) - - def testTimespec(self): - now = time.time() - now_timespec_a = cygrpc.Timespec(now) - now_timespec_b = cygrpc.Timespec(now) - self.assertAlmostEqual(now, float(now_timespec_a), places=8) - self.assertEqual(now_timespec_a, now_timespec_b) - self.assertLess(cygrpc.Timespec(now - 1), cygrpc.Timespec(now)) - self.assertGreater(cygrpc.Timespec(now + 1), cygrpc.Timespec(now)) - self.assertGreaterEqual(cygrpc.Timespec(now + 1), cygrpc.Timespec(now)) - self.assertGreaterEqual(cygrpc.Timespec(now), cygrpc.Timespec(now)) - self.assertLessEqual(cygrpc.Timespec(now - 1), cygrpc.Timespec(now)) - self.assertLessEqual(cygrpc.Timespec(now), cygrpc.Timespec(now)) - self.assertNotEqual(cygrpc.Timespec(now - 1), cygrpc.Timespec(now)) - self.assertNotEqual(cygrpc.Timespec(now + 1), cygrpc.Timespec(now)) - def testCompletionQueueUpDown(self): completion_queue = cygrpc.CompletionQueue() del completion_queue def testServerUpDown(self): - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server(set([ + ( + b'grpc.so_reuseport', + 0, + ), + ])) del server def testChannelUpDown(self): - channel = cygrpc.Channel(b'[::]:0', cygrpc.ChannelArgs([])) + channel = cygrpc.Channel(b'[::]:0', None) del channel - def testCredentialsMetadataPluginUpDown(self): - plugin = cygrpc.CredentialsMetadataPlugin( - lambda ignored_a, ignored_b: None, b'') - del plugin - - def testCallCredentialsFromPluginUpDown(self): - plugin = cygrpc.CredentialsMetadataPlugin(_metadata_plugin_callback, - b'') - call_credentials = cygrpc.call_credentials_metadata_plugin(plugin) - del plugin - del call_credentials + def test_metadata_plugin_call_credentials_up_down(self): + cygrpc.MetadataPluginCallCredentials(_metadata_plugin, + b'test plugin name!') def testServerStartNoExplicitShutdown(self): - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server([ + ( + b'grpc.so_reuseport', + 0, + ), + ]) completion_queue = cygrpc.CompletionQueue() server.register_completion_queue(completion_queue) port = server.add_http2_port(b'[::]:0') @@ -128,14 +74,20 @@ class TypeSmokeTest(unittest.TestCase): def testServerStartShutdown(self): completion_queue = cygrpc.CompletionQueue() - server = cygrpc.Server(cygrpc.ChannelArgs([])) + server = cygrpc.Server([ + ( + b'grpc.so_reuseport', + 0, + ), + ]) server.add_http2_port(b'[::]:0') server.register_completion_queue(completion_queue) server.start() shutdown_tag = object() server.shutdown(completion_queue, shutdown_tag) event = completion_queue.poll() - self.assertEqual(cygrpc.CompletionType.operation_complete, event.type) + self.assertEqual(cygrpc.CompletionType.operation_complete, + event.completion_type) self.assertIs(shutdown_tag, event.tag) del server del completion_queue @@ -145,7 +97,12 @@ class ServerClientMixin(object): def setUpMixin(self, server_credentials, client_credentials, host_override): self.server_completion_queue = cygrpc.CompletionQueue() - self.server = cygrpc.Server(cygrpc.ChannelArgs([])) + self.server = cygrpc.Server([ + ( + b'grpc.so_reuseport', + 0, + ), + ]) self.server.register_completion_queue(self.server_completion_queue) if server_credentials: self.port = self.server.add_http2_port(b'[::]:0', @@ -155,17 +112,16 @@ class ServerClientMixin(object): self.server.start() self.client_completion_queue = cygrpc.CompletionQueue() if client_credentials: - client_channel_arguments = cygrpc.ChannelArgs([ - cygrpc.ChannelArg(cygrpc.ChannelArgKey.ssl_target_name_override, - host_override) - ]) - self.client_channel = cygrpc.Channel( - 'localhost:{}'.format(self.port).encode(), - client_channel_arguments, client_credentials) + client_channel_arguments = (( + cygrpc.ChannelArgKey.ssl_target_name_override, + host_override, + ),) + self.client_channel = cygrpc.Channel('localhost:{}'.format( + self.port).encode(), client_channel_arguments, + client_credentials) else: - self.client_channel = cygrpc.Channel( - 'localhost:{}'.format(self.port).encode(), - cygrpc.ChannelArgs([])) + self.client_channel = cygrpc.Channel('localhost:{}'.format( + self.port).encode(), set()) if host_override: self.host_argument = None # default host self.expected_host = host_override @@ -190,40 +146,37 @@ class ServerClientMixin(object): def performer(): tag = object() try: - call_result = call.start_client_batch( - cygrpc.Operations(operations), tag) + call_result = call.start_client_batch(operations, tag) self.assertEqual(cygrpc.CallError.ok, call_result) - event = queue.poll(deadline) + event = queue.poll(deadline=deadline) self.assertEqual(cygrpc.CompletionType.operation_complete, - event.type) + event.completion_type) self.assertTrue(event.success) self.assertIs(tag, event.tag) except Exception as error: - raise Exception( - "Error in '{}': {}".format(description, error.message)) + raise Exception("Error in '{}': {}".format( + description, error.message)) return event return test_utilities.SimpleFuture(performer) - def testEcho(self): + def test_echo(self): DEADLINE = time.time() + 5 DEADLINE_TOLERANCE = 0.25 - CLIENT_METADATA_ASCII_KEY = b'key' - CLIENT_METADATA_ASCII_VALUE = b'val' - CLIENT_METADATA_BIN_KEY = b'key-bin' + CLIENT_METADATA_ASCII_KEY = 'key' + CLIENT_METADATA_ASCII_VALUE = 'val' + CLIENT_METADATA_BIN_KEY = 'key-bin' CLIENT_METADATA_BIN_VALUE = b'\0' * 1000 - SERVER_INITIAL_METADATA_KEY = b'init_me_me_me' - SERVER_INITIAL_METADATA_VALUE = b'whodawha?' - SERVER_TRAILING_METADATA_KEY = b'california_is_in_a_drought' - SERVER_TRAILING_METADATA_VALUE = b'zomg it is' + SERVER_INITIAL_METADATA_KEY = 'init_me_me_me' + SERVER_INITIAL_METADATA_VALUE = 'whodawha?' + SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought' + SERVER_TRAILING_METADATA_VALUE = 'zomg it is' SERVER_STATUS_CODE = cygrpc.StatusCode.ok - SERVER_STATUS_DETAILS = b'our work is never over' + SERVER_STATUS_DETAILS = 'our work is never over' REQUEST = b'in death a member of project mayhem has a name' RESPONSE = b'his name is robert paulson' METHOD = b'twinkies' - cygrpc_deadline = cygrpc.Timespec(DEADLINE) - server_request_tag = object() request_call_result = self.server.request_call( self.server_completion_queue, self.server_completion_queue, @@ -234,89 +187,91 @@ class ServerClientMixin(object): client_call_tag = object() client_call = self.client_channel.create_call( None, 0, self.client_completion_queue, METHOD, self.host_argument, - cygrpc_deadline) - client_initial_metadata = cygrpc.Metadata([ - cygrpc.Metadatum(CLIENT_METADATA_ASCII_KEY, - CLIENT_METADATA_ASCII_VALUE), - cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE) - ]) + DEADLINE) + client_initial_metadata = ( + ( + CLIENT_METADATA_ASCII_KEY, + CLIENT_METADATA_ASCII_VALUE, + ), + ( + CLIENT_METADATA_BIN_KEY, + CLIENT_METADATA_BIN_VALUE, + ), + ) client_start_batch_result = client_call.start_client_batch([ - cygrpc.operation_send_initial_metadata(client_initial_metadata, - _EMPTY_FLAGS), - cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) + cygrpc.SendInitialMetadataOperation(client_initial_metadata, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ], client_call_tag) self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) client_event_future = test_utilities.CompletionQueuePollFuture( - self.client_completion_queue, cygrpc_deadline) + self.client_completion_queue, DEADLINE) - request_event = self.server_completion_queue.poll(cygrpc_deadline) + request_event = self.server_completion_queue.poll(deadline=DEADLINE) self.assertEqual(cygrpc.CompletionType.operation_complete, - request_event.type) - self.assertIsInstance(request_event.operation_call, cygrpc.Call) + request_event.completion_type) + self.assertIsInstance(request_event.call, cygrpc.Call) self.assertIs(server_request_tag, request_event.tag) - self.assertEqual(0, len(request_event.batch_operations)) self.assertTrue( test_common.metadata_transmitted(client_initial_metadata, - request_event.request_metadata)) - self.assertEqual(METHOD, request_event.request_call_details.method) - self.assertEqual(self.expected_host, - request_event.request_call_details.host) + request_event.invocation_metadata)) + self.assertEqual(METHOD, request_event.call_details.method) + self.assertEqual(self.expected_host, request_event.call_details.host) self.assertLess( - abs(DEADLINE - float(request_event.request_call_details.deadline)), + abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE) server_call_tag = object() - server_call = request_event.operation_call - server_initial_metadata = cygrpc.Metadata([ - cygrpc.Metadatum(SERVER_INITIAL_METADATA_KEY, - SERVER_INITIAL_METADATA_VALUE) - ]) - server_trailing_metadata = cygrpc.Metadata([ - cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY, - SERVER_TRAILING_METADATA_VALUE) - ]) + server_call = request_event.call + server_initial_metadata = (( + SERVER_INITIAL_METADATA_KEY, + SERVER_INITIAL_METADATA_VALUE, + ),) + server_trailing_metadata = (( + SERVER_TRAILING_METADATA_KEY, + SERVER_TRAILING_METADATA_VALUE, + ),) server_start_batch_result = server_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( - server_initial_metadata, - _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS), - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.SendInitialMetadataOperation(server_initial_metadata, + _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(RESPONSE, _EMPTY_FLAGS), + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS, _EMPTY_FLAGS) ], server_call_tag) self.assertEqual(cygrpc.CallError.ok, server_start_batch_result) - server_event = self.server_completion_queue.poll(cygrpc_deadline) + server_event = self.server_completion_queue.poll(deadline=DEADLINE) client_event = client_event_future.result() self.assertEqual(6, len(client_event.batch_operations)) found_client_op_types = set() for client_result in client_event.batch_operations: # we expect each op type to be unique - self.assertNotIn(client_result.type, found_client_op_types) - found_client_op_types.add(client_result.type) - if client_result.type == cygrpc.OperationType.receive_initial_metadata: + self.assertNotIn(client_result.type(), found_client_op_types) + found_client_op_types.add(client_result.type()) + if client_result.type( + ) == cygrpc.OperationType.receive_initial_metadata: self.assertTrue( test_common.metadata_transmitted( server_initial_metadata, - client_result.received_metadata)) - elif client_result.type == cygrpc.OperationType.receive_message: - self.assertEqual(RESPONSE, - client_result.received_message.bytes()) - elif client_result.type == cygrpc.OperationType.receive_status_on_client: + client_result.initial_metadata())) + elif client_result.type() == cygrpc.OperationType.receive_message: + self.assertEqual(RESPONSE, client_result.message()) + elif client_result.type( + ) == cygrpc.OperationType.receive_status_on_client: self.assertTrue( test_common.metadata_transmitted( server_trailing_metadata, - client_result.received_metadata)) - self.assertEqual(SERVER_STATUS_DETAILS, - client_result.received_status_details) - self.assertEqual(SERVER_STATUS_CODE, - client_result.received_status_code) + client_result.trailing_metadata())) + self.assertEqual(SERVER_STATUS_DETAILS, client_result.details()) + self.assertEqual(SERVER_STATUS_CODE, client_result.code()) self.assertEqual( set([ cygrpc.OperationType.send_initial_metadata, @@ -330,13 +285,13 @@ class ServerClientMixin(object): self.assertEqual(5, len(server_event.batch_operations)) found_server_op_types = set() for server_result in server_event.batch_operations: - self.assertNotIn(client_result.type, found_server_op_types) - found_server_op_types.add(server_result.type) - if server_result.type == cygrpc.OperationType.receive_message: - self.assertEqual(REQUEST, - server_result.received_message.bytes()) - elif server_result.type == cygrpc.OperationType.receive_close_on_server: - self.assertFalse(server_result.received_cancelled) + self.assertNotIn(client_result.type(), found_server_op_types) + found_server_op_types.add(server_result.type()) + if server_result.type() == cygrpc.OperationType.receive_message: + self.assertEqual(REQUEST, server_result.message()) + elif server_result.type( + ) == cygrpc.OperationType.receive_close_on_server: + self.assertFalse(server_result.cancelled()) self.assertEqual( set([ cygrpc.OperationType.send_initial_metadata, @@ -354,8 +309,7 @@ class ServerClientMixin(object): DEADLINE_TOLERANCE = 0.25 METHOD = b'twinkies' - cygrpc_deadline = cygrpc.Timespec(DEADLINE) - empty_metadata = cygrpc.Metadata([]) + empty_metadata = () server_request_tag = object() self.server.request_call(self.server_completion_queue, @@ -363,31 +317,29 @@ class ServerClientMixin(object): server_request_tag) client_call = self.client_channel.create_call( None, 0, self.client_completion_queue, METHOD, self.host_argument, - cygrpc_deadline) + DEADLINE) # Prologue def perform_client_operations(operations, description): return self._perform_operations(operations, client_call, self.client_completion_queue, - cygrpc_deadline, description) + DEADLINE, description) client_event_future = perform_client_operations([ - cygrpc.operation_send_initial_metadata(empty_metadata, - _EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), ], "Client prologue") - request_event = self.server_completion_queue.poll(cygrpc_deadline) - server_call = request_event.operation_call + request_event = self.server_completion_queue.poll(deadline=DEADLINE) + server_call = request_event.call def perform_server_operations(operations, description): return self._perform_operations(operations, server_call, self.server_completion_queue, - cygrpc_deadline, description) + DEADLINE, description) server_event_future = perform_server_operations([ - cygrpc.operation_send_initial_metadata(empty_metadata, - _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), ], "Server prologue") client_event_future.result() # force completion @@ -396,12 +348,12 @@ class ServerClientMixin(object): # Messaging for _ in range(10): client_event_future = perform_client_operations([ - cygrpc.operation_send_message(b'', _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Client message") server_event_future = perform_server_operations([ - cygrpc.operation_send_message(b'', _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Server receive") client_event_future.result() # force completion @@ -409,13 +361,13 @@ class ServerClientMixin(object): # Epilogue client_event_future = perform_client_operations([ - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS) ], "Client epilogue") server_event_future = perform_server_operations([ - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS) ], "Server epilogue") @@ -435,12 +387,13 @@ class InsecureServerInsecureClient(unittest.TestCase, ServerClientMixin): class SecureServerSecureClient(unittest.TestCase, ServerClientMixin): def setUp(self): - server_credentials = cygrpc.server_credentials_ssl(None, [ - cygrpc.SslPemKeyCertPair(resources.private_key(), - resources.certificate_chain()) - ], False) - client_credentials = cygrpc.channel_credentials_ssl( - resources.test_root_certificates(), None) + server_credentials = cygrpc.server_credentials_ssl( + None, [ + cygrpc.SslPemKeyCertPair(resources.private_key(), + resources.certificate_chain()) + ], False) + client_credentials = cygrpc.SSLChannelCredentials( + resources.test_root_certificates(), None, None) self.setUpMixin(server_credentials, client_credentials, _SSL_HOST_OVERRIDE) diff --git a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py index 8e91161f80..4a00b9ef2f 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py +++ b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py @@ -49,4 +49,4 @@ class CompletionQueuePollFuture(SimpleFuture): def __init__(self, completion_queue, deadline): super(CompletionQueuePollFuture, - self).__init__(lambda: completion_queue.poll(deadline)) + self).__init__(lambda: completion_queue.poll(deadline=deadline)) |