diff options
Diffstat (limited to 'src/python/grpcio_tests/tests/unit')
30 files changed, 410 insertions, 2907 deletions
diff --git a/src/python/grpcio_tests/tests/unit/_auth_context_test.py b/src/python/grpcio_tests/tests/unit/_auth_context_test.py index 468869a03e..8c1a30e032 100644 --- a/src/python/grpcio_tests/tests/unit/_auth_context_test.py +++ b/src/python/grpcio_tests/tests/unit/_auth_context_test.py @@ -102,7 +102,8 @@ class AuthContextTest(unittest.TestCase): self.assertIsNone(auth_data[_ID]) self.assertIsNone(auth_data[_ID_KEY]) self.assertDictEqual({ - 'transport_security_type': [b'ssl'] + 'transport_security_type': [b'ssl'], + 'ssl_session_reused': [b'false'], }, auth_data[_AUTH_CTX]) def testSecureClientCert(self): diff --git a/src/python/grpcio_tests/tests/unit/_channel_close_test.py b/src/python/grpcio_tests/tests/unit/_channel_close_test.py new file mode 100644 index 0000000000..af3a9ee1ee --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_channel_close_test.py @@ -0,0 +1,185 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests server and client side compression.""" + +import threading +import time +import unittest + +import grpc + +from tests.unit import test_common +from tests.unit.framework.common import test_constants + +_BEAT = 0.5 +_SOME_TIME = 5 +_MORE_TIME = 10 + + +class _MethodHandler(grpc.RpcMethodHandler): + + request_streaming = True + response_streaming = True + request_deserializer = None + response_serializer = None + + def stream_stream(self, request_iterator, servicer_context): + for request in request_iterator: + yield request * 2 + + +_METHOD_HANDLER = _MethodHandler() + + +class _GenericHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + return _METHOD_HANDLER + + +_GENERIC_HANDLER = _GenericHandler() + + +class _Pipe(object): + + def __init__(self, values): + self._condition = threading.Condition() + self._values = list(values) + self._open = True + + def __iter__(self): + return self + + def _next(self): + with self._condition: + while not self._values and self._open: + self._condition.wait() + if self._values: + return self._values.pop(0) + else: + raise StopIteration() + + def next(self): + return self._next() + + def __next__(self): + return self._next() + + def add(self, value): + with self._condition: + self._values.append(value) + self._condition.notify() + + def close(self): + with self._condition: + self._open = False + self._condition.notify() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + + +class ChannelCloseTest(unittest.TestCase): + + def setUp(self): + self._server = test_common.test_server( + max_workers=test_constants.THREAD_CONCURRENCY) + self._server.add_generic_rpc_handlers((_GENERIC_HANDLER,)) + self._port = self._server.add_insecure_port('[::]:0') + self._server.start() + + def tearDown(self): + self._server.stop(None) + + def test_close_immediately_after_call_invocation(self): + channel = grpc.insecure_channel('localhost:{}'.format(self._port)) + multi_callable = channel.stream_stream('Meffod') + request_iterator = _Pipe(()) + response_iterator = multi_callable(request_iterator) + channel.close() + request_iterator.close() + + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + def test_close_while_call_active(self): + channel = grpc.insecure_channel('localhost:{}'.format(self._port)) + multi_callable = channel.stream_stream('Meffod') + request_iterator = _Pipe((b'abc',)) + response_iterator = multi_callable(request_iterator) + next(response_iterator) + channel.close() + request_iterator.close() + + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + def test_context_manager_close_while_call_active(self): + with grpc.insecure_channel('localhost:{}'.format( + self._port)) as channel: # pylint: disable=bad-continuation + multi_callable = channel.stream_stream('Meffod') + request_iterator = _Pipe((b'abc',)) + response_iterator = multi_callable(request_iterator) + next(response_iterator) + request_iterator.close() + + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + def test_context_manager_close_while_many_calls_active(self): + with grpc.insecure_channel('localhost:{}'.format( + self._port)) as channel: # pylint: disable=bad-continuation + multi_callable = channel.stream_stream('Meffod') + request_iterators = tuple( + _Pipe((b'abc',)) + for _ in range(test_constants.THREAD_CONCURRENCY)) + response_iterators = [] + for request_iterator in request_iterators: + response_iterator = multi_callable(request_iterator) + next(response_iterator) + response_iterators.append(response_iterator) + for request_iterator in request_iterators: + request_iterator.close() + + for response_iterator in response_iterators: + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + def test_many_concurrent_closes(self): + channel = grpc.insecure_channel('localhost:{}'.format(self._port)) + multi_callable = channel.stream_stream('Meffod') + request_iterator = _Pipe((b'abc',)) + response_iterator = multi_callable(request_iterator) + next(response_iterator) + start = time.time() + end = start + _MORE_TIME + + def sleep_some_time_then_close(): + time.sleep(_SOME_TIME) + channel.close() + + for _ in range(test_constants.THREAD_CONCURRENCY): + close_thread = threading.Thread(target=sleep_some_time_then_close) + close_thread.start() + while True: + request_iterator.add(b'def') + time.sleep(_BEAT) + if end < time.time(): + break + request_iterator.close() + + self.assertIs(response_iterator.code(), grpc.StatusCode.CANCELLED) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_compression_test.py b/src/python/grpcio_tests/tests/unit/_compression_test.py index 7550cd39ba..0b11f03adf 100644 --- a/src/python/grpcio_tests/tests/unit/_compression_test.py +++ b/src/python/grpcio_tests/tests/unit/_compression_test.py @@ -52,9 +52,9 @@ class _MethodHandler(grpc.RpcMethodHandler): self.stream_unary = None self.stream_stream = None if self.request_streaming and self.response_streaming: - self.stream_stream = lambda x, y: handle_stream(x, y) + self.stream_stream = handle_stream elif not self.request_streaming and not self.response_streaming: - self.unary_unary = lambda x, y: handle_unary(x, y) + self.unary_unary = handle_unary class _GenericHandler(grpc.GenericRpcHandler): diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py index 3765ce4fb0..578a3d79ad 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py @@ -19,6 +19,7 @@ import unittest from grpc._cython import cygrpc from grpc.framework.foundation import logging_pool from tests.unit.framework.common import test_constants +from tests.unit._cython import test_utilities _EMPTY_FLAGS = 0 _EMPTY_METADATA = () @@ -30,6 +31,8 @@ _RECEIVE_MESSAGE_TAG = 'receive_message' _SERVER_COMPLETE_CALL_TAG = 'server_complete_call' _SUCCESS_CALL_FRACTION = 1.0 / 8.0 +_SUCCESSFUL_CALLS = int(test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION) +_UNSUCCESSFUL_CALLS = test_constants.RPC_CONCURRENCY - _SUCCESSFUL_CALLS class _State(object): @@ -43,7 +46,7 @@ class _State(object): def _is_cancellation_event(event): return (event.tag is _RECEIVE_CLOSE_ON_SERVER_TAG and - event.batch_operations[0].received_cancelled) + event.batch_operations[0].cancelled()) class _Handler(object): @@ -150,7 +153,8 @@ class CancelManyCallsTest(unittest.TestCase): server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() - channel = cygrpc.Channel('localhost:{}'.format(port).encode(), None) + channel = cygrpc.Channel('localhost:{}'.format(port).encode(), None, + None) state = _State() @@ -165,31 +169,33 @@ class CancelManyCallsTest(unittest.TestCase): client_condition = threading.Condition() client_due = set() - client_completion_queue = cygrpc.CompletionQueue() - client_driver = _QueueDriver(client_condition, client_completion_queue, - client_due) - client_driver.start() with client_condition: client_calls = [] for index in range(test_constants.RPC_CONCURRENCY): - client_call = channel.create_call(None, _EMPTY_FLAGS, - client_completion_queue, - b'/twinkies', None, None) - operations = ( - cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), - cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ) tag = 'client_complete_call_{0:04d}_tag'.format(index) - client_call.start_client_batch(operations, tag) + client_call = channel.integrated_call( + _EMPTY_FLAGS, b'/twinkies', None, None, _EMPTY_METADATA, + None, (( + ( + cygrpc.SendInitialMetadataOperation( + _EMPTY_METADATA, _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x45\x56', + _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation( + _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ), + tag, + ),)) client_due.add(tag) client_calls.append(client_call) + client_events_future = test_utilities.SimpleFuture( + lambda: tuple(channel.next_call_event() for _ in range(_SUCCESSFUL_CALLS))) + with state.condition: while True: if state.parked_handlers < test_constants.THREAD_CONCURRENCY: @@ -201,12 +207,14 @@ class CancelManyCallsTest(unittest.TestCase): state.condition.notify_all() break - client_driver.events( - test_constants.RPC_CONCURRENCY * _SUCCESS_CALL_FRACTION) + client_events_future.result() with client_condition: for client_call in client_calls: - client_call.cancel() + client_call.cancel(cygrpc.StatusCode.cancelled, 'Cancelled!') + for _ in range(_UNSUCCESSFUL_CALLS): + channel.next_call_event() + channel.close(cygrpc.StatusCode.unknown, 'Cancelled on channel close!') with state.condition: server.shutdown(server_completion_queue, _SERVER_SHUTDOWN_TAG) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py index 7305d0fa3f..d95286071d 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_channel_test.py @@ -21,25 +21,20 @@ from grpc._cython import cygrpc from tests.unit.framework.common import test_constants -def _channel_and_completion_queue(): - channel = cygrpc.Channel(b'localhost:54321', ()) - completion_queue = cygrpc.CompletionQueue() - return channel, completion_queue +def _channel(): + return cygrpc.Channel(b'localhost:54321', (), None) -def _connectivity_loop(channel, completion_queue): +def _connectivity_loop(channel): for _ in range(100): connectivity = channel.check_connectivity_state(True) - channel.watch_connectivity_state(connectivity, - time.time() + 0.2, completion_queue, - None) - completion_queue.poll() + channel.watch_connectivity_state(connectivity, time.time() + 0.2) def _create_loop_destroy(): - channel, completion_queue = _channel_and_completion_queue() - _connectivity_loop(channel, completion_queue) - completion_queue.shutdown() + channel = _channel() + _connectivity_loop(channel) + channel.close(cygrpc.StatusCode.ok, 'Channel close!') def _in_parallel(behavior, arguments): @@ -55,12 +50,9 @@ def _in_parallel(behavior, arguments): class ChannelTest(unittest.TestCase): def test_single_channel_lonely_connectivity(self): - channel, completion_queue = _channel_and_completion_queue() - _in_parallel(_connectivity_loop, ( - channel, - completion_queue, - )) - completion_queue.shutdown() + channel = _channel() + _connectivity_loop(channel) + channel.close(cygrpc.StatusCode.ok, 'Channel close!') def test_multiple_channels_lonely_connectivity(self): _in_parallel(_create_loop_destroy, ()) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_common.py b/src/python/grpcio_tests/tests/unit/_cython/_common.py index 7fd3d19b4e..d8210f36f8 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_common.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_common.py @@ -100,7 +100,8 @@ class RpcTest(object): self.server.register_completion_queue(self.server_completion_queue) port = self.server.add_http2_port(b'[::]:0') self.server.start() - self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), []) + self.channel = cygrpc.Channel('localhost:{}'.format(port).encode(), [], + None) self._server_shutdown_tag = 'server_shutdown_tag' self.server_condition = threading.Condition() diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py index 7caa98f72d..8a721788f4 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py @@ -19,6 +19,7 @@ import unittest from grpc._cython import cygrpc from tests.unit._cython import _common +from tests.unit._cython import test_utilities class Test(_common.RpcTest, unittest.TestCase): @@ -41,31 +42,27 @@ class Test(_common.RpcTest, unittest.TestCase): server_request_call_tag, }) - client_call = self.channel.create_call(None, _common.EMPTY_FLAGS, - self.client_completion_queue, - b'/twinkies', None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' - with self.client_condition: - client_receive_initial_metadata_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), - ], client_receive_initial_metadata_tag)) - self.assertEqual(cygrpc.CallError.ok, - client_receive_initial_metadata_start_batch_result) - client_complete_rpc_start_batch_result = client_call.start_client_batch( + client_call = self.channel.integrated_call( + _common.EMPTY_FLAGS, b'/twinkies', None, None, + _common.INVOCATION_METADATA, None, [( [ - cygrpc.SendInitialMetadataOperation( - _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), - ], client_complete_rpc_tag) - self.assertEqual(cygrpc.CallError.ok, - client_complete_rpc_start_batch_result) - self.client_driver.add_due({ + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), + ], client_receive_initial_metadata_tag, - client_complete_rpc_tag, - }) + )]) + client_call.operate([ + cygrpc.SendInitialMetadataOperation(_common.INVOCATION_METADATA, + _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), + ], client_complete_rpc_tag) + + client_events_future = test_utilities.SimpleFuture( + lambda: [ + self.channel.next_call_event(), + self.channel.next_call_event(),]) server_request_call_event = self.server_driver.event_with_tag( server_request_call_tag) @@ -96,20 +93,23 @@ class Test(_common.RpcTest, unittest.TestCase): server_complete_rpc_event = server_call_driver.event_with_tag( server_complete_rpc_tag) - client_receive_initial_metadata_event = self.client_driver.event_with_tag( - client_receive_initial_metadata_tag) - client_complete_rpc_event = self.client_driver.event_with_tag( - client_complete_rpc_tag) + client_events = client_events_future.result() + if client_events[0].tag is client_receive_initial_metadata_tag: + client_receive_initial_metadata_event = client_events[0] + client_complete_rpc_event = client_events[1] + else: + client_complete_rpc_event = client_events[0] + client_receive_initial_metadata_event = client_events[1] return ( _common.OperationResult(server_request_call_start_batch_result, server_request_call_event.completion_type, server_request_call_event.success), _common.OperationResult( - client_receive_initial_metadata_start_batch_result, + cygrpc.CallError.ok, client_receive_initial_metadata_event.completion_type, client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, + _common.OperationResult(cygrpc.CallError.ok, client_complete_rpc_event.completion_type, client_complete_rpc_event.success), _common.OperationResult( diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py index 8582a39c01..47f39ebce2 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py @@ -19,6 +19,7 @@ import unittest from grpc._cython import cygrpc from tests.unit._cython import _common +from tests.unit._cython import test_utilities class Test(_common.RpcTest, unittest.TestCase): @@ -36,28 +37,31 @@ class Test(_common.RpcTest, unittest.TestCase): server_request_call_tag, }) - client_call = self.channel.create_call(None, _common.EMPTY_FLAGS, - self.client_completion_queue, - b'/twinkies', None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' - with self.client_condition: - client_receive_initial_metadata_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), - ], client_receive_initial_metadata_tag)) - client_complete_rpc_start_batch_result = client_call.start_client_batch( - [ - cygrpc.SendInitialMetadataOperation( - _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), - ], client_complete_rpc_tag) - self.client_driver.add_due({ - client_receive_initial_metadata_tag, - client_complete_rpc_tag, - }) - + client_call = self.channel.integrated_call( + _common.EMPTY_FLAGS, b'/twinkies', None, None, + _common.INVOCATION_METADATA, None, [ + ( + [ + cygrpc.SendInitialMetadataOperation( + _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation( + _common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation( + _common.EMPTY_FLAGS), + ], + client_complete_rpc_tag, + ), + ]) + client_call.operate([ + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), + ], client_receive_initial_metadata_tag) + + client_events_future = test_utilities.SimpleFuture( + lambda: [ + self.channel.next_call_event(), + self.channel.next_call_event(),]) server_request_call_event = self.server_driver.event_with_tag( server_request_call_tag) @@ -87,20 +91,19 @@ class Test(_common.RpcTest, unittest.TestCase): server_complete_rpc_event = self.server_driver.event_with_tag( server_complete_rpc_tag) - client_receive_initial_metadata_event = self.client_driver.event_with_tag( - client_receive_initial_metadata_tag) - client_complete_rpc_event = self.client_driver.event_with_tag( - client_complete_rpc_tag) + client_events = client_events_future.result() + client_receive_initial_metadata_event = client_events[0] + client_complete_rpc_event = client_events[1] return ( _common.OperationResult(server_request_call_start_batch_result, server_request_call_event.completion_type, server_request_call_event.success), _common.OperationResult( - client_receive_initial_metadata_start_batch_result, + cygrpc.CallError.ok, client_receive_initial_metadata_event.completion_type, client_receive_initial_metadata_event.success), - _common.OperationResult(client_complete_rpc_start_batch_result, + _common.OperationResult(cygrpc.CallError.ok, client_complete_rpc_event.completion_type, client_complete_rpc_event.success), _common.OperationResult( diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py index bc63b54879..8a903bfaf9 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py @@ -17,6 +17,7 @@ import threading import unittest from grpc._cython import cygrpc +from tests.unit._cython import test_utilities _EMPTY_FLAGS = 0 _EMPTY_METADATA = () @@ -118,7 +119,8 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server.register_completion_queue(server_completion_queue) port = server.add_http2_port(b'[::]:0') server.start() - channel = cygrpc.Channel('localhost:{}'.format(port).encode(), set()) + channel = cygrpc.Channel('localhost:{}'.format(port).encode(), set(), + None) server_shutdown_tag = 'server_shutdown_tag' server_driver = _ServerDriver(server_completion_queue, @@ -127,10 +129,6 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): client_condition = threading.Condition() client_due = set() - client_completion_queue = cygrpc.CompletionQueue() - client_driver = _QueueDriver(client_condition, client_completion_queue, - client_due) - client_driver.start() server_call_condition = threading.Condition() server_send_initial_metadata_tag = 'server_send_initial_metadata_tag' @@ -154,25 +152,28 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server_completion_queue, server_rpc_tag) - client_call = channel.create_call(None, _EMPTY_FLAGS, - client_completion_queue, b'/twinkies', - None, None) client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag' client_complete_rpc_tag = 'client_complete_rpc_tag' - with client_condition: - client_receive_initial_metadata_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), - ], client_receive_initial_metadata_tag)) - client_due.add(client_receive_initial_metadata_tag) - client_complete_rpc_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ], client_complete_rpc_tag)) - client_due.add(client_complete_rpc_tag) + client_call = channel.segregated_call( + _EMPTY_FLAGS, b'/twinkies', None, None, _EMPTY_METADATA, None, ( + ( + [ + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + ], + client_receive_initial_metadata_tag, + ), + ( + [ + cygrpc.SendInitialMetadataOperation( + _EMPTY_METADATA, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ], + client_complete_rpc_tag, + ), + )) + client_receive_initial_metadata_event_future = test_utilities.SimpleFuture( + client_call.next_event) server_rpc_event = server_driver.first_event() @@ -208,19 +209,20 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): server_complete_rpc_tag) server_call_driver.events() - with client_condition: - client_receive_first_message_tag = 'client_receive_first_message_tag' - client_receive_first_message_start_batch_result = ( - client_call.start_client_batch([ - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), - ], client_receive_first_message_tag)) - client_due.add(client_receive_first_message_tag) - client_receive_first_message_event = client_driver.event_with_tag( - client_receive_first_message_tag) + client_recieve_initial_metadata_event = client_receive_initial_metadata_event_future.result( + ) + + client_receive_first_message_tag = 'client_receive_first_message_tag' + client_call.operate([ + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + ], client_receive_first_message_tag) + client_receive_first_message_event = client_call.next_event() - client_call_cancel_result = client_call.cancel() - client_driver.events() + client_call_cancel_result = client_call.cancel( + cygrpc.StatusCode.cancelled, 'Cancelled during test!') + client_complete_rpc_event = client_call.next_event() + channel.close(cygrpc.StatusCode.unknown, 'Channel closed!') server.shutdown(server_completion_queue, server_shutdown_tag) server.cancel_all_calls() server_driver.events() @@ -228,11 +230,6 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): self.assertEqual(cygrpc.CallError.ok, request_call_result) self.assertEqual(cygrpc.CallError.ok, server_send_initial_metadata_start_batch_result) - self.assertEqual(cygrpc.CallError.ok, - client_receive_initial_metadata_start_batch_result) - self.assertEqual(cygrpc.CallError.ok, - client_complete_rpc_start_batch_result) - self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result) self.assertIs(server_rpc_tag, server_rpc_event.tag) self.assertEqual(cygrpc.CompletionType.operation_complete, server_rpc_event.completion_type) diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 9045ff58a0..724a690746 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -51,8 +51,8 @@ class TypeSmokeTest(unittest.TestCase): del server def testChannelUpDown(self): - channel = cygrpc.Channel(b'[::]:0', None) - del channel + channel = cygrpc.Channel(b'[::]:0', None, None) + channel.close(cygrpc.StatusCode.cancelled, 'Test method anyway!') def test_metadata_plugin_call_credentials_up_down(self): cygrpc.MetadataPluginCallCredentials(_metadata_plugin, @@ -121,7 +121,7 @@ class ServerClientMixin(object): client_credentials) else: self.client_channel = cygrpc.Channel('localhost:{}'.format( - self.port).encode(), set()) + self.port).encode(), set(), None) if host_override: self.host_argument = None # default host self.expected_host = host_override @@ -131,17 +131,20 @@ class ServerClientMixin(object): self.expected_host = self.host_argument def tearDownMixin(self): + self.client_channel.close(cygrpc.StatusCode.ok, 'test being torn down!') + del self.client_channel del self.server del self.client_completion_queue del self.server_completion_queue - def _perform_operations(self, operations, call, queue, deadline, - description): - """Perform the list of operations with given call, queue, and deadline. + def _perform_queue_operations(self, operations, call, queue, deadline, + description): + """Perform the operations with given call, queue, and deadline. - Invocation errors are reported with as an exception with `description` in - the message. Performs the operations asynchronously, returning a future. - """ + Invocation errors are reported with as an exception with `description` + in the message. Performs the operations asynchronously, returning a + future. + """ def performer(): tag = object() @@ -185,9 +188,6 @@ class ServerClientMixin(object): self.assertEqual(cygrpc.CallError.ok, request_call_result) client_call_tag = object() - client_call = self.client_channel.create_call( - None, 0, self.client_completion_queue, METHOD, self.host_argument, - DEADLINE) client_initial_metadata = ( ( CLIENT_METADATA_ASCII_KEY, @@ -198,18 +198,24 @@ class ServerClientMixin(object): CLIENT_METADATA_BIN_VALUE, ), ) - client_start_batch_result = client_call.start_client_batch([ - cygrpc.SendInitialMetadataOperation(client_initial_metadata, - _EMPTY_FLAGS), - cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS), - cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), - cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), - cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), - ], client_call_tag) - self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) - client_event_future = test_utilities.CompletionQueuePollFuture( - self.client_completion_queue, DEADLINE) + client_call = self.client_channel.integrated_call( + 0, METHOD, self.host_argument, DEADLINE, client_initial_metadata, + None, [ + ( + [ + cygrpc.SendInitialMetadataOperation( + client_initial_metadata, _EMPTY_FLAGS), + cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ], + client_call_tag, + ), + ]) + client_event_future = test_utilities.SimpleFuture( + self.client_channel.next_call_event) request_event = self.server_completion_queue.poll(deadline=DEADLINE) self.assertEqual(cygrpc.CompletionType.operation_complete, @@ -285,7 +291,7 @@ class ServerClientMixin(object): self.assertEqual(5, len(server_event.batch_operations)) found_server_op_types = set() for server_result in server_event.batch_operations: - self.assertNotIn(client_result.type(), found_server_op_types) + self.assertNotIn(server_result.type(), found_server_op_types) found_server_op_types.add(server_result.type()) if server_result.type() == cygrpc.OperationType.receive_message: self.assertEqual(REQUEST, server_result.message()) @@ -304,66 +310,76 @@ class ServerClientMixin(object): del client_call del server_call - def test6522(self): + def test_6522(self): DEADLINE = time.time() + 5 DEADLINE_TOLERANCE = 0.25 METHOD = b'twinkies' empty_metadata = () + # Prologue server_request_tag = object() self.server.request_call(self.server_completion_queue, self.server_completion_queue, server_request_tag) - client_call = self.client_channel.create_call( - None, 0, self.client_completion_queue, METHOD, self.host_argument, - DEADLINE) - - # Prologue - def perform_client_operations(operations, description): - return self._perform_operations(operations, client_call, - self.client_completion_queue, - DEADLINE, description) - - client_event_future = perform_client_operations([ - cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), - cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), - ], "Client prologue") + client_call = self.client_channel.segregated_call( + 0, METHOD, self.host_argument, DEADLINE, None, None, ([( + [ + cygrpc.SendInitialMetadataOperation(empty_metadata, + _EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + ], + object(), + ), ( + [ + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), + ], + object(), + )])) + + client_initial_metadata_event_future = test_utilities.SimpleFuture( + client_call.next_event) request_event = self.server_completion_queue.poll(deadline=DEADLINE) server_call = request_event.call def perform_server_operations(operations, description): - return self._perform_operations(operations, server_call, - self.server_completion_queue, - DEADLINE, description) + return self._perform_queue_operations(operations, server_call, + self.server_completion_queue, + DEADLINE, description) server_event_future = perform_server_operations([ cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), ], "Server prologue") - client_event_future.result() # force completion + client_initial_metadata_event_future.result() # force completion server_event_future.result() # Messaging for _ in range(10): - client_event_future = perform_client_operations([ + client_call.operate([ cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Client message") + client_message_event_future = test_utilities.SimpleFuture( + client_call.next_event) server_event_future = perform_server_operations([ cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Server receive") - client_event_future.result() # force completion + client_message_event_future.result() # force completion server_event_future.result() # Epilogue - client_event_future = perform_client_operations([ + client_call.operate([ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), - cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS) ], "Client epilogue") + # One for ReceiveStatusOnClient, one for SendCloseFromClient. + client_events_future = test_utilities.SimpleFuture( + lambda: { + client_call.next_event(), + client_call.next_event(),}) server_event_future = perform_server_operations([ cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), @@ -371,7 +387,7 @@ class ServerClientMixin(object): empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS) ], "Server epilogue") - client_event_future.result() # force completion + client_events_future.result() # force completion server_event_future.result() diff --git a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py index 4a00b9ef2f..7d5eaaaa84 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py +++ b/src/python/grpcio_tests/tests/unit/_cython/test_utilities.py @@ -25,7 +25,7 @@ class SimpleFuture(object): def wrapped_function(): try: self._result = function(*args, **kwargs) - except Exception as error: + except Exception as error: # pylint: disable=broad-except self._error = error self._result = None @@ -41,7 +41,7 @@ class SimpleFuture(object): self._thread.join() if self._error: # TODO(atash): re-raise exceptions in a way that preserves tracebacks - raise self._error + raise self._error # pylint: disable=raising-bad-type return self._result diff --git a/src/python/grpcio_tests/tests/unit/_exit_test.py b/src/python/grpcio_tests/tests/unit/_exit_test.py index 6e6d9de0fb..f40f3ae07c 100644 --- a/src/python/grpcio_tests/tests/unit/_exit_test.py +++ b/src/python/grpcio_tests/tests/unit/_exit_test.py @@ -49,7 +49,7 @@ def cleanup_processes(): for process in processes: try: process.kill() - except Exception: + except Exception: # pylint: disable=broad-except pass diff --git a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py index e683131722..ad847ae03e 100644 --- a/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py +++ b/src/python/grpcio_tests/tests/unit/_from_grpc_import_star.py @@ -14,7 +14,7 @@ _BEFORE_IMPORT = tuple(globals()) -from grpc import * +from grpc import * # pylint: disable=wildcard-import _AFTER_IMPORT = tuple(globals()) diff --git a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py index 4edf0fc4ad..f153089a24 100644 --- a/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py +++ b/src/python/grpcio_tests/tests/unit/_invalid_metadata_test.py @@ -81,29 +81,16 @@ class InvalidMetadataTest(unittest.TestCase): request = b'\x07\x08' metadata = (('InVaLiD', 'UnaryRequestFutureUnaryResponse'),) expected_error_details = "metadata was invalid: %s" % metadata - response_future = self._unary_unary.future(request, metadata=metadata) - with self.assertRaises(grpc.RpcError) as exception_context: - response_future.result() - self.assertEqual(exception_context.exception.details(), - expected_error_details) - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.INTERNAL) - self.assertEqual(response_future.details(), expected_error_details) - self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL) + with self.assertRaises(ValueError) as exception_context: + self._unary_unary.future(request, metadata=metadata) def testUnaryRequestStreamResponse(self): request = b'\x37\x58' metadata = (('InVaLiD', 'UnaryRequestStreamResponse'),) expected_error_details = "metadata was invalid: %s" % metadata - response_iterator = self._unary_stream(request, metadata=metadata) - with self.assertRaises(grpc.RpcError) as exception_context: - next(response_iterator) - self.assertEqual(exception_context.exception.details(), - expected_error_details) - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.INTERNAL) - self.assertEqual(response_iterator.details(), expected_error_details) - self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL) + with self.assertRaises(ValueError) as exception_context: + self._unary_stream(request, metadata=metadata) + self.assertIn(expected_error_details, str(exception_context.exception)) def testStreamRequestBlockingUnaryResponse(self): request_iterator = ( @@ -129,32 +116,18 @@ class InvalidMetadataTest(unittest.TestCase): b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) metadata = (('InVaLiD', 'StreamRequestFutureUnaryResponse'),) expected_error_details = "metadata was invalid: %s" % metadata - response_future = self._stream_unary.future( - request_iterator, metadata=metadata) - with self.assertRaises(grpc.RpcError) as exception_context: - response_future.result() - self.assertEqual(exception_context.exception.details(), - expected_error_details) - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.INTERNAL) - self.assertEqual(response_future.details(), expected_error_details) - self.assertEqual(response_future.code(), grpc.StatusCode.INTERNAL) + with self.assertRaises(ValueError) as exception_context: + self._stream_unary.future(request_iterator, metadata=metadata) + self.assertIn(expected_error_details, str(exception_context.exception)) def testStreamRequestStreamResponse(self): request_iterator = ( b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH)) metadata = (('InVaLiD', 'StreamRequestStreamResponse'),) expected_error_details = "metadata was invalid: %s" % metadata - response_iterator = self._stream_stream( - request_iterator, metadata=metadata) - with self.assertRaises(grpc.RpcError) as exception_context: - next(response_iterator) - self.assertEqual(exception_context.exception.details(), - expected_error_details) - self.assertEqual(exception_context.exception.code(), - grpc.StatusCode.INTERNAL) - self.assertEqual(response_iterator.details(), expected_error_details) - self.assertEqual(response_iterator.code(), grpc.StatusCode.INTERNAL) + with self.assertRaises(ValueError) as exception_context: + self._stream_stream(request_iterator, metadata=metadata) + self.assertIn(expected_error_details, str(exception_context.exception)) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py index e40cca8b24..93a5fdf9ff 100644 --- a/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py +++ b/src/python/grpcio_tests/tests/unit/_invocation_defects_test.py @@ -165,11 +165,13 @@ class FailAfterFewIterationsCounter(object): def __next__(self): if self._current >= self._high: - raise Exception("This is a deliberate failure in a unit test.") + raise test_control.Defect() else: self._current += 1 return self._bytestring + next = __next__ + def _unary_unary_multi_callable(channel): return channel.unary_unary(_UNARY_UNARY) diff --git a/src/python/grpcio_tests/tests/unit/_reconnect_test.py b/src/python/grpcio_tests/tests/unit/_reconnect_test.py index 8acba5a30b..a708d8d862 100644 --- a/src/python/grpcio_tests/tests/unit/_reconnect_test.py +++ b/src/python/grpcio_tests/tests/unit/_reconnect_test.py @@ -89,7 +89,10 @@ class ReconnectTest(unittest.TestCase): multi_callable = channel.unary_unary(_UNARY_UNARY) self.assertEqual(_RESPONSE, multi_callable(_REQUEST)) server.stop(None) - time.sleep(1) + # By default, the channel connectivity is checked every 5s + # GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS can be set to change + # this. + time.sleep(5.1) server = grpc.server(server_pool, (handler,)) server.add_insecure_port('[::]:{}'.format(port)) server.start() diff --git a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py b/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py deleted file mode 100644 index 18f5af058a..0000000000 --- a/src/python/grpcio_tests/tests/unit/_thread_cleanup_test.py +++ /dev/null @@ -1,115 +0,0 @@ -# Copyright 2016 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for CleanupThread.""" - -import threading -import time -import unittest - -from grpc import _common - -_SHORT_TIME = 0.5 -_LONG_TIME = 5.0 -_EPSILON = 0.5 - - -def cleanup(timeout): - if timeout is not None: - time.sleep(timeout) - else: - time.sleep(_LONG_TIME) - - -def slow_cleanup(timeout): - # Don't respect timeout - time.sleep(_LONG_TIME) - - -class CleanupThreadTest(unittest.TestCase): - - def testTargetInvocation(self): - event = threading.Event() - - def target(arg1, arg2, arg3=None): - self.assertEqual('arg1', arg1) - self.assertEqual('arg2', arg2) - self.assertEqual('arg3', arg3) - event.set() - - cleanup_thread = _common.CleanupThread( - behavior=lambda x: None, - target=target, - name='test-name', - args=('arg1', 'arg2'), - kwargs={ - 'arg3': 'arg3' - }) - cleanup_thread.start() - cleanup_thread.join() - self.assertEqual(cleanup_thread.name, 'test-name') - self.assertTrue(event.is_set()) - - def testJoinNoTimeout(self): - cleanup_thread = _common.CleanupThread(behavior=cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join() - end_time = time.time() - self.assertAlmostEqual( - _LONG_TIME, end_time - start_time, delta=_EPSILON) - - def testJoinTimeout(self): - cleanup_thread = _common.CleanupThread(behavior=cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(_SHORT_TIME) - end_time = time.time() - self.assertAlmostEqual( - _SHORT_TIME, end_time - start_time, delta=_EPSILON) - - def testJoinTimeoutSlowBehavior(self): - cleanup_thread = _common.CleanupThread(behavior=slow_cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(_SHORT_TIME) - end_time = time.time() - self.assertAlmostEqual( - _LONG_TIME, end_time - start_time, delta=_EPSILON) - - def testJoinTimeoutSlowTarget(self): - event = threading.Event() - - def target(): - event.wait(_LONG_TIME) - - cleanup_thread = _common.CleanupThread(behavior=cleanup, target=target) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(_SHORT_TIME) - end_time = time.time() - self.assertAlmostEqual( - _SHORT_TIME, end_time - start_time, delta=_EPSILON) - event.set() - - def testJoinZeroTimeout(self): - cleanup_thread = _common.CleanupThread(behavior=cleanup) - cleanup_thread.start() - start_time = time.time() - cleanup_thread.join(0) - end_time = time.time() - self.assertAlmostEqual(0, end_time - start_time, delta=_EPSILON) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py index 61c03f64ba..b43c647fc9 100644 --- a/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py +++ b/src/python/grpcio_tests/tests/unit/beta/_beta_features_test.py @@ -65,7 +65,7 @@ class _Servicer(object): self._serviced = True self._condition.notify_all() return - yield + yield # pylint: disable=unreachable def stream_unary(self, request_iterator, context): for request in request_iterator: diff --git a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py b/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py deleted file mode 100644 index c99738e085..0000000000 --- a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests Face interface compliance of the gRPC Python Beta API.""" - -import collections -import unittest - -import six - -from grpc.beta import implementations -from grpc.beta import interfaces -from tests.unit import resources -from tests.unit import test_common as grpc_test_common -from tests.unit.beta import test_utilities -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.face import test_cases -from tests.unit.framework.interfaces.face import test_interfaces - -_SERVER_HOST_OVERRIDE = 'foo.test.google.fr' - - -class _SerializationBehaviors( - collections.namedtuple('_SerializationBehaviors', ( - 'request_serializers', - 'request_deserializers', - 'response_serializers', - 'response_deserializers', - ))): - pass - - -def _serialization_behaviors_from_test_methods(test_methods): - request_serializers = {} - request_deserializers = {} - response_serializers = {} - response_deserializers = {} - for (group, method), test_method in six.iteritems(test_methods): - request_serializers[group, method] = test_method.serialize_request - request_deserializers[group, method] = test_method.deserialize_request - response_serializers[group, method] = test_method.serialize_response - response_deserializers[group, method] = test_method.deserialize_response - return _SerializationBehaviors(request_serializers, request_deserializers, - response_serializers, response_deserializers) - - -class _Implementation(test_interfaces.Implementation): - - def instantiate(self, methods, method_implementations, - multi_method_implementation): - serialization_behaviors = _serialization_behaviors_from_test_methods( - methods) - # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest. - service = next(iter(methods))[0] - # TODO(nathaniel): Add a "cardinalities_by_group" attribute to - # _digest.TestServiceDigest. - cardinalities = { - method: method_object.cardinality() - for (group, method), method_object in six.iteritems(methods) - } - - server_options = implementations.server_options( - request_deserializers=serialization_behaviors.request_deserializers, - response_serializers=serialization_behaviors.response_serializers, - thread_pool_size=test_constants.POOL_SIZE) - server = implementations.server( - method_implementations, options=server_options) - server_credentials = implementations.ssl_server_credentials([ - ( - resources.private_key(), - resources.certificate_chain(), - ), - ]) - port = server.add_secure_port('[::]:0', server_credentials) - server.start() - channel_credentials = implementations.ssl_channel_credentials( - resources.test_root_certificates()) - channel = test_utilities.not_really_secure_channel( - 'localhost', port, channel_credentials, _SERVER_HOST_OVERRIDE) - stub_options = implementations.stub_options( - request_serializers=serialization_behaviors.request_serializers, - response_deserializers=serialization_behaviors. - response_deserializers, - thread_pool_size=test_constants.POOL_SIZE) - generic_stub = implementations.generic_stub( - channel, options=stub_options) - dynamic_stub = implementations.dynamic_stub( - channel, service, cardinalities, options=stub_options) - return generic_stub, {service: dynamic_stub}, server - - def destantiate(self, memo): - memo.stop(test_constants.SHORT_TIMEOUT).wait() - - def invocation_metadata(self): - return grpc_test_common.INVOCATION_INITIAL_METADATA - - def initial_metadata(self): - return grpc_test_common.SERVICE_INITIAL_METADATA - - def terminal_metadata(self): - return grpc_test_common.SERVICE_TERMINAL_METADATA - - def code(self): - return interfaces.StatusCode.OK - - def details(self): - return grpc_test_common.DETAILS - - def metadata_transmitted(self, original_metadata, transmitted_metadata): - return original_metadata is None or grpc_test_common.metadata_transmitted( - original_metadata, transmitted_metadata) - - -def load_tests(loader, tests, pattern): - return unittest.TestSuite( - tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in test_cases.test_cases(_Implementation()))) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py deleted file mode 100644 index 5fb4f3c3cf..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py deleted file mode 100644 index 6eb7ba33f6..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""A test constant working around issue 3069.""" - -# test_constants is referenced from specification in this module. -from tests.unit.framework.common import test_constants # pylint: disable=unused-import - -# TODO(issue 3069): Replace uses of this constant with -# test_constants.SHORT_TIMEOUT. -REALLY_SHORT_TIMEOUT = 0.1 diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py deleted file mode 100644 index 5fb4f3c3cf..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py deleted file mode 100644 index 5d8679aa62..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py +++ /dev/null @@ -1,287 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Test code for the Face layer of RPC Framework.""" - -from __future__ import division - -import abc -import itertools -import unittest -from concurrent import futures - -import six - -# test_interfaces is referenced from specification in this module. -from grpc.framework.foundation import logging_pool -from grpc.framework.interfaces.face import face -from tests.unit.framework.common import test_constants -from tests.unit.framework.common import test_control -from tests.unit.framework.common import test_coverage -from tests.unit.framework.interfaces.face import _3069_test_constant -from tests.unit.framework.interfaces.face import _digest -from tests.unit.framework.interfaces.face import _stock_service -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - - -class TestCase( - six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, - unittest.TestCase)): - """A test of the Face layer of RPC Framework. - - Concrete subclasses must have an "implementation" attribute of type - test_interfaces.Implementation and an "invoker_constructor" attribute of type - _invocation.InvokerConstructor. - """ - - NAME = 'BlockingInvocationInlineServiceTest' - - def setUp(self): - """See unittest.TestCase.setUp for full specification. - - Overriding implementations must call this implementation. - """ - self._control = test_control.PauseFailControl() - self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE, - self._control, None) - - generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( - self._digest.methods, self._digest.inline_method_implementations, - None) - self._invoker = self.invoker_constructor.construct_invoker( - generic_stub, dynamic_stubs, self._digest.methods) - - def tearDown(self): - """See unittest.TestCase.tearDown for full specification. - - Overriding implementations must call this implementation. - """ - self._invoker = None - self.implementation.destantiate(self._memo) - - def testSuccessfulUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - response, call = self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT, with_call=True) - - test_messages.verify(request, response, self) - - def testSuccessfulUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - response_iterator = self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(request, responses, self) - - def testSuccessfulStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - response, call = self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT, with_call=True) - - test_messages.verify(requests, response, self) - - def testSuccessfulStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - response_iterator = self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(requests, responses, self) - - def testSequentialInvocations(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response = self._invoker.blocking(group, method)( - first_request, test_constants.LONG_TIMEOUT) - - test_messages.verify(first_request, first_response, self) - - second_response = self._invoker.blocking(group, method)( - second_request, test_constants.LONG_TIMEOUT) - - test_messages.verify(second_request, second_response, self) - - def testParallelInvocations(self): - pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures = [] - for _ in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - response_future = pool.submit( - self._invoker.blocking(group, method), request, - test_constants.LONG_TIMEOUT) - requests.append(request) - response_futures.append(response_future) - - responses = [ - response_future.result() - for response_future in response_futures - ] - - for request, response in zip(requests, responses): - test_messages.verify(request, response, self) - pool.shutdown(wait=True) - - def testWaitingForSomeButNotAllParallelInvocations(self): - pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures_to_indices = {} - for index in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - response_future = pool.submit( - self._invoker.blocking(group, method), request, - test_constants.LONG_TIMEOUT) - requests.append(request) - response_futures_to_indices[response_future] = index - - some_completed_response_futures_iterator = itertools.islice( - futures.as_completed(response_futures_to_indices), - test_constants.THREAD_CONCURRENCY // 2) - for response_future in some_completed_response_futures_iterator: - index = response_futures_to_indices[response_future] - test_messages.verify(requests[index], - response_future.result(), self) - pool.shutdown(wait=True) - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledUnaryRequestUnaryResponse(self): - raise NotImplementedError() - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledUnaryRequestStreamResponse(self): - raise NotImplementedError() - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledStreamRequestUnaryResponse(self): - raise NotImplementedError() - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledStreamRequestStreamResponse(self): - raise NotImplementedError() - - def testExpiredUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - self._invoker.blocking(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - - def testExpiredUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.blocking(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) - - def testExpiredStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - self._invoker.blocking( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - - def testExpiredStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.blocking( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) - - def testFailedUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.fail(), self.assertRaises(face.RemoteError): - self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT) - - def testFailedUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.fail(), self.assertRaises(face.RemoteError): - response_iterator = self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT) - list(response_iterator) - - def testFailedStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.fail(), self.assertRaises(face.RemoteError): - self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - - def testFailedStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.fail(), self.assertRaises(face.RemoteError): - response_iterator = self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - list(response_iterator) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py deleted file mode 100644 index b1c33da43a..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py +++ /dev/null @@ -1,432 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Code for making a service.TestService more amenable to use in tests.""" - -import collections -import threading - -import six - -# test_control, _service, and test_interfaces are referenced from specification -# in this module. -from grpc.framework.common import cardinality -from grpc.framework.common import style -from grpc.framework.foundation import stream -from grpc.framework.foundation import stream_util -from grpc.framework.interfaces.face import face -from tests.unit.framework.common import test_control # pylint: disable=unused-import -from tests.unit.framework.interfaces.face import _service # pylint: disable=unused-import -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - -_IDENTITY = lambda x: x - - -class TestServiceDigest( - collections.namedtuple('TestServiceDigest', ( - 'methods', - 'inline_method_implementations', - 'event_method_implementations', - 'multi_method_implementation', - 'unary_unary_messages_sequences', - 'unary_stream_messages_sequences', - 'stream_unary_messages_sequences', - 'stream_stream_messages_sequences', - ))): - """A transformation of a service.TestService. - - Attributes: - methods: A dict from method group-name pair to test_interfaces.Method object - describing the RPC methods that may be called during the test. - inline_method_implementations: A dict from method group-name pair to - face.MethodImplementation object to be used in tests of in-line calls to - behaviors under test. - event_method_implementations: A dict from method group-name pair to - face.MethodImplementation object to be used in tests of event-driven calls - to behaviors under test. - multi_method_implementation: A face.MultiMethodImplementation to be used in - tests of generic calls to behaviors under test. - unary_unary_messages_sequences: A dict from method group-name pair to - sequence of service.UnaryUnaryTestMessages objects to be used to test the - identified method. - unary_stream_messages_sequences: A dict from method group-name pair to - sequence of service.UnaryStreamTestMessages objects to be used to test the - identified method. - stream_unary_messages_sequences: A dict from method group-name pair to - sequence of service.StreamUnaryTestMessages objects to be used to test the - identified method. - stream_stream_messages_sequences: A dict from method group-name pair to - sequence of service.StreamStreamTestMessages objects to be used to test - the identified method. - """ - - -class _BufferingConsumer(stream.Consumer): - """A trivial Consumer that dumps what it consumes in a user-mutable buffer.""" - - def __init__(self): - self.consumed = [] - self.terminated = False - - def consume(self, value): - self.consumed.append(value) - - def terminate(self): - self.terminated = True - - def consume_and_terminate(self, value): - self.consumed.append(value) - self.terminated = True - - -class _InlineUnaryUnaryMethod(face.MethodImplementation): - - def __init__(self, unary_unary_test_method, control): - self._test_method = unary_unary_test_method - self._control = control - - self.cardinality = cardinality.Cardinality.UNARY_UNARY - self.style = style.Service.INLINE - - def unary_unary_inline(self, request, context): - response_list = [] - self._test_method.service(request, response_list.append, context, - self._control) - return response_list.pop(0) - - -class _EventUnaryUnaryMethod(face.MethodImplementation): - - def __init__(self, unary_unary_test_method, control, pool): - self._test_method = unary_unary_test_method - self._control = control - self._pool = pool - - self.cardinality = cardinality.Cardinality.UNARY_UNARY - self.style = style.Service.EVENT - - def unary_unary_event(self, request, response_callback, context): - if self._pool is None: - self._test_method.service(request, response_callback, context, - self._control) - else: - self._pool.submit(self._test_method.service, request, - response_callback, context, self._control) - - -class _InlineUnaryStreamMethod(face.MethodImplementation): - - def __init__(self, unary_stream_test_method, control): - self._test_method = unary_stream_test_method - self._control = control - - self.cardinality = cardinality.Cardinality.UNARY_STREAM - self.style = style.Service.INLINE - - def unary_stream_inline(self, request, context): - response_consumer = _BufferingConsumer() - self._test_method.service(request, response_consumer, context, - self._control) - for response in response_consumer.consumed: - yield response - - -class _EventUnaryStreamMethod(face.MethodImplementation): - - def __init__(self, unary_stream_test_method, control, pool): - self._test_method = unary_stream_test_method - self._control = control - self._pool = pool - - self.cardinality = cardinality.Cardinality.UNARY_STREAM - self.style = style.Service.EVENT - - def unary_stream_event(self, request, response_consumer, context): - if self._pool is None: - self._test_method.service(request, response_consumer, context, - self._control) - else: - self._pool.submit(self._test_method.service, request, - response_consumer, context, self._control) - - -class _InlineStreamUnaryMethod(face.MethodImplementation): - - def __init__(self, stream_unary_test_method, control): - self._test_method = stream_unary_test_method - self._control = control - - self.cardinality = cardinality.Cardinality.STREAM_UNARY - self.style = style.Service.INLINE - - def stream_unary_inline(self, request_iterator, context): - response_list = [] - request_consumer = self._test_method.service(response_list.append, - context, self._control) - for request in request_iterator: - request_consumer.consume(request) - request_consumer.terminate() - return response_list.pop(0) - - -class _EventStreamUnaryMethod(face.MethodImplementation): - - def __init__(self, stream_unary_test_method, control, pool): - self._test_method = stream_unary_test_method - self._control = control - self._pool = pool - - self.cardinality = cardinality.Cardinality.STREAM_UNARY - self.style = style.Service.EVENT - - def stream_unary_event(self, response_callback, context): - request_consumer = self._test_method.service(response_callback, context, - self._control) - if self._pool is None: - return request_consumer - else: - return stream_util.ThreadSwitchingConsumer(request_consumer, - self._pool) - - -class _InlineStreamStreamMethod(face.MethodImplementation): - - def __init__(self, stream_stream_test_method, control): - self._test_method = stream_stream_test_method - self._control = control - - self.cardinality = cardinality.Cardinality.STREAM_STREAM - self.style = style.Service.INLINE - - def stream_stream_inline(self, request_iterator, context): - response_consumer = _BufferingConsumer() - request_consumer = self._test_method.service(response_consumer, context, - self._control) - - for request in request_iterator: - request_consumer.consume(request) - while response_consumer.consumed: - yield response_consumer.consumed.pop(0) - response_consumer.terminate() - - -class _EventStreamStreamMethod(face.MethodImplementation): - - def __init__(self, stream_stream_test_method, control, pool): - self._test_method = stream_stream_test_method - self._control = control - self._pool = pool - - self.cardinality = cardinality.Cardinality.STREAM_STREAM - self.style = style.Service.EVENT - - def stream_stream_event(self, response_consumer, context): - request_consumer = self._test_method.service(response_consumer, context, - self._control) - if self._pool is None: - return request_consumer - else: - return stream_util.ThreadSwitchingConsumer(request_consumer, - self._pool) - - -class _UnaryConsumer(stream.Consumer): - """A Consumer that only allows consumption of exactly one value.""" - - def __init__(self, action): - self._lock = threading.Lock() - self._action = action - self._consumed = False - self._terminated = False - - def consume(self, value): - with self._lock: - if self._consumed: - raise ValueError('Unary consumer already consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._consumed = True - - self._action(value) - - def terminate(self): - with self._lock: - if not self._consumed: - raise ValueError('Unary consumer hasn\'t yet consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._terminated = True - - def consume_and_terminate(self, value): - with self._lock: - if self._consumed: - raise ValueError('Unary consumer already consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._consumed = True - self._terminated = True - - self._action(value) - - -class _UnaryUnaryAdaptation(object): - - def __init__(self, unary_unary_test_method): - self._method = unary_unary_test_method - - def service(self, response_consumer, context, control): - - def action(request): - self._method.service(request, - response_consumer.consume_and_terminate, - context, control) - - return _UnaryConsumer(action) - - -class _UnaryStreamAdaptation(object): - - def __init__(self, unary_stream_test_method): - self._method = unary_stream_test_method - - def service(self, response_consumer, context, control): - - def action(request): - self._method.service(request, response_consumer, context, control) - - return _UnaryConsumer(action) - - -class _StreamUnaryAdaptation(object): - - def __init__(self, stream_unary_test_method): - self._method = stream_unary_test_method - - def service(self, response_consumer, context, control): - return self._method.service(response_consumer.consume_and_terminate, - context, control) - - -class _MultiMethodImplementation(face.MultiMethodImplementation): - - def __init__(self, methods, control, pool): - self._methods = methods - self._control = control - self._pool = pool - - def service(self, group, name, response_consumer, context): - method = self._methods.get(group, name, None) - if method is None: - raise face.NoSuchMethodError(group, name) - elif self._pool is None: - return method(response_consumer, context, self._control) - else: - request_consumer = method(response_consumer, context, self._control) - return stream_util.ThreadSwitchingConsumer(request_consumer, - self._pool) - - -class _Assembly( - collections.namedtuple( - '_Assembly', - ['methods', 'inlines', 'events', 'adaptations', 'messages'])): - """An intermediate structure created when creating a TestServiceDigest.""" - - -def _assemble(scenarios, identifiers, inline_method_constructor, - event_method_constructor, adapter, control, pool): - """Creates an _Assembly from the given scenarios.""" - methods = {} - inlines = {} - events = {} - adaptations = {} - messages = {} - for identifier, scenario in six.iteritems(scenarios): - if identifier in identifiers: - raise ValueError('Repeated identifier "(%s, %s)"!' % identifier) - - test_method = scenario[0] - inline_method = inline_method_constructor(test_method, control) - event_method = event_method_constructor(test_method, control, pool) - adaptation = adapter(test_method) - - methods[identifier] = test_method - inlines[identifier] = inline_method - events[identifier] = event_method - adaptations[identifier] = adaptation - messages[identifier] = scenario[1] - - return _Assembly(methods, inlines, events, adaptations, messages) - - -def digest(service, control, pool): - """Creates a TestServiceDigest from a TestService. - - Args: - service: A _service.TestService. - control: A test_control.Control. - pool: If RPC methods should be serviced in a separate thread, a thread pool. - None if RPC methods should be serviced in the thread belonging to the - run-time that calls for their service. - - Returns: - A TestServiceDigest synthesized from the given service.TestService. - """ - identifiers = set() - - unary_unary = _assemble(service.unary_unary_scenarios(), identifiers, - _InlineUnaryUnaryMethod, _EventUnaryUnaryMethod, - _UnaryUnaryAdaptation, control, pool) - identifiers.update(unary_unary.inlines) - - unary_stream = _assemble(service.unary_stream_scenarios(), identifiers, - _InlineUnaryStreamMethod, _EventUnaryStreamMethod, - _UnaryStreamAdaptation, control, pool) - identifiers.update(unary_stream.inlines) - - stream_unary = _assemble(service.stream_unary_scenarios(), identifiers, - _InlineStreamUnaryMethod, _EventStreamUnaryMethod, - _StreamUnaryAdaptation, control, pool) - identifiers.update(stream_unary.inlines) - - stream_stream = _assemble(service.stream_stream_scenarios(), identifiers, - _InlineStreamStreamMethod, - _EventStreamStreamMethod, _IDENTITY, control, - pool) - identifiers.update(stream_stream.inlines) - - methods = dict(unary_unary.methods) - methods.update(unary_stream.methods) - methods.update(stream_unary.methods) - methods.update(stream_stream.methods) - adaptations = dict(unary_unary.adaptations) - adaptations.update(unary_stream.adaptations) - adaptations.update(stream_unary.adaptations) - adaptations.update(stream_stream.adaptations) - inlines = dict(unary_unary.inlines) - inlines.update(unary_stream.inlines) - inlines.update(stream_unary.inlines) - inlines.update(stream_stream.inlines) - events = dict(unary_unary.events) - events.update(unary_stream.events) - events.update(stream_unary.events) - events.update(stream_stream.events) - - return TestServiceDigest(methods, inlines, events, - _MultiMethodImplementation(adaptations, control, - pool), - unary_unary.messages, unary_stream.messages, - stream_unary.messages, stream_stream.messages) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py deleted file mode 100644 index 3d9b2816aa..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ /dev/null @@ -1,508 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Test code for the Face layer of RPC Framework.""" - -from __future__ import division - -import abc -import contextlib -import itertools -import threading -import unittest -from concurrent import futures - -import six - -# test_interfaces is referenced from specification in this module. -from grpc.framework.foundation import future -from grpc.framework.foundation import logging_pool -from grpc.framework.interfaces.face import face -from tests.unit.framework.common import test_constants -from tests.unit.framework.common import test_control -from tests.unit.framework.common import test_coverage -from tests.unit.framework.interfaces.face import _3069_test_constant -from tests.unit.framework.interfaces.face import _digest -from tests.unit.framework.interfaces.face import _stock_service -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - - -class _PauseableIterator(object): - - def __init__(self, upstream): - self._upstream = upstream - self._condition = threading.Condition() - self._paused = False - - @contextlib.contextmanager - def pause(self): - with self._condition: - self._paused = True - yield - with self._condition: - self._paused = False - self._condition.notify_all() - - def __iter__(self): - return self - - def __next__(self): - return self.next() - - def next(self): - with self._condition: - while self._paused: - self._condition.wait() - return next(self._upstream) - - -class _Callback(object): - - def __init__(self): - self._condition = threading.Condition() - self._called = False - self._passed_future = None - self._passed_other_stuff = None - - def __call__(self, *args, **kwargs): - with self._condition: - self._called = True - if args: - self._passed_future = args[0] - if 1 < len(args) or kwargs: - self._passed_other_stuff = tuple(args[1:]), dict(kwargs) - self._condition.notify_all() - - def future(self): - with self._condition: - while True: - if self._passed_other_stuff is not None: - raise ValueError( - 'Test callback passed unexpected values: %s', - self._passed_other_stuff) - elif self._called: - return self._passed_future - else: - self._condition.wait() - - -class TestCase( - six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, - unittest.TestCase)): - """A test of the Face layer of RPC Framework. - - Concrete subclasses must have an "implementation" attribute of type - test_interfaces.Implementation and an "invoker_constructor" attribute of type - _invocation.InvokerConstructor. - """ - - NAME = 'FutureInvocationAsynchronousEventServiceTest' - - def setUp(self): - """See unittest.TestCase.setUp for full specification. - - Overriding implementations must call this implementation. - """ - self._control = test_control.PauseFailControl() - self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE) - self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE, - self._control, self._digest_pool) - - generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( - self._digest.methods, self._digest.event_method_implementations, - None) - self._invoker = self.invoker_constructor.construct_invoker( - generic_stub, dynamic_stubs, self._digest.methods) - - def tearDown(self): - """See unittest.TestCase.tearDown for full specification. - - Overriding implementations must call this implementation. - """ - self._invoker = None - self.implementation.destantiate(self._memo) - self._digest_pool.shutdown(wait=True) - - def testSuccessfulUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - - response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - response = response_future.result() - - test_messages.verify(request, response, self) - self.assertIs(callback.future(), response_future) - self.assertIsNone(response_future.exception()) - self.assertIsNone(response_future.traceback()) - - def testSuccessfulUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - response_iterator = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(request, responses, self) - - def testSuccessfulStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - request_iterator = _PauseableIterator(iter(requests)) - callback = _Callback() - - # Use of a paused iterator of requests allows us to test that control is - # returned to calling code before the iterator yields any requests. - with request_iterator.pause(): - response_future = self._invoker.future(group, method)( - request_iterator, test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - future_passed_to_callback = callback.future() - response = future_passed_to_callback.result() - - test_messages.verify(requests, response, self) - self.assertIs(future_passed_to_callback, response_future) - self.assertIsNone(response_future.exception()) - self.assertIsNone(response_future.traceback()) - - def testSuccessfulStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - request_iterator = _PauseableIterator(iter(requests)) - - # Use of a paused iterator of requests allows us to test that control is - # returned to calling code before the iterator yields any requests. - with request_iterator.pause(): - response_iterator = self._invoker.future(group, method)( - request_iterator, test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(requests, responses, self) - - def testSequentialInvocations(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response_future = self._invoker.future(group, method)( - first_request, test_constants.LONG_TIMEOUT) - first_response = first_response_future.result() - - test_messages.verify(first_request, first_response, self) - - second_response_future = self._invoker.future(group, method)( - second_request, test_constants.LONG_TIMEOUT) - second_response = second_response_future.result() - - test_messages.verify(second_request, second_response, self) - - def testParallelInvocations(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response_future = self._invoker.future(group, method)( - first_request, test_constants.LONG_TIMEOUT) - second_response_future = self._invoker.future(group, method)( - second_request, test_constants.LONG_TIMEOUT) - first_response = first_response_future.result() - second_response = second_response_future.result() - - test_messages.verify(first_request, first_response, self) - test_messages.verify(second_request, second_response, self) - - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures = [] - for _ in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - requests.append(request) - response_futures.append(response_future) - - responses = [ - response_future.result() - for response_future in response_futures - ] - - for request, response in zip(requests, responses): - test_messages.verify(request, response, self) - - def testWaitingForSomeButNotAllParallelInvocations(self): - pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures_to_indices = {} - for index in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - inner_response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - outer_response_future = pool.submit( - inner_response_future.result) - requests.append(request) - response_futures_to_indices[outer_response_future] = index - - some_completed_response_futures_iterator = itertools.islice( - futures.as_completed(response_futures_to_indices), - test_constants.THREAD_CONCURRENCY // 2) - for response_future in some_completed_response_futures_iterator: - index = response_futures_to_indices[response_future] - test_messages.verify(requests[index], - response_future.result(), self) - pool.shutdown(wait=True) - - def testCancelledUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - cancel_method_return_value = response_future.cancel() - - self.assertIs(callback.future(), response_future) - self.assertFalse(cancel_method_return_value) - self.assertTrue(response_future.cancelled()) - with self.assertRaises(future.CancelledError): - response_future.result() - with self.assertRaises(future.CancelledError): - response_future.exception() - with self.assertRaises(future.CancelledError): - response_future.traceback() - - def testCancelledUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - response_iterator.cancel() - - with self.assertRaises(face.CancellationError): - next(response_iterator) - - def testCancelledStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - cancel_method_return_value = response_future.cancel() - - self.assertIs(callback.future(), response_future) - self.assertFalse(cancel_method_return_value) - self.assertTrue(response_future.cancelled()) - with self.assertRaises(future.CancelledError): - response_future.result() - with self.assertRaises(future.CancelledError): - response_future.exception() - with self.assertRaises(future.CancelledError): - response_future.traceback() - - def testCancelledStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - response_iterator.cancel() - - with self.assertRaises(face.CancellationError): - next(response_iterator) - - def testExpiredUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - self.assertIs(callback.future(), response_future) - self.assertIsInstance(response_future.exception(), - face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsInstance(response_future.exception(), - face.AbortionError) - self.assertIsNotNone(response_future.traceback()) - - def testExpiredUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - with self.assertRaises(face.ExpirationError): - list(response_iterator) - - def testExpiredStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - self.assertIs(callback.future(), response_future) - self.assertIsInstance(response_future.exception(), - face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsInstance(response_future.exception(), - face.AbortionError) - self.assertIsNotNone(response_future.traceback()) - - def testExpiredStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(): - response_iterator = self._invoker.future( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - with self.assertRaises(face.ExpirationError): - list(response_iterator) - - def testFailedUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - abortion_callback = _Callback() - - with self._control.fail(): - response_future = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - response_future.add_abortion_callback(abortion_callback) - - self.assertIs(callback.future(), response_future) - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is - # indistinguishable from simply not having called its - # response_callback before the expiration of the RPC. - self.assertIsInstance(response_future.exception(), - face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsNotNone(response_future.traceback()) - self.assertIsNotNone(abortion_callback.future()) - - def testFailedUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_consumer before the - # expiration of the RPC. - with self._control.fail(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) - - def testFailedStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - callback = _Callback() - abortion_callback = _Callback() - - with self._control.fail(): - response_future = self._invoker.future( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - response_future.add_abortion_callback(abortion_callback) - - self.assertIs(callback.future(), response_future) - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is - # indistinguishable from simply not having called its - # response_callback before the expiration of the RPC. - self.assertIsInstance(response_future.exception(), - face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsNotNone(response_future.traceback()) - self.assertIsNotNone(abortion_callback.future()) - - def testFailedStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_consumer before the - # expiration of the RPC. - with self._control.fail(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.future( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py deleted file mode 100644 index efc93d56b0..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py +++ /dev/null @@ -1,198 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Coverage across the Face layer's generic-to-dynamic range for invocation.""" - -import abc - -import six - -from grpc.framework.common import cardinality - -_CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR = { - cardinality.Cardinality.UNARY_UNARY: 'blocking_unary_unary', - cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream', - cardinality.Cardinality.STREAM_UNARY: 'blocking_stream_unary', - cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream', -} - -_CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR = { - cardinality.Cardinality.UNARY_UNARY: 'future_unary_unary', - cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream', - cardinality.Cardinality.STREAM_UNARY: 'future_stream_unary', - cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream', -} - -_CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR = { - cardinality.Cardinality.UNARY_UNARY: 'event_unary_unary', - cardinality.Cardinality.UNARY_STREAM: 'event_unary_stream', - cardinality.Cardinality.STREAM_UNARY: 'event_stream_unary', - cardinality.Cardinality.STREAM_STREAM: 'event_stream_stream', -} - -_CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE = { - cardinality.Cardinality.UNARY_UNARY: 'unary_unary', - cardinality.Cardinality.UNARY_STREAM: 'unary_stream', - cardinality.Cardinality.STREAM_UNARY: 'stream_unary', - cardinality.Cardinality.STREAM_STREAM: 'stream_stream', -} - - -class Invoker(six.with_metaclass(abc.ABCMeta)): - """A type used to invoke test RPCs.""" - - @abc.abstractmethod - def blocking(self, group, name): - """Invokes an RPC with blocking control flow.""" - raise NotImplementedError() - - @abc.abstractmethod - def future(self, group, name): - """Invokes an RPC with future control flow.""" - raise NotImplementedError() - - @abc.abstractmethod - def event(self, group, name): - """Invokes an RPC with event control flow.""" - raise NotImplementedError() - - -class InvokerConstructor(six.with_metaclass(abc.ABCMeta)): - """A type used to create Invokers.""" - - @abc.abstractmethod - def name(self): - """Specifies the name of the Invoker constructed by this object.""" - raise NotImplementedError() - - @abc.abstractmethod - def construct_invoker(self, generic_stub, dynamic_stubs, methods): - """Constructs an Invoker for the given stubs and methods.""" - raise NotImplementedError() - - -class _GenericInvoker(Invoker): - - def __init__(self, generic_stub, methods): - self._stub = generic_stub - self._methods = methods - - def _behavior(self, group, name, cardinality_to_generic_method): - method_cardinality = self._methods[group, name].cardinality() - behavior = getattr(self._stub, - cardinality_to_generic_method[method_cardinality]) - return lambda *args, **kwargs: behavior(group, name, *args, **kwargs) - - def blocking(self, group, name): - return self._behavior(group, name, - _CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR) - - def future(self, group, name): - return self._behavior(group, name, - _CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR) - - def event(self, group, name): - return self._behavior(group, name, - _CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR) - - -class _GenericInvokerConstructor(InvokerConstructor): - - def name(self): - return 'GenericInvoker' - - def construct_invoker(self, generic_stub, dynamic_stub, methods): - return _GenericInvoker(generic_stub, methods) - - -class _MultiCallableInvoker(Invoker): - - def __init__(self, generic_stub, methods): - self._stub = generic_stub - self._methods = methods - - def _multi_callable(self, group, name): - method_cardinality = self._methods[group, name].cardinality() - behavior = getattr( - self._stub, - _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) - return behavior(group, name) - - def blocking(self, group, name): - return self._multi_callable(group, name) - - def future(self, group, name): - method_cardinality = self._methods[group, name].cardinality() - behavior = getattr( - self._stub, - _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) - if method_cardinality in (cardinality.Cardinality.UNARY_UNARY, - cardinality.Cardinality.STREAM_UNARY): - return behavior(group, name).future - else: - return behavior(group, name) - - def event(self, group, name): - return self._multi_callable(group, name).event - - -class _MultiCallableInvokerConstructor(InvokerConstructor): - - def name(self): - return 'MultiCallableInvoker' - - def construct_invoker(self, generic_stub, dynamic_stub, methods): - return _MultiCallableInvoker(generic_stub, methods) - - -class _DynamicInvoker(Invoker): - - def __init__(self, dynamic_stubs, methods): - self._stubs = dynamic_stubs - self._methods = methods - - def blocking(self, group, name): - return getattr(self._stubs[group], name) - - def future(self, group, name): - if self._methods[group, name].cardinality() in ( - cardinality.Cardinality.UNARY_UNARY, - cardinality.Cardinality.STREAM_UNARY): - return getattr(self._stubs[group], name).future - else: - return getattr(self._stubs[group], name) - - def event(self, group, name): - return getattr(self._stubs[group], name).event - - -class _DynamicInvokerConstructor(InvokerConstructor): - - def name(self): - return 'DynamicInvoker' - - def construct_invoker(self, generic_stub, dynamic_stubs, methods): - return _DynamicInvoker(dynamic_stubs, methods) - - -def invoker_constructors(): - """Creates a sequence of InvokerConstructors to use in tests of RPCs. - - Returns: - A sequence of InvokerConstructors. - """ - return ( - _GenericInvokerConstructor(), - _MultiCallableInvokerConstructor(), - _DynamicInvokerConstructor(), - ) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py deleted file mode 100644 index f1c96b6dc5..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py +++ /dev/null @@ -1,304 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Private interfaces implemented by data sets used in Face-layer tests.""" - -import abc - -import six - -# face is referenced from specification in this module. -from grpc.framework.interfaces.face import face # pylint: disable=unused-import -from tests.unit.framework.interfaces.face import test_interfaces - - -class UnaryUnaryTestMethodImplementation( - six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a unary-unary method.""" - - @abc.abstractmethod - def service(self, request, response_callback, context, control): - """Services an RPC that accepts one message and produces one message. - - Args: - request: The single request message for the RPC. - response_callback: A callback to be called to accept the response message - of the RPC. - context: An face.ServicerContext object. - control: A test_control.Control to control execution of this method. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. - """ - raise NotImplementedError() - - -class UnaryUnaryTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for unary-request-unary-response message pairings.""" - - @abc.abstractmethod - def request(self): - """Affords a request message. - - Implementations of this method should return a different message with each - call so that multiple test executions of the test method may be made with - different inputs. - - Returns: - A request message. - """ - raise NotImplementedError() - - @abc.abstractmethod - def verify(self, request, response, test_case): - """Verifies that the computed response matches the given request. - - Args: - request: A request message. - response: A response message. - test_case: A unittest.TestCase object affording useful assertion methods. - - Raises: - AssertionError: If the request and response do not match, indicating that - there was some problem executing the RPC under test. - """ - raise NotImplementedError() - - -class UnaryStreamTestMethodImplementation( - six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a unary-stream method.""" - - @abc.abstractmethod - def service(self, request, response_consumer, context, control): - """Services an RPC that takes one message and produces a stream of messages. - - Args: - request: The single request message for the RPC. - response_consumer: A stream.Consumer to be called to accept the response - messages of the RPC. - context: A face.ServicerContext object. - control: A test_control.Control to control execution of this method. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. - """ - raise NotImplementedError() - - -class UnaryStreamTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for unary-request-stream-response message pairings.""" - - @abc.abstractmethod - def request(self): - """Affords a request message. - - Implementations of this method should return a different message with each - call so that multiple test executions of the test method may be made with - different inputs. - - Returns: - A request message. - """ - raise NotImplementedError() - - @abc.abstractmethod - def verify(self, request, responses, test_case): - """Verifies that the computed responses match the given request. - - Args: - request: A request message. - responses: A sequence of response messages. - test_case: A unittest.TestCase object affording useful assertion methods. - - Raises: - AssertionError: If the request and responses do not match, indicating that - there was some problem executing the RPC under test. - """ - raise NotImplementedError() - - -class StreamUnaryTestMethodImplementation( - six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a stream-unary method.""" - - @abc.abstractmethod - def service(self, response_callback, context, control): - """Services an RPC that takes a stream of messages and produces one message. - - Args: - response_callback: A callback to be called to accept the response message - of the RPC. - context: A face.ServicerContext object. - control: A test_control.Control to control execution of this method. - - Returns: - A stream.Consumer with which to accept the request messages of the RPC. - The consumer returned from this method may or may not be invoked to - completion: in the case of RPC abortion, RPC Framework will simply stop - passing messages to this object. Implementations must not assume that - this object will be called to completion of the request stream or even - called at all. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. - """ - raise NotImplementedError() - - -class StreamUnaryTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for stream-request-unary-response message pairings.""" - - @abc.abstractmethod - def requests(self): - """Affords a sequence of request messages. - - Implementations of this method should return a different sequences with each - call so that multiple test executions of the test method may be made with - different inputs. - - Returns: - A sequence of request messages. - """ - raise NotImplementedError() - - @abc.abstractmethod - def verify(self, requests, response, test_case): - """Verifies that the computed response matches the given requests. - - Args: - requests: A sequence of request messages. - response: A response message. - test_case: A unittest.TestCase object affording useful assertion methods. - - Raises: - AssertionError: If the requests and response do not match, indicating that - there was some problem executing the RPC under test. - """ - raise NotImplementedError() - - -class StreamStreamTestMethodImplementation( - six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a stream-stream method.""" - - @abc.abstractmethod - def service(self, response_consumer, context, control): - """Services an RPC that accepts and produces streams of messages. - - Args: - response_consumer: A stream.Consumer to be called to accept the response - messages of the RPC. - context: A face.ServicerContext object. - control: A test_control.Control to control execution of this method. - - Returns: - A stream.Consumer with which to accept the request messages of the RPC. - The consumer returned from this method may or may not be invoked to - completion: in the case of RPC abortion, RPC Framework will simply stop - passing messages to this object. Implementations must not assume that - this object will be called to completion of the request stream or even - called at all. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. - """ - raise NotImplementedError() - - -class StreamStreamTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for stream-request-stream-response message pairings.""" - - @abc.abstractmethod - def requests(self): - """Affords a sequence of request messages. - - Implementations of this method should return a different sequences with each - call so that multiple test executions of the test method may be made with - different inputs. - - Returns: - A sequence of request messages. - """ - raise NotImplementedError() - - @abc.abstractmethod - def verify(self, requests, responses, test_case): - """Verifies that the computed response matches the given requests. - - Args: - requests: A sequence of request messages. - responses: A sequence of response messages. - test_case: A unittest.TestCase object affording useful assertion methods. - - Raises: - AssertionError: If the requests and responses do not match, indicating - that there was some problem executing the RPC under test. - """ - raise NotImplementedError() - - -class TestService(six.with_metaclass(abc.ABCMeta)): - """A specification of implemented methods to use in tests.""" - - @abc.abstractmethod - def unary_unary_scenarios(self): - """Affords unary-request-unary-response test methods and their messages. - - Returns: - A dict from method group-name pair to implementation/messages pair. The - first element of the pair is a UnaryUnaryTestMethodImplementation object - and the second element is a sequence of UnaryUnaryTestMethodMessages - objects. - """ - raise NotImplementedError() - - @abc.abstractmethod - def unary_stream_scenarios(self): - """Affords unary-request-stream-response test methods and their messages. - - Returns: - A dict from method group-name pair to implementation/messages pair. The - first element of the pair is a UnaryStreamTestMethodImplementation - object and the second element is a sequence of - UnaryStreamTestMethodMessages objects. - """ - raise NotImplementedError() - - @abc.abstractmethod - def stream_unary_scenarios(self): - """Affords stream-request-unary-response test methods and their messages. - - Returns: - A dict from method group-name pair to implementation/messages pair. The - first element of the pair is a StreamUnaryTestMethodImplementation - object and the second element is a sequence of - StreamUnaryTestMethodMessages objects. - """ - raise NotImplementedError() - - @abc.abstractmethod - def stream_stream_scenarios(self): - """Affords stream-request-stream-response test methods and their messages. - - Returns: - A dict from method group-name pair to implementation/messages pair. The - first element of the pair is a StreamStreamTestMethodImplementation - object and the second element is a sequence of - StreamStreamTestMethodMessages objects. - """ - raise NotImplementedError() diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py deleted file mode 100644 index a84e02a79a..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py +++ /dev/null @@ -1,390 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Examples of Python implementations of the stock.proto Stock service.""" - -from grpc.framework.common import cardinality -from grpc.framework.foundation import abandonment -from grpc.framework.foundation import stream -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.face import _service -from tests.unit._junkdrawer import stock_pb2 - -_STOCK_GROUP_NAME = 'Stock' -_SYMBOL_FORMAT = 'test symbol:%03d' - -# A test-appropriate security-pricing function. :-P -_price = lambda symbol_name: float(hash(symbol_name) % 4096) - - -def _get_last_trade_price(stock_request, stock_reply_callback, control, active): - """A unary-request, unary-response test method.""" - control.control() - if active(): - stock_reply_callback( - stock_pb2.StockReply( - symbol=stock_request.symbol, - price=_price(stock_request.symbol))) - else: - raise abandonment.Abandoned() - - -def _get_last_trade_price_multiple(stock_reply_consumer, control, active): - """A stream-request, stream-response test method.""" - - def stock_reply_for_stock_request(stock_request): - control.control() - if active(): - return stock_pb2.StockReply( - symbol=stock_request.symbol, price=_price(stock_request.symbol)) - else: - raise abandonment.Abandoned() - - class StockRequestConsumer(stream.Consumer): - - def consume(self, stock_request): - stock_reply_consumer.consume( - stock_reply_for_stock_request(stock_request)) - - def terminate(self): - control.control() - stock_reply_consumer.terminate() - - def consume_and_terminate(self, stock_request): - stock_reply_consumer.consume_and_terminate( - stock_reply_for_stock_request(stock_request)) - - return StockRequestConsumer() - - -def _watch_future_trades(stock_request, stock_reply_consumer, control, active): - """A unary-request, stream-response test method.""" - base_price = _price(stock_request.symbol) - for index in range(stock_request.num_trades_to_watch): - control.control() - if active(): - stock_reply_consumer.consume( - stock_pb2.StockReply( - symbol=stock_request.symbol, price=base_price + index)) - else: - raise abandonment.Abandoned() - stock_reply_consumer.terminate() - - -def _get_highest_trade_price(stock_reply_callback, control, active): - """A stream-request, unary-response test method.""" - - class StockRequestConsumer(stream.Consumer): - """Keeps an ongoing record of the most valuable symbol yet consumed.""" - - def __init__(self): - self._symbol = None - self._price = None - - def consume(self, stock_request): - control.control() - if active(): - if self._price is None: - self._symbol = stock_request.symbol - self._price = _price(stock_request.symbol) - else: - candidate_price = _price(stock_request.symbol) - if self._price < candidate_price: - self._symbol = stock_request.symbol - self._price = candidate_price - - def terminate(self): - control.control() - if active(): - if self._symbol is None: - raise ValueError() - else: - stock_reply_callback( - stock_pb2.StockReply( - symbol=self._symbol, price=self._price)) - self._symbol = None - self._price = None - - def consume_and_terminate(self, stock_request): - control.control() - if active(): - if self._price is None: - stock_reply_callback( - stock_pb2.StockReply( - symbol=stock_request.symbol, - price=_price(stock_request.symbol))) - else: - candidate_price = _price(stock_request.symbol) - if self._price < candidate_price: - stock_reply_callback( - stock_pb2.StockReply( - symbol=stock_request.symbol, - price=candidate_price)) - else: - stock_reply_callback( - stock_pb2.StockReply( - symbol=self._symbol, price=self._price)) - - self._symbol = None - self._price = None - - return StockRequestConsumer() - - -class GetLastTradePrice(_service.UnaryUnaryTestMethodImplementation): - """GetLastTradePrice for use in tests.""" - - def group(self): - return _STOCK_GROUP_NAME - - def name(self): - return 'GetLastTradePrice' - - def cardinality(self): - return cardinality.Cardinality.UNARY_UNARY - - def request_class(self): - return stock_pb2.StockRequest - - def response_class(self): - return stock_pb2.StockReply - - def serialize_request(self, request): - return request.SerializeToString() - - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) - - def serialize_response(self, response): - return response.SerializeToString() - - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) - - def service(self, request, response_callback, context, control): - _get_last_trade_price(request, response_callback, control, - context.is_active) - - -class GetLastTradePriceMessages(_service.UnaryUnaryTestMessages): - - def __init__(self): - self._index = 0 - - def request(self): - symbol = _SYMBOL_FORMAT % self._index - self._index += 1 - return stock_pb2.StockRequest(symbol=symbol) - - def verify(self, request, response, test_case): - test_case.assertEqual(request.symbol, response.symbol) - test_case.assertEqual(_price(request.symbol), response.price) - - -class GetLastTradePriceMultiple(_service.StreamStreamTestMethodImplementation): - """GetLastTradePriceMultiple for use in tests.""" - - def group(self): - return _STOCK_GROUP_NAME - - def name(self): - return 'GetLastTradePriceMultiple' - - def cardinality(self): - return cardinality.Cardinality.STREAM_STREAM - - def request_class(self): - return stock_pb2.StockRequest - - def response_class(self): - return stock_pb2.StockReply - - def serialize_request(self, request): - return request.SerializeToString() - - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) - - def serialize_response(self, response): - return response.SerializeToString() - - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) - - def service(self, response_consumer, context, control): - return _get_last_trade_price_multiple(response_consumer, control, - context.is_active) - - -class GetLastTradePriceMultipleMessages(_service.StreamStreamTestMessages): - """Pairs of message streams for use with GetLastTradePriceMultiple.""" - - def __init__(self): - self._index = 0 - - def requests(self): - base_index = self._index - self._index += 1 - return [ - stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index)) - for index in range(test_constants.STREAM_LENGTH) - ] - - def verify(self, requests, responses, test_case): - test_case.assertEqual(len(requests), len(responses)) - for stock_request, stock_reply in zip(requests, responses): - test_case.assertEqual(stock_request.symbol, stock_reply.symbol) - test_case.assertEqual( - _price(stock_request.symbol), stock_reply.price) - - -class WatchFutureTrades(_service.UnaryStreamTestMethodImplementation): - """WatchFutureTrades for use in tests.""" - - def group(self): - return _STOCK_GROUP_NAME - - def name(self): - return 'WatchFutureTrades' - - def cardinality(self): - return cardinality.Cardinality.UNARY_STREAM - - def request_class(self): - return stock_pb2.StockRequest - - def response_class(self): - return stock_pb2.StockReply - - def serialize_request(self, request): - return request.SerializeToString() - - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) - - def serialize_response(self, response): - return response.SerializeToString() - - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) - - def service(self, request, response_consumer, context, control): - _watch_future_trades(request, response_consumer, control, - context.is_active) - - -class WatchFutureTradesMessages(_service.UnaryStreamTestMessages): - """Pairs of a single request message and a sequence of response messages.""" - - def __init__(self): - self._index = 0 - - def request(self): - symbol = _SYMBOL_FORMAT % self._index - self._index += 1 - return stock_pb2.StockRequest( - symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH) - - def verify(self, request, responses, test_case): - test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses)) - base_price = _price(request.symbol) - for index, response in enumerate(responses): - test_case.assertEqual(base_price + index, response.price) - - -class GetHighestTradePrice(_service.StreamUnaryTestMethodImplementation): - """GetHighestTradePrice for use in tests.""" - - def group(self): - return _STOCK_GROUP_NAME - - def name(self): - return 'GetHighestTradePrice' - - def cardinality(self): - return cardinality.Cardinality.STREAM_UNARY - - def request_class(self): - return stock_pb2.StockRequest - - def response_class(self): - return stock_pb2.StockReply - - def serialize_request(self, request): - return request.SerializeToString() - - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) - - def serialize_response(self, response): - return response.SerializeToString() - - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) - - def service(self, response_callback, context, control): - return _get_highest_trade_price(response_callback, control, - context.is_active) - - -class GetHighestTradePriceMessages(_service.StreamUnaryTestMessages): - - def requests(self): - return [ - stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index) - for index in range(test_constants.STREAM_LENGTH) - ] - - def verify(self, requests, response, test_case): - price = None - symbol = None - for stock_request in requests: - current_symbol = stock_request.symbol - current_price = _price(current_symbol) - if price is None or price < current_price: - price = current_price - symbol = current_symbol - test_case.assertEqual(price, response.price) - test_case.assertEqual(symbol, response.symbol) - - -class StockTestService(_service.TestService): - """A corpus of test data with one method of each RPC cardinality.""" - - def unary_unary_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'GetLastTradePrice'): - (GetLastTradePrice(), [GetLastTradePriceMessages()]), - } - - def unary_stream_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'WatchFutureTrades'): - (WatchFutureTrades(), [WatchFutureTradesMessages()]), - } - - def stream_unary_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'GetHighestTradePrice'): - (GetHighestTradePrice(), [GetHighestTradePriceMessages()]) - } - - def stream_stream_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'): - (GetLastTradePriceMultiple(), - [GetLastTradePriceMultipleMessages()]), - } - - -STOCK_TEST_SERVICE = StockTestService() diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py deleted file mode 100644 index cff4b7cdea..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tools for creating tests of implementations of the Face layer.""" - -# unittest is referenced from specification in this module. -import unittest # pylint: disable=unused-import - -# test_interfaces is referenced from specification in this module. -from tests.unit.framework.interfaces.face import _blocking_invocation_inline_service -from tests.unit.framework.interfaces.face import _future_invocation_asynchronous_event_service -from tests.unit.framework.interfaces.face import _invocation -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - -_TEST_CASE_SUPERCLASSES = ( - _blocking_invocation_inline_service.TestCase, - _future_invocation_asynchronous_event_service.TestCase, -) - - -def test_cases(implementation): - """Creates unittest.TestCase classes for a given Face layer implementation. - - Args: - implementation: A test_interfaces.Implementation specifying creation and - destruction of a given Face layer implementation. - - Returns: - A sequence of subclasses of unittest.TestCase defining tests of the - specified Face layer implementation. - """ - test_case_classes = [] - for invoker_constructor in _invocation.invoker_constructors(): - for super_class in _TEST_CASE_SUPERCLASSES: - test_case_classes.append( - type( - invoker_constructor.name() + super_class.NAME, - (super_class,), { - 'implementation': implementation, - 'invoker_constructor': invoker_constructor, - '__module__': implementation.__module__, - })) - return test_case_classes diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py deleted file mode 100644 index d0de8e1c54..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py +++ /dev/null @@ -1,212 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Interfaces used in tests of implementations of the Face layer.""" - -import abc - -import six - -from grpc.framework.common import cardinality # pylint: disable=unused-import -from grpc.framework.interfaces.face import face # pylint: disable=unused-import - - -class Method(six.with_metaclass(abc.ABCMeta)): - """Specifies a method to be used in tests.""" - - @abc.abstractmethod - def group(self): - """Identify the group of the method. - - Returns: - The group of the method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def name(self): - """Identify the name of the method. - - Returns: - The name of the method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def cardinality(self): - """Identify the cardinality of the method. - - Returns: - A cardinality.Cardinality value describing the streaming semantics of the - method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def request_class(self): - """Identify the class used for the method's request objects. - - Returns: - The class object of the class to which the method's request objects - belong. - """ - raise NotImplementedError() - - @abc.abstractmethod - def response_class(self): - """Identify the class used for the method's response objects. - - Returns: - The class object of the class to which the method's response objects - belong. - """ - raise NotImplementedError() - - @abc.abstractmethod - def serialize_request(self, request): - """Serialize the given request object. - - Args: - request: A request object appropriate for this method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def deserialize_request(self, serialized_request): - """Synthesize a request object from a given bytestring. - - Args: - serialized_request: A bytestring deserializable into a request object - appropriate for this method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def serialize_response(self, response): - """Serialize the given response object. - - Args: - response: A response object appropriate for this method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def deserialize_response(self, serialized_response): - """Synthesize a response object from a given bytestring. - - Args: - serialized_response: A bytestring deserializable into a response object - appropriate for this method. - """ - raise NotImplementedError() - - -class Implementation(six.with_metaclass(abc.ABCMeta)): - """Specifies an implementation of the Face layer.""" - - @abc.abstractmethod - def instantiate(self, methods, method_implementations, - multi_method_implementation): - """Instantiates the Face layer implementation to be used in a test. - - Args: - methods: A sequence of Method objects describing the methods available to - be called during the test. - method_implementations: A dictionary from group-name pair to - face.MethodImplementation object specifying implementation of a method. - multi_method_implementation: A face.MultiMethodImplementation or None. - - Returns: - A sequence of length three the first element of which is a - face.GenericStub, the second element of which is dictionary from groups - to face.DynamicStubs affording invocation of the group's methods, and - the third element of which is an arbitrary memo object to be kept and - passed to destantiate at the conclusion of the test. The returned stubs - must be backed by the provided implementations. - """ - raise NotImplementedError() - - @abc.abstractmethod - def destantiate(self, memo): - """Destroys the Face layer implementation under test. - - Args: - memo: The object from the third position of the return value of a call to - instantiate. - """ - raise NotImplementedError() - - @abc.abstractmethod - def invocation_metadata(self): - """Provides the metadata to be used when invoking a test RPC. - - Returns: - An object to use as the supplied-at-invocation-time metadata in a test - RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def initial_metadata(self): - """Provides the metadata for use as a test RPC's first servicer metadata. - - Returns: - An object to use as the from-the-servicer-before-responses metadata in a - test RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def terminal_metadata(self): - """Provides the metadata for use as a test RPC's second servicer metadata. - - Returns: - An object to use as the from-the-servicer-after-all-responses metadata in - a test RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def code(self): - """Provides the value for use as a test RPC's code. - - Returns: - An object to use as the from-the-servicer code in a test RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def details(self): - """Provides the value for use as a test RPC's details. - - Returns: - An object to use as the from-the-servicer details in a test RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def metadata_transmitted(self, original_metadata, transmitted_metadata): - """Identifies whether or not metadata was properly transmitted. - - Args: - original_metadata: A metadata value passed to the Face interface - implementation under test. - transmitted_metadata: The same metadata value after having been - transmitted via an RPC performed by the Face interface implementation - under test. - - Returns: - Whether or not the metadata was properly transmitted by the Face interface - implementation under test. - """ - raise NotImplementedError() |