diff options
author | Nathaniel Manista <nathaniel@google.com> | 2018-03-13 23:50:25 +0000 |
---|---|---|
committer | Nathaniel Manista <nathaniel@google.com> | 2018-03-13 23:50:25 +0000 |
commit | 62170372a7899949a7baaebec112f11552e6985a (patch) | |
tree | 9e6e65f29e0cd6f06196eef02092d7b1b8350927 | |
parent | f7a8f8ba7c304197fa83ad43d59cd448c1fc5792 (diff) |
Remove _face_interface_test
The Beta API has an execution date and RPC Framework is but a distant
memory.
This test is flaky with Python 3.5 on Windows! Some mysteries will just
have to remain unsolved...
13 files changed, 0 insertions, 2569 deletions
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 0f6fd983aa..bbbc3ea360 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -218,12 +218,6 @@ "unit.beta._beta_features_test.BetaFeaturesTest", "unit.beta._beta_features_test.ContextManagementAndLifecycleTest", "unit.beta._connectivity_channel_test.ConnectivityStatesTest", - "unit.beta._face_interface_test.DynamicInvokerBlockingInvocationInlineServiceTest", - "unit.beta._face_interface_test.DynamicInvokerFutureInvocationAsynchronousEventServiceTest", - "unit.beta._face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest", - "unit.beta._face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest", - "unit.beta._face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest", - "unit.beta._face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest", "unit.beta._implementations_test.CallCredentialsTest", "unit.beta._implementations_test.ChannelCredentialsTest", "unit.beta._not_found_test.NotFoundTest", diff --git a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py b/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py deleted file mode 100644 index c99738e085..0000000000 --- a/src/python/grpcio_tests/tests/unit/beta/_face_interface_test.py +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests Face interface compliance of the gRPC Python Beta API.""" - -import collections -import unittest - -import six - -from grpc.beta import implementations -from grpc.beta import interfaces -from tests.unit import resources -from tests.unit import test_common as grpc_test_common -from tests.unit.beta import test_utilities -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.face import test_cases -from tests.unit.framework.interfaces.face import test_interfaces - -_SERVER_HOST_OVERRIDE = 'foo.test.google.fr' - - -class _SerializationBehaviors( - collections.namedtuple('_SerializationBehaviors', ( - 'request_serializers', - 'request_deserializers', - 'response_serializers', - 'response_deserializers', - ))): - pass - - -def _serialization_behaviors_from_test_methods(test_methods): - request_serializers = {} - request_deserializers = {} - response_serializers = {} - response_deserializers = {} - for (group, method), test_method in six.iteritems(test_methods): - request_serializers[group, method] = test_method.serialize_request - request_deserializers[group, method] = test_method.deserialize_request - response_serializers[group, method] = test_method.serialize_response - response_deserializers[group, method] = test_method.deserialize_response - return _SerializationBehaviors(request_serializers, request_deserializers, - response_serializers, response_deserializers) - - -class _Implementation(test_interfaces.Implementation): - - def instantiate(self, methods, method_implementations, - multi_method_implementation): - serialization_behaviors = _serialization_behaviors_from_test_methods( - methods) - # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest. - service = next(iter(methods))[0] - # TODO(nathaniel): Add a "cardinalities_by_group" attribute to - # _digest.TestServiceDigest. - cardinalities = { - method: method_object.cardinality() - for (group, method), method_object in six.iteritems(methods) - } - - server_options = implementations.server_options( - request_deserializers=serialization_behaviors.request_deserializers, - response_serializers=serialization_behaviors.response_serializers, - thread_pool_size=test_constants.POOL_SIZE) - server = implementations.server( - method_implementations, options=server_options) - server_credentials = implementations.ssl_server_credentials([ - ( - resources.private_key(), - resources.certificate_chain(), - ), - ]) - port = server.add_secure_port('[::]:0', server_credentials) - server.start() - channel_credentials = implementations.ssl_channel_credentials( - resources.test_root_certificates()) - channel = test_utilities.not_really_secure_channel( - 'localhost', port, channel_credentials, _SERVER_HOST_OVERRIDE) - stub_options = implementations.stub_options( - request_serializers=serialization_behaviors.request_serializers, - response_deserializers=serialization_behaviors. - response_deserializers, - thread_pool_size=test_constants.POOL_SIZE) - generic_stub = implementations.generic_stub( - channel, options=stub_options) - dynamic_stub = implementations.dynamic_stub( - channel, service, cardinalities, options=stub_options) - return generic_stub, {service: dynamic_stub}, server - - def destantiate(self, memo): - memo.stop(test_constants.SHORT_TIMEOUT).wait() - - def invocation_metadata(self): - return grpc_test_common.INVOCATION_INITIAL_METADATA - - def initial_metadata(self): - return grpc_test_common.SERVICE_INITIAL_METADATA - - def terminal_metadata(self): - return grpc_test_common.SERVICE_TERMINAL_METADATA - - def code(self): - return interfaces.StatusCode.OK - - def details(self): - return grpc_test_common.DETAILS - - def metadata_transmitted(self, original_metadata, transmitted_metadata): - return original_metadata is None or grpc_test_common.metadata_transmitted( - original_metadata, transmitted_metadata) - - -def load_tests(loader, tests, pattern): - return unittest.TestSuite( - tests=tuple( - loader.loadTestsFromTestCase(test_case_class) - for test_case_class in test_cases.test_cases(_Implementation()))) - - -if __name__ == '__main__': - unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py deleted file mode 100644 index 5fb4f3c3cf..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py deleted file mode 100644 index 6eb7ba33f6..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""A test constant working around issue 3069.""" - -# test_constants is referenced from specification in this module. -from tests.unit.framework.common import test_constants # pylint: disable=unused-import - -# TODO(issue 3069): Replace uses of this constant with -# test_constants.SHORT_TIMEOUT. -REALLY_SHORT_TIMEOUT = 0.1 diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py deleted file mode 100644 index 5fb4f3c3cf..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py deleted file mode 100644 index 5d8679aa62..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py +++ /dev/null @@ -1,287 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Test code for the Face layer of RPC Framework.""" - -from __future__ import division - -import abc -import itertools -import unittest -from concurrent import futures - -import six - -# test_interfaces is referenced from specification in this module. -from grpc.framework.foundation import logging_pool -from grpc.framework.interfaces.face import face -from tests.unit.framework.common import test_constants -from tests.unit.framework.common import test_control -from tests.unit.framework.common import test_coverage -from tests.unit.framework.interfaces.face import _3069_test_constant -from tests.unit.framework.interfaces.face import _digest -from tests.unit.framework.interfaces.face import _stock_service -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - - -class TestCase( - six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, - unittest.TestCase)): - """A test of the Face layer of RPC Framework. - - Concrete subclasses must have an "implementation" attribute of type - test_interfaces.Implementation and an "invoker_constructor" attribute of type - _invocation.InvokerConstructor. - """ - - NAME = 'BlockingInvocationInlineServiceTest' - - def setUp(self): - """See unittest.TestCase.setUp for full specification. - - Overriding implementations must call this implementation. - """ - self._control = test_control.PauseFailControl() - self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE, - self._control, None) - - generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( - self._digest.methods, self._digest.inline_method_implementations, - None) - self._invoker = self.invoker_constructor.construct_invoker( - generic_stub, dynamic_stubs, self._digest.methods) - - def tearDown(self): - """See unittest.TestCase.tearDown for full specification. - - Overriding implementations must call this implementation. - """ - self._invoker = None - self.implementation.destantiate(self._memo) - - def testSuccessfulUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - response, call = self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT, with_call=True) - - test_messages.verify(request, response, self) - - def testSuccessfulUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - response_iterator = self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(request, responses, self) - - def testSuccessfulStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - response, call = self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT, with_call=True) - - test_messages.verify(requests, response, self) - - def testSuccessfulStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - response_iterator = self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(requests, responses, self) - - def testSequentialInvocations(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response = self._invoker.blocking(group, method)( - first_request, test_constants.LONG_TIMEOUT) - - test_messages.verify(first_request, first_response, self) - - second_response = self._invoker.blocking(group, method)( - second_request, test_constants.LONG_TIMEOUT) - - test_messages.verify(second_request, second_response, self) - - def testParallelInvocations(self): - pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures = [] - for _ in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - response_future = pool.submit( - self._invoker.blocking(group, method), request, - test_constants.LONG_TIMEOUT) - requests.append(request) - response_futures.append(response_future) - - responses = [ - response_future.result() - for response_future in response_futures - ] - - for request, response in zip(requests, responses): - test_messages.verify(request, response, self) - pool.shutdown(wait=True) - - def testWaitingForSomeButNotAllParallelInvocations(self): - pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures_to_indices = {} - for index in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - response_future = pool.submit( - self._invoker.blocking(group, method), request, - test_constants.LONG_TIMEOUT) - requests.append(request) - response_futures_to_indices[response_future] = index - - some_completed_response_futures_iterator = itertools.islice( - futures.as_completed(response_futures_to_indices), - test_constants.THREAD_CONCURRENCY // 2) - for response_future in some_completed_response_futures_iterator: - index = response_futures_to_indices[response_future] - test_messages.verify(requests[index], - response_future.result(), self) - pool.shutdown(wait=True) - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledUnaryRequestUnaryResponse(self): - raise NotImplementedError() - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledUnaryRequestStreamResponse(self): - raise NotImplementedError() - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledStreamRequestUnaryResponse(self): - raise NotImplementedError() - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledStreamRequestStreamResponse(self): - raise NotImplementedError() - - def testExpiredUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - self._invoker.blocking(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - - def testExpiredUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.blocking(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) - - def testExpiredStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - self._invoker.blocking( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - - def testExpiredStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.blocking( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) - - def testFailedUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.fail(), self.assertRaises(face.RemoteError): - self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT) - - def testFailedUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.fail(), self.assertRaises(face.RemoteError): - response_iterator = self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT) - list(response_iterator) - - def testFailedStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.fail(), self.assertRaises(face.RemoteError): - self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - - def testFailedStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.fail(), self.assertRaises(face.RemoteError): - response_iterator = self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - list(response_iterator) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py deleted file mode 100644 index b1c33da43a..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py +++ /dev/null @@ -1,432 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Code for making a service.TestService more amenable to use in tests.""" - -import collections -import threading - -import six - -# test_control, _service, and test_interfaces are referenced from specification -# in this module. -from grpc.framework.common import cardinality -from grpc.framework.common import style -from grpc.framework.foundation import stream -from grpc.framework.foundation import stream_util -from grpc.framework.interfaces.face import face -from tests.unit.framework.common import test_control # pylint: disable=unused-import -from tests.unit.framework.interfaces.face import _service # pylint: disable=unused-import -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - -_IDENTITY = lambda x: x - - -class TestServiceDigest( - collections.namedtuple('TestServiceDigest', ( - 'methods', - 'inline_method_implementations', - 'event_method_implementations', - 'multi_method_implementation', - 'unary_unary_messages_sequences', - 'unary_stream_messages_sequences', - 'stream_unary_messages_sequences', - 'stream_stream_messages_sequences', - ))): - """A transformation of a service.TestService. - - Attributes: - methods: A dict from method group-name pair to test_interfaces.Method object - describing the RPC methods that may be called during the test. - inline_method_implementations: A dict from method group-name pair to - face.MethodImplementation object to be used in tests of in-line calls to - behaviors under test. - event_method_implementations: A dict from method group-name pair to - face.MethodImplementation object to be used in tests of event-driven calls - to behaviors under test. - multi_method_implementation: A face.MultiMethodImplementation to be used in - tests of generic calls to behaviors under test. - unary_unary_messages_sequences: A dict from method group-name pair to - sequence of service.UnaryUnaryTestMessages objects to be used to test the - identified method. - unary_stream_messages_sequences: A dict from method group-name pair to - sequence of service.UnaryStreamTestMessages objects to be used to test the - identified method. - stream_unary_messages_sequences: A dict from method group-name pair to - sequence of service.StreamUnaryTestMessages objects to be used to test the - identified method. - stream_stream_messages_sequences: A dict from method group-name pair to - sequence of service.StreamStreamTestMessages objects to be used to test - the identified method. - """ - - -class _BufferingConsumer(stream.Consumer): - """A trivial Consumer that dumps what it consumes in a user-mutable buffer.""" - - def __init__(self): - self.consumed = [] - self.terminated = False - - def consume(self, value): - self.consumed.append(value) - - def terminate(self): - self.terminated = True - - def consume_and_terminate(self, value): - self.consumed.append(value) - self.terminated = True - - -class _InlineUnaryUnaryMethod(face.MethodImplementation): - - def __init__(self, unary_unary_test_method, control): - self._test_method = unary_unary_test_method - self._control = control - - self.cardinality = cardinality.Cardinality.UNARY_UNARY - self.style = style.Service.INLINE - - def unary_unary_inline(self, request, context): - response_list = [] - self._test_method.service(request, response_list.append, context, - self._control) - return response_list.pop(0) - - -class _EventUnaryUnaryMethod(face.MethodImplementation): - - def __init__(self, unary_unary_test_method, control, pool): - self._test_method = unary_unary_test_method - self._control = control - self._pool = pool - - self.cardinality = cardinality.Cardinality.UNARY_UNARY - self.style = style.Service.EVENT - - def unary_unary_event(self, request, response_callback, context): - if self._pool is None: - self._test_method.service(request, response_callback, context, - self._control) - else: - self._pool.submit(self._test_method.service, request, - response_callback, context, self._control) - - -class _InlineUnaryStreamMethod(face.MethodImplementation): - - def __init__(self, unary_stream_test_method, control): - self._test_method = unary_stream_test_method - self._control = control - - self.cardinality = cardinality.Cardinality.UNARY_STREAM - self.style = style.Service.INLINE - - def unary_stream_inline(self, request, context): - response_consumer = _BufferingConsumer() - self._test_method.service(request, response_consumer, context, - self._control) - for response in response_consumer.consumed: - yield response - - -class _EventUnaryStreamMethod(face.MethodImplementation): - - def __init__(self, unary_stream_test_method, control, pool): - self._test_method = unary_stream_test_method - self._control = control - self._pool = pool - - self.cardinality = cardinality.Cardinality.UNARY_STREAM - self.style = style.Service.EVENT - - def unary_stream_event(self, request, response_consumer, context): - if self._pool is None: - self._test_method.service(request, response_consumer, context, - self._control) - else: - self._pool.submit(self._test_method.service, request, - response_consumer, context, self._control) - - -class _InlineStreamUnaryMethod(face.MethodImplementation): - - def __init__(self, stream_unary_test_method, control): - self._test_method = stream_unary_test_method - self._control = control - - self.cardinality = cardinality.Cardinality.STREAM_UNARY - self.style = style.Service.INLINE - - def stream_unary_inline(self, request_iterator, context): - response_list = [] - request_consumer = self._test_method.service(response_list.append, - context, self._control) - for request in request_iterator: - request_consumer.consume(request) - request_consumer.terminate() - return response_list.pop(0) - - -class _EventStreamUnaryMethod(face.MethodImplementation): - - def __init__(self, stream_unary_test_method, control, pool): - self._test_method = stream_unary_test_method - self._control = control - self._pool = pool - - self.cardinality = cardinality.Cardinality.STREAM_UNARY - self.style = style.Service.EVENT - - def stream_unary_event(self, response_callback, context): - request_consumer = self._test_method.service(response_callback, context, - self._control) - if self._pool is None: - return request_consumer - else: - return stream_util.ThreadSwitchingConsumer(request_consumer, - self._pool) - - -class _InlineStreamStreamMethod(face.MethodImplementation): - - def __init__(self, stream_stream_test_method, control): - self._test_method = stream_stream_test_method - self._control = control - - self.cardinality = cardinality.Cardinality.STREAM_STREAM - self.style = style.Service.INLINE - - def stream_stream_inline(self, request_iterator, context): - response_consumer = _BufferingConsumer() - request_consumer = self._test_method.service(response_consumer, context, - self._control) - - for request in request_iterator: - request_consumer.consume(request) - while response_consumer.consumed: - yield response_consumer.consumed.pop(0) - response_consumer.terminate() - - -class _EventStreamStreamMethod(face.MethodImplementation): - - def __init__(self, stream_stream_test_method, control, pool): - self._test_method = stream_stream_test_method - self._control = control - self._pool = pool - - self.cardinality = cardinality.Cardinality.STREAM_STREAM - self.style = style.Service.EVENT - - def stream_stream_event(self, response_consumer, context): - request_consumer = self._test_method.service(response_consumer, context, - self._control) - if self._pool is None: - return request_consumer - else: - return stream_util.ThreadSwitchingConsumer(request_consumer, - self._pool) - - -class _UnaryConsumer(stream.Consumer): - """A Consumer that only allows consumption of exactly one value.""" - - def __init__(self, action): - self._lock = threading.Lock() - self._action = action - self._consumed = False - self._terminated = False - - def consume(self, value): - with self._lock: - if self._consumed: - raise ValueError('Unary consumer already consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._consumed = True - - self._action(value) - - def terminate(self): - with self._lock: - if not self._consumed: - raise ValueError('Unary consumer hasn\'t yet consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._terminated = True - - def consume_and_terminate(self, value): - with self._lock: - if self._consumed: - raise ValueError('Unary consumer already consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._consumed = True - self._terminated = True - - self._action(value) - - -class _UnaryUnaryAdaptation(object): - - def __init__(self, unary_unary_test_method): - self._method = unary_unary_test_method - - def service(self, response_consumer, context, control): - - def action(request): - self._method.service(request, - response_consumer.consume_and_terminate, - context, control) - - return _UnaryConsumer(action) - - -class _UnaryStreamAdaptation(object): - - def __init__(self, unary_stream_test_method): - self._method = unary_stream_test_method - - def service(self, response_consumer, context, control): - - def action(request): - self._method.service(request, response_consumer, context, control) - - return _UnaryConsumer(action) - - -class _StreamUnaryAdaptation(object): - - def __init__(self, stream_unary_test_method): - self._method = stream_unary_test_method - - def service(self, response_consumer, context, control): - return self._method.service(response_consumer.consume_and_terminate, - context, control) - - -class _MultiMethodImplementation(face.MultiMethodImplementation): - - def __init__(self, methods, control, pool): - self._methods = methods - self._control = control - self._pool = pool - - def service(self, group, name, response_consumer, context): - method = self._methods.get(group, name, None) - if method is None: - raise face.NoSuchMethodError(group, name) - elif self._pool is None: - return method(response_consumer, context, self._control) - else: - request_consumer = method(response_consumer, context, self._control) - return stream_util.ThreadSwitchingConsumer(request_consumer, - self._pool) - - -class _Assembly( - collections.namedtuple( - '_Assembly', - ['methods', 'inlines', 'events', 'adaptations', 'messages'])): - """An intermediate structure created when creating a TestServiceDigest.""" - - -def _assemble(scenarios, identifiers, inline_method_constructor, - event_method_constructor, adapter, control, pool): - """Creates an _Assembly from the given scenarios.""" - methods = {} - inlines = {} - events = {} - adaptations = {} - messages = {} - for identifier, scenario in six.iteritems(scenarios): - if identifier in identifiers: - raise ValueError('Repeated identifier "(%s, %s)"!' % identifier) - - test_method = scenario[0] - inline_method = inline_method_constructor(test_method, control) - event_method = event_method_constructor(test_method, control, pool) - adaptation = adapter(test_method) - - methods[identifier] = test_method - inlines[identifier] = inline_method - events[identifier] = event_method - adaptations[identifier] = adaptation - messages[identifier] = scenario[1] - - return _Assembly(methods, inlines, events, adaptations, messages) - - -def digest(service, control, pool): - """Creates a TestServiceDigest from a TestService. - - Args: - service: A _service.TestService. - control: A test_control.Control. - pool: If RPC methods should be serviced in a separate thread, a thread pool. - None if RPC methods should be serviced in the thread belonging to the - run-time that calls for their service. - - Returns: - A TestServiceDigest synthesized from the given service.TestService. - """ - identifiers = set() - - unary_unary = _assemble(service.unary_unary_scenarios(), identifiers, - _InlineUnaryUnaryMethod, _EventUnaryUnaryMethod, - _UnaryUnaryAdaptation, control, pool) - identifiers.update(unary_unary.inlines) - - unary_stream = _assemble(service.unary_stream_scenarios(), identifiers, - _InlineUnaryStreamMethod, _EventUnaryStreamMethod, - _UnaryStreamAdaptation, control, pool) - identifiers.update(unary_stream.inlines) - - stream_unary = _assemble(service.stream_unary_scenarios(), identifiers, - _InlineStreamUnaryMethod, _EventStreamUnaryMethod, - _StreamUnaryAdaptation, control, pool) - identifiers.update(stream_unary.inlines) - - stream_stream = _assemble(service.stream_stream_scenarios(), identifiers, - _InlineStreamStreamMethod, - _EventStreamStreamMethod, _IDENTITY, control, - pool) - identifiers.update(stream_stream.inlines) - - methods = dict(unary_unary.methods) - methods.update(unary_stream.methods) - methods.update(stream_unary.methods) - methods.update(stream_stream.methods) - adaptations = dict(unary_unary.adaptations) - adaptations.update(unary_stream.adaptations) - adaptations.update(stream_unary.adaptations) - adaptations.update(stream_stream.adaptations) - inlines = dict(unary_unary.inlines) - inlines.update(unary_stream.inlines) - inlines.update(stream_unary.inlines) - inlines.update(stream_stream.inlines) - events = dict(unary_unary.events) - events.update(unary_stream.events) - events.update(stream_unary.events) - events.update(stream_stream.events) - - return TestServiceDigest(methods, inlines, events, - _MultiMethodImplementation(adaptations, control, - pool), - unary_unary.messages, unary_stream.messages, - stream_unary.messages, stream_stream.messages) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py deleted file mode 100644 index 3d9b2816aa..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ /dev/null @@ -1,508 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Test code for the Face layer of RPC Framework.""" - -from __future__ import division - -import abc -import contextlib -import itertools -import threading -import unittest -from concurrent import futures - -import six - -# test_interfaces is referenced from specification in this module. -from grpc.framework.foundation import future -from grpc.framework.foundation import logging_pool -from grpc.framework.interfaces.face import face -from tests.unit.framework.common import test_constants -from tests.unit.framework.common import test_control -from tests.unit.framework.common import test_coverage -from tests.unit.framework.interfaces.face import _3069_test_constant -from tests.unit.framework.interfaces.face import _digest -from tests.unit.framework.interfaces.face import _stock_service -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - - -class _PauseableIterator(object): - - def __init__(self, upstream): - self._upstream = upstream - self._condition = threading.Condition() - self._paused = False - - @contextlib.contextmanager - def pause(self): - with self._condition: - self._paused = True - yield - with self._condition: - self._paused = False - self._condition.notify_all() - - def __iter__(self): - return self - - def __next__(self): - return self.next() - - def next(self): - with self._condition: - while self._paused: - self._condition.wait() - return next(self._upstream) - - -class _Callback(object): - - def __init__(self): - self._condition = threading.Condition() - self._called = False - self._passed_future = None - self._passed_other_stuff = None - - def __call__(self, *args, **kwargs): - with self._condition: - self._called = True - if args: - self._passed_future = args[0] - if 1 < len(args) or kwargs: - self._passed_other_stuff = tuple(args[1:]), dict(kwargs) - self._condition.notify_all() - - def future(self): - with self._condition: - while True: - if self._passed_other_stuff is not None: - raise ValueError( - 'Test callback passed unexpected values: %s', - self._passed_other_stuff) - elif self._called: - return self._passed_future - else: - self._condition.wait() - - -class TestCase( - six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, - unittest.TestCase)): - """A test of the Face layer of RPC Framework. - - Concrete subclasses must have an "implementation" attribute of type - test_interfaces.Implementation and an "invoker_constructor" attribute of type - _invocation.InvokerConstructor. - """ - - NAME = 'FutureInvocationAsynchronousEventServiceTest' - - def setUp(self): - """See unittest.TestCase.setUp for full specification. - - Overriding implementations must call this implementation. - """ - self._control = test_control.PauseFailControl() - self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE) - self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE, - self._control, self._digest_pool) - - generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( - self._digest.methods, self._digest.event_method_implementations, - None) - self._invoker = self.invoker_constructor.construct_invoker( - generic_stub, dynamic_stubs, self._digest.methods) - - def tearDown(self): - """See unittest.TestCase.tearDown for full specification. - - Overriding implementations must call this implementation. - """ - self._invoker = None - self.implementation.destantiate(self._memo) - self._digest_pool.shutdown(wait=True) - - def testSuccessfulUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - - response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - response = response_future.result() - - test_messages.verify(request, response, self) - self.assertIs(callback.future(), response_future) - self.assertIsNone(response_future.exception()) - self.assertIsNone(response_future.traceback()) - - def testSuccessfulUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - response_iterator = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(request, responses, self) - - def testSuccessfulStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - request_iterator = _PauseableIterator(iter(requests)) - callback = _Callback() - - # Use of a paused iterator of requests allows us to test that control is - # returned to calling code before the iterator yields any requests. - with request_iterator.pause(): - response_future = self._invoker.future(group, method)( - request_iterator, test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - future_passed_to_callback = callback.future() - response = future_passed_to_callback.result() - - test_messages.verify(requests, response, self) - self.assertIs(future_passed_to_callback, response_future) - self.assertIsNone(response_future.exception()) - self.assertIsNone(response_future.traceback()) - - def testSuccessfulStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - request_iterator = _PauseableIterator(iter(requests)) - - # Use of a paused iterator of requests allows us to test that control is - # returned to calling code before the iterator yields any requests. - with request_iterator.pause(): - response_iterator = self._invoker.future(group, method)( - request_iterator, test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(requests, responses, self) - - def testSequentialInvocations(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response_future = self._invoker.future(group, method)( - first_request, test_constants.LONG_TIMEOUT) - first_response = first_response_future.result() - - test_messages.verify(first_request, first_response, self) - - second_response_future = self._invoker.future(group, method)( - second_request, test_constants.LONG_TIMEOUT) - second_response = second_response_future.result() - - test_messages.verify(second_request, second_response, self) - - def testParallelInvocations(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response_future = self._invoker.future(group, method)( - first_request, test_constants.LONG_TIMEOUT) - second_response_future = self._invoker.future(group, method)( - second_request, test_constants.LONG_TIMEOUT) - first_response = first_response_future.result() - second_response = second_response_future.result() - - test_messages.verify(first_request, first_response, self) - test_messages.verify(second_request, second_response, self) - - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures = [] - for _ in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - requests.append(request) - response_futures.append(response_future) - - responses = [ - response_future.result() - for response_future in response_futures - ] - - for request, response in zip(requests, responses): - test_messages.verify(request, response, self) - - def testWaitingForSomeButNotAllParallelInvocations(self): - pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures_to_indices = {} - for index in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - inner_response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - outer_response_future = pool.submit( - inner_response_future.result) - requests.append(request) - response_futures_to_indices[outer_response_future] = index - - some_completed_response_futures_iterator = itertools.islice( - futures.as_completed(response_futures_to_indices), - test_constants.THREAD_CONCURRENCY // 2) - for response_future in some_completed_response_futures_iterator: - index = response_futures_to_indices[response_future] - test_messages.verify(requests[index], - response_future.result(), self) - pool.shutdown(wait=True) - - def testCancelledUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - cancel_method_return_value = response_future.cancel() - - self.assertIs(callback.future(), response_future) - self.assertFalse(cancel_method_return_value) - self.assertTrue(response_future.cancelled()) - with self.assertRaises(future.CancelledError): - response_future.result() - with self.assertRaises(future.CancelledError): - response_future.exception() - with self.assertRaises(future.CancelledError): - response_future.traceback() - - def testCancelledUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - response_iterator.cancel() - - with self.assertRaises(face.CancellationError): - next(response_iterator) - - def testCancelledStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - cancel_method_return_value = response_future.cancel() - - self.assertIs(callback.future(), response_future) - self.assertFalse(cancel_method_return_value) - self.assertTrue(response_future.cancelled()) - with self.assertRaises(future.CancelledError): - response_future.result() - with self.assertRaises(future.CancelledError): - response_future.exception() - with self.assertRaises(future.CancelledError): - response_future.traceback() - - def testCancelledStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - response_iterator.cancel() - - with self.assertRaises(face.CancellationError): - next(response_iterator) - - def testExpiredUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - self.assertIs(callback.future(), response_future) - self.assertIsInstance(response_future.exception(), - face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsInstance(response_future.exception(), - face.AbortionError) - self.assertIsNotNone(response_future.traceback()) - - def testExpiredUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - with self.assertRaises(face.ExpirationError): - list(response_iterator) - - def testExpiredStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - self.assertIs(callback.future(), response_future) - self.assertIsInstance(response_future.exception(), - face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsInstance(response_future.exception(), - face.AbortionError) - self.assertIsNotNone(response_future.traceback()) - - def testExpiredStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(): - response_iterator = self._invoker.future( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - with self.assertRaises(face.ExpirationError): - list(response_iterator) - - def testFailedUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - abortion_callback = _Callback() - - with self._control.fail(): - response_future = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - response_future.add_abortion_callback(abortion_callback) - - self.assertIs(callback.future(), response_future) - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is - # indistinguishable from simply not having called its - # response_callback before the expiration of the RPC. - self.assertIsInstance(response_future.exception(), - face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsNotNone(response_future.traceback()) - self.assertIsNotNone(abortion_callback.future()) - - def testFailedUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_consumer before the - # expiration of the RPC. - with self._control.fail(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) - - def testFailedStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - callback = _Callback() - abortion_callback = _Callback() - - with self._control.fail(): - response_future = self._invoker.future( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - response_future.add_abortion_callback(abortion_callback) - - self.assertIs(callback.future(), response_future) - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is - # indistinguishable from simply not having called its - # response_callback before the expiration of the RPC. - self.assertIsInstance(response_future.exception(), - face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsNotNone(response_future.traceback()) - self.assertIsNotNone(abortion_callback.future()) - - def testFailedStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in (six.iteritems( - self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_consumer before the - # expiration of the RPC. - with self._control.fail(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.future( - group, method)(iter(requests), - _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py deleted file mode 100644 index efc93d56b0..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py +++ /dev/null @@ -1,198 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Coverage across the Face layer's generic-to-dynamic range for invocation.""" - -import abc - -import six - -from grpc.framework.common import cardinality - -_CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR = { - cardinality.Cardinality.UNARY_UNARY: 'blocking_unary_unary', - cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream', - cardinality.Cardinality.STREAM_UNARY: 'blocking_stream_unary', - cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream', -} - -_CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR = { - cardinality.Cardinality.UNARY_UNARY: 'future_unary_unary', - cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream', - cardinality.Cardinality.STREAM_UNARY: 'future_stream_unary', - cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream', -} - -_CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR = { - cardinality.Cardinality.UNARY_UNARY: 'event_unary_unary', - cardinality.Cardinality.UNARY_STREAM: 'event_unary_stream', - cardinality.Cardinality.STREAM_UNARY: 'event_stream_unary', - cardinality.Cardinality.STREAM_STREAM: 'event_stream_stream', -} - -_CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE = { - cardinality.Cardinality.UNARY_UNARY: 'unary_unary', - cardinality.Cardinality.UNARY_STREAM: 'unary_stream', - cardinality.Cardinality.STREAM_UNARY: 'stream_unary', - cardinality.Cardinality.STREAM_STREAM: 'stream_stream', -} - - -class Invoker(six.with_metaclass(abc.ABCMeta)): - """A type used to invoke test RPCs.""" - - @abc.abstractmethod - def blocking(self, group, name): - """Invokes an RPC with blocking control flow.""" - raise NotImplementedError() - - @abc.abstractmethod - def future(self, group, name): - """Invokes an RPC with future control flow.""" - raise NotImplementedError() - - @abc.abstractmethod - def event(self, group, name): - """Invokes an RPC with event control flow.""" - raise NotImplementedError() - - -class InvokerConstructor(six.with_metaclass(abc.ABCMeta)): - """A type used to create Invokers.""" - - @abc.abstractmethod - def name(self): - """Specifies the name of the Invoker constructed by this object.""" - raise NotImplementedError() - - @abc.abstractmethod - def construct_invoker(self, generic_stub, dynamic_stubs, methods): - """Constructs an Invoker for the given stubs and methods.""" - raise NotImplementedError() - - -class _GenericInvoker(Invoker): - - def __init__(self, generic_stub, methods): - self._stub = generic_stub - self._methods = methods - - def _behavior(self, group, name, cardinality_to_generic_method): - method_cardinality = self._methods[group, name].cardinality() - behavior = getattr(self._stub, - cardinality_to_generic_method[method_cardinality]) - return lambda *args, **kwargs: behavior(group, name, *args, **kwargs) - - def blocking(self, group, name): - return self._behavior(group, name, - _CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR) - - def future(self, group, name): - return self._behavior(group, name, - _CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR) - - def event(self, group, name): - return self._behavior(group, name, - _CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR) - - -class _GenericInvokerConstructor(InvokerConstructor): - - def name(self): - return 'GenericInvoker' - - def construct_invoker(self, generic_stub, dynamic_stub, methods): - return _GenericInvoker(generic_stub, methods) - - -class _MultiCallableInvoker(Invoker): - - def __init__(self, generic_stub, methods): - self._stub = generic_stub - self._methods = methods - - def _multi_callable(self, group, name): - method_cardinality = self._methods[group, name].cardinality() - behavior = getattr( - self._stub, - _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) - return behavior(group, name) - - def blocking(self, group, name): - return self._multi_callable(group, name) - - def future(self, group, name): - method_cardinality = self._methods[group, name].cardinality() - behavior = getattr( - self._stub, - _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) - if method_cardinality in (cardinality.Cardinality.UNARY_UNARY, - cardinality.Cardinality.STREAM_UNARY): - return behavior(group, name).future - else: - return behavior(group, name) - - def event(self, group, name): - return self._multi_callable(group, name).event - - -class _MultiCallableInvokerConstructor(InvokerConstructor): - - def name(self): - return 'MultiCallableInvoker' - - def construct_invoker(self, generic_stub, dynamic_stub, methods): - return _MultiCallableInvoker(generic_stub, methods) - - -class _DynamicInvoker(Invoker): - - def __init__(self, dynamic_stubs, methods): - self._stubs = dynamic_stubs - self._methods = methods - - def blocking(self, group, name): - return getattr(self._stubs[group], name) - - def future(self, group, name): - if self._methods[group, name].cardinality() in ( - cardinality.Cardinality.UNARY_UNARY, - cardinality.Cardinality.STREAM_UNARY): - return getattr(self._stubs[group], name).future - else: - return getattr(self._stubs[group], name) - - def event(self, group, name): - return getattr(self._stubs[group], name).event - - -class _DynamicInvokerConstructor(InvokerConstructor): - - def name(self): - return 'DynamicInvoker' - - def construct_invoker(self, generic_stub, dynamic_stubs, methods): - return _DynamicInvoker(dynamic_stubs, methods) - - -def invoker_constructors(): - """Creates a sequence of InvokerConstructors to use in tests of RPCs. - - Returns: - A sequence of InvokerConstructors. - """ - return ( - _GenericInvokerConstructor(), - _MultiCallableInvokerConstructor(), - _DynamicInvokerConstructor(), - ) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py deleted file mode 100644 index f1c96b6dc5..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py +++ /dev/null @@ -1,304 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Private interfaces implemented by data sets used in Face-layer tests.""" - -import abc - -import six - -# face is referenced from specification in this module. -from grpc.framework.interfaces.face import face # pylint: disable=unused-import -from tests.unit.framework.interfaces.face import test_interfaces - - -class UnaryUnaryTestMethodImplementation( - six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a unary-unary method.""" - - @abc.abstractmethod - def service(self, request, response_callback, context, control): - """Services an RPC that accepts one message and produces one message. - - Args: - request: The single request message for the RPC. - response_callback: A callback to be called to accept the response message - of the RPC. - context: An face.ServicerContext object. - control: A test_control.Control to control execution of this method. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. - """ - raise NotImplementedError() - - -class UnaryUnaryTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for unary-request-unary-response message pairings.""" - - @abc.abstractmethod - def request(self): - """Affords a request message. - - Implementations of this method should return a different message with each - call so that multiple test executions of the test method may be made with - different inputs. - - Returns: - A request message. - """ - raise NotImplementedError() - - @abc.abstractmethod - def verify(self, request, response, test_case): - """Verifies that the computed response matches the given request. - - Args: - request: A request message. - response: A response message. - test_case: A unittest.TestCase object affording useful assertion methods. - - Raises: - AssertionError: If the request and response do not match, indicating that - there was some problem executing the RPC under test. - """ - raise NotImplementedError() - - -class UnaryStreamTestMethodImplementation( - six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a unary-stream method.""" - - @abc.abstractmethod - def service(self, request, response_consumer, context, control): - """Services an RPC that takes one message and produces a stream of messages. - - Args: - request: The single request message for the RPC. - response_consumer: A stream.Consumer to be called to accept the response - messages of the RPC. - context: A face.ServicerContext object. - control: A test_control.Control to control execution of this method. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. - """ - raise NotImplementedError() - - -class UnaryStreamTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for unary-request-stream-response message pairings.""" - - @abc.abstractmethod - def request(self): - """Affords a request message. - - Implementations of this method should return a different message with each - call so that multiple test executions of the test method may be made with - different inputs. - - Returns: - A request message. - """ - raise NotImplementedError() - - @abc.abstractmethod - def verify(self, request, responses, test_case): - """Verifies that the computed responses match the given request. - - Args: - request: A request message. - responses: A sequence of response messages. - test_case: A unittest.TestCase object affording useful assertion methods. - - Raises: - AssertionError: If the request and responses do not match, indicating that - there was some problem executing the RPC under test. - """ - raise NotImplementedError() - - -class StreamUnaryTestMethodImplementation( - six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a stream-unary method.""" - - @abc.abstractmethod - def service(self, response_callback, context, control): - """Services an RPC that takes a stream of messages and produces one message. - - Args: - response_callback: A callback to be called to accept the response message - of the RPC. - context: A face.ServicerContext object. - control: A test_control.Control to control execution of this method. - - Returns: - A stream.Consumer with which to accept the request messages of the RPC. - The consumer returned from this method may or may not be invoked to - completion: in the case of RPC abortion, RPC Framework will simply stop - passing messages to this object. Implementations must not assume that - this object will be called to completion of the request stream or even - called at all. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. - """ - raise NotImplementedError() - - -class StreamUnaryTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for stream-request-unary-response message pairings.""" - - @abc.abstractmethod - def requests(self): - """Affords a sequence of request messages. - - Implementations of this method should return a different sequences with each - call so that multiple test executions of the test method may be made with - different inputs. - - Returns: - A sequence of request messages. - """ - raise NotImplementedError() - - @abc.abstractmethod - def verify(self, requests, response, test_case): - """Verifies that the computed response matches the given requests. - - Args: - requests: A sequence of request messages. - response: A response message. - test_case: A unittest.TestCase object affording useful assertion methods. - - Raises: - AssertionError: If the requests and response do not match, indicating that - there was some problem executing the RPC under test. - """ - raise NotImplementedError() - - -class StreamStreamTestMethodImplementation( - six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a stream-stream method.""" - - @abc.abstractmethod - def service(self, response_consumer, context, control): - """Services an RPC that accepts and produces streams of messages. - - Args: - response_consumer: A stream.Consumer to be called to accept the response - messages of the RPC. - context: A face.ServicerContext object. - control: A test_control.Control to control execution of this method. - - Returns: - A stream.Consumer with which to accept the request messages of the RPC. - The consumer returned from this method may or may not be invoked to - completion: in the case of RPC abortion, RPC Framework will simply stop - passing messages to this object. Implementations must not assume that - this object will be called to completion of the request stream or even - called at all. - - Raises: - abandonment.Abandoned: May or may not be raised when the RPC has been - aborted. - """ - raise NotImplementedError() - - -class StreamStreamTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for stream-request-stream-response message pairings.""" - - @abc.abstractmethod - def requests(self): - """Affords a sequence of request messages. - - Implementations of this method should return a different sequences with each - call so that multiple test executions of the test method may be made with - different inputs. - - Returns: - A sequence of request messages. - """ - raise NotImplementedError() - - @abc.abstractmethod - def verify(self, requests, responses, test_case): - """Verifies that the computed response matches the given requests. - - Args: - requests: A sequence of request messages. - responses: A sequence of response messages. - test_case: A unittest.TestCase object affording useful assertion methods. - - Raises: - AssertionError: If the requests and responses do not match, indicating - that there was some problem executing the RPC under test. - """ - raise NotImplementedError() - - -class TestService(six.with_metaclass(abc.ABCMeta)): - """A specification of implemented methods to use in tests.""" - - @abc.abstractmethod - def unary_unary_scenarios(self): - """Affords unary-request-unary-response test methods and their messages. - - Returns: - A dict from method group-name pair to implementation/messages pair. The - first element of the pair is a UnaryUnaryTestMethodImplementation object - and the second element is a sequence of UnaryUnaryTestMethodMessages - objects. - """ - raise NotImplementedError() - - @abc.abstractmethod - def unary_stream_scenarios(self): - """Affords unary-request-stream-response test methods and their messages. - - Returns: - A dict from method group-name pair to implementation/messages pair. The - first element of the pair is a UnaryStreamTestMethodImplementation - object and the second element is a sequence of - UnaryStreamTestMethodMessages objects. - """ - raise NotImplementedError() - - @abc.abstractmethod - def stream_unary_scenarios(self): - """Affords stream-request-unary-response test methods and their messages. - - Returns: - A dict from method group-name pair to implementation/messages pair. The - first element of the pair is a StreamUnaryTestMethodImplementation - object and the second element is a sequence of - StreamUnaryTestMethodMessages objects. - """ - raise NotImplementedError() - - @abc.abstractmethod - def stream_stream_scenarios(self): - """Affords stream-request-stream-response test methods and their messages. - - Returns: - A dict from method group-name pair to implementation/messages pair. The - first element of the pair is a StreamStreamTestMethodImplementation - object and the second element is a sequence of - StreamStreamTestMethodMessages objects. - """ - raise NotImplementedError() diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py deleted file mode 100644 index a84e02a79a..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py +++ /dev/null @@ -1,390 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Examples of Python implementations of the stock.proto Stock service.""" - -from grpc.framework.common import cardinality -from grpc.framework.foundation import abandonment -from grpc.framework.foundation import stream -from tests.unit.framework.common import test_constants -from tests.unit.framework.interfaces.face import _service -from tests.unit._junkdrawer import stock_pb2 - -_STOCK_GROUP_NAME = 'Stock' -_SYMBOL_FORMAT = 'test symbol:%03d' - -# A test-appropriate security-pricing function. :-P -_price = lambda symbol_name: float(hash(symbol_name) % 4096) - - -def _get_last_trade_price(stock_request, stock_reply_callback, control, active): - """A unary-request, unary-response test method.""" - control.control() - if active(): - stock_reply_callback( - stock_pb2.StockReply( - symbol=stock_request.symbol, - price=_price(stock_request.symbol))) - else: - raise abandonment.Abandoned() - - -def _get_last_trade_price_multiple(stock_reply_consumer, control, active): - """A stream-request, stream-response test method.""" - - def stock_reply_for_stock_request(stock_request): - control.control() - if active(): - return stock_pb2.StockReply( - symbol=stock_request.symbol, price=_price(stock_request.symbol)) - else: - raise abandonment.Abandoned() - - class StockRequestConsumer(stream.Consumer): - - def consume(self, stock_request): - stock_reply_consumer.consume( - stock_reply_for_stock_request(stock_request)) - - def terminate(self): - control.control() - stock_reply_consumer.terminate() - - def consume_and_terminate(self, stock_request): - stock_reply_consumer.consume_and_terminate( - stock_reply_for_stock_request(stock_request)) - - return StockRequestConsumer() - - -def _watch_future_trades(stock_request, stock_reply_consumer, control, active): - """A unary-request, stream-response test method.""" - base_price = _price(stock_request.symbol) - for index in range(stock_request.num_trades_to_watch): - control.control() - if active(): - stock_reply_consumer.consume( - stock_pb2.StockReply( - symbol=stock_request.symbol, price=base_price + index)) - else: - raise abandonment.Abandoned() - stock_reply_consumer.terminate() - - -def _get_highest_trade_price(stock_reply_callback, control, active): - """A stream-request, unary-response test method.""" - - class StockRequestConsumer(stream.Consumer): - """Keeps an ongoing record of the most valuable symbol yet consumed.""" - - def __init__(self): - self._symbol = None - self._price = None - - def consume(self, stock_request): - control.control() - if active(): - if self._price is None: - self._symbol = stock_request.symbol - self._price = _price(stock_request.symbol) - else: - candidate_price = _price(stock_request.symbol) - if self._price < candidate_price: - self._symbol = stock_request.symbol - self._price = candidate_price - - def terminate(self): - control.control() - if active(): - if self._symbol is None: - raise ValueError() - else: - stock_reply_callback( - stock_pb2.StockReply( - symbol=self._symbol, price=self._price)) - self._symbol = None - self._price = None - - def consume_and_terminate(self, stock_request): - control.control() - if active(): - if self._price is None: - stock_reply_callback( - stock_pb2.StockReply( - symbol=stock_request.symbol, - price=_price(stock_request.symbol))) - else: - candidate_price = _price(stock_request.symbol) - if self._price < candidate_price: - stock_reply_callback( - stock_pb2.StockReply( - symbol=stock_request.symbol, - price=candidate_price)) - else: - stock_reply_callback( - stock_pb2.StockReply( - symbol=self._symbol, price=self._price)) - - self._symbol = None - self._price = None - - return StockRequestConsumer() - - -class GetLastTradePrice(_service.UnaryUnaryTestMethodImplementation): - """GetLastTradePrice for use in tests.""" - - def group(self): - return _STOCK_GROUP_NAME - - def name(self): - return 'GetLastTradePrice' - - def cardinality(self): - return cardinality.Cardinality.UNARY_UNARY - - def request_class(self): - return stock_pb2.StockRequest - - def response_class(self): - return stock_pb2.StockReply - - def serialize_request(self, request): - return request.SerializeToString() - - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) - - def serialize_response(self, response): - return response.SerializeToString() - - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) - - def service(self, request, response_callback, context, control): - _get_last_trade_price(request, response_callback, control, - context.is_active) - - -class GetLastTradePriceMessages(_service.UnaryUnaryTestMessages): - - def __init__(self): - self._index = 0 - - def request(self): - symbol = _SYMBOL_FORMAT % self._index - self._index += 1 - return stock_pb2.StockRequest(symbol=symbol) - - def verify(self, request, response, test_case): - test_case.assertEqual(request.symbol, response.symbol) - test_case.assertEqual(_price(request.symbol), response.price) - - -class GetLastTradePriceMultiple(_service.StreamStreamTestMethodImplementation): - """GetLastTradePriceMultiple for use in tests.""" - - def group(self): - return _STOCK_GROUP_NAME - - def name(self): - return 'GetLastTradePriceMultiple' - - def cardinality(self): - return cardinality.Cardinality.STREAM_STREAM - - def request_class(self): - return stock_pb2.StockRequest - - def response_class(self): - return stock_pb2.StockReply - - def serialize_request(self, request): - return request.SerializeToString() - - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) - - def serialize_response(self, response): - return response.SerializeToString() - - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) - - def service(self, response_consumer, context, control): - return _get_last_trade_price_multiple(response_consumer, control, - context.is_active) - - -class GetLastTradePriceMultipleMessages(_service.StreamStreamTestMessages): - """Pairs of message streams for use with GetLastTradePriceMultiple.""" - - def __init__(self): - self._index = 0 - - def requests(self): - base_index = self._index - self._index += 1 - return [ - stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index)) - for index in range(test_constants.STREAM_LENGTH) - ] - - def verify(self, requests, responses, test_case): - test_case.assertEqual(len(requests), len(responses)) - for stock_request, stock_reply in zip(requests, responses): - test_case.assertEqual(stock_request.symbol, stock_reply.symbol) - test_case.assertEqual( - _price(stock_request.symbol), stock_reply.price) - - -class WatchFutureTrades(_service.UnaryStreamTestMethodImplementation): - """WatchFutureTrades for use in tests.""" - - def group(self): - return _STOCK_GROUP_NAME - - def name(self): - return 'WatchFutureTrades' - - def cardinality(self): - return cardinality.Cardinality.UNARY_STREAM - - def request_class(self): - return stock_pb2.StockRequest - - def response_class(self): - return stock_pb2.StockReply - - def serialize_request(self, request): - return request.SerializeToString() - - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) - - def serialize_response(self, response): - return response.SerializeToString() - - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) - - def service(self, request, response_consumer, context, control): - _watch_future_trades(request, response_consumer, control, - context.is_active) - - -class WatchFutureTradesMessages(_service.UnaryStreamTestMessages): - """Pairs of a single request message and a sequence of response messages.""" - - def __init__(self): - self._index = 0 - - def request(self): - symbol = _SYMBOL_FORMAT % self._index - self._index += 1 - return stock_pb2.StockRequest( - symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH) - - def verify(self, request, responses, test_case): - test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses)) - base_price = _price(request.symbol) - for index, response in enumerate(responses): - test_case.assertEqual(base_price + index, response.price) - - -class GetHighestTradePrice(_service.StreamUnaryTestMethodImplementation): - """GetHighestTradePrice for use in tests.""" - - def group(self): - return _STOCK_GROUP_NAME - - def name(self): - return 'GetHighestTradePrice' - - def cardinality(self): - return cardinality.Cardinality.STREAM_UNARY - - def request_class(self): - return stock_pb2.StockRequest - - def response_class(self): - return stock_pb2.StockReply - - def serialize_request(self, request): - return request.SerializeToString() - - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) - - def serialize_response(self, response): - return response.SerializeToString() - - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) - - def service(self, response_callback, context, control): - return _get_highest_trade_price(response_callback, control, - context.is_active) - - -class GetHighestTradePriceMessages(_service.StreamUnaryTestMessages): - - def requests(self): - return [ - stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index) - for index in range(test_constants.STREAM_LENGTH) - ] - - def verify(self, requests, response, test_case): - price = None - symbol = None - for stock_request in requests: - current_symbol = stock_request.symbol - current_price = _price(current_symbol) - if price is None or price < current_price: - price = current_price - symbol = current_symbol - test_case.assertEqual(price, response.price) - test_case.assertEqual(symbol, response.symbol) - - -class StockTestService(_service.TestService): - """A corpus of test data with one method of each RPC cardinality.""" - - def unary_unary_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'GetLastTradePrice'): - (GetLastTradePrice(), [GetLastTradePriceMessages()]), - } - - def unary_stream_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'WatchFutureTrades'): - (WatchFutureTrades(), [WatchFutureTradesMessages()]), - } - - def stream_unary_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'GetHighestTradePrice'): - (GetHighestTradePrice(), [GetHighestTradePriceMessages()]) - } - - def stream_stream_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'): - (GetLastTradePriceMultiple(), - [GetLastTradePriceMultipleMessages()]), - } - - -STOCK_TEST_SERVICE = StockTestService() diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py deleted file mode 100644 index cff4b7cdea..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tools for creating tests of implementations of the Face layer.""" - -# unittest is referenced from specification in this module. -import unittest # pylint: disable=unused-import - -# test_interfaces is referenced from specification in this module. -from tests.unit.framework.interfaces.face import _blocking_invocation_inline_service -from tests.unit.framework.interfaces.face import _future_invocation_asynchronous_event_service -from tests.unit.framework.interfaces.face import _invocation -from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import - -_TEST_CASE_SUPERCLASSES = ( - _blocking_invocation_inline_service.TestCase, - _future_invocation_asynchronous_event_service.TestCase, -) - - -def test_cases(implementation): - """Creates unittest.TestCase classes for a given Face layer implementation. - - Args: - implementation: A test_interfaces.Implementation specifying creation and - destruction of a given Face layer implementation. - - Returns: - A sequence of subclasses of unittest.TestCase defining tests of the - specified Face layer implementation. - """ - test_case_classes = [] - for invoker_constructor in _invocation.invoker_constructors(): - for super_class in _TEST_CASE_SUPERCLASSES: - test_case_classes.append( - type( - invoker_constructor.name() + super_class.NAME, - (super_class,), { - 'implementation': implementation, - 'invoker_constructor': invoker_constructor, - '__module__': implementation.__module__, - })) - return test_case_classes diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py deleted file mode 100644 index d0de8e1c54..0000000000 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py +++ /dev/null @@ -1,212 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Interfaces used in tests of implementations of the Face layer.""" - -import abc - -import six - -from grpc.framework.common import cardinality # pylint: disable=unused-import -from grpc.framework.interfaces.face import face # pylint: disable=unused-import - - -class Method(six.with_metaclass(abc.ABCMeta)): - """Specifies a method to be used in tests.""" - - @abc.abstractmethod - def group(self): - """Identify the group of the method. - - Returns: - The group of the method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def name(self): - """Identify the name of the method. - - Returns: - The name of the method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def cardinality(self): - """Identify the cardinality of the method. - - Returns: - A cardinality.Cardinality value describing the streaming semantics of the - method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def request_class(self): - """Identify the class used for the method's request objects. - - Returns: - The class object of the class to which the method's request objects - belong. - """ - raise NotImplementedError() - - @abc.abstractmethod - def response_class(self): - """Identify the class used for the method's response objects. - - Returns: - The class object of the class to which the method's response objects - belong. - """ - raise NotImplementedError() - - @abc.abstractmethod - def serialize_request(self, request): - """Serialize the given request object. - - Args: - request: A request object appropriate for this method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def deserialize_request(self, serialized_request): - """Synthesize a request object from a given bytestring. - - Args: - serialized_request: A bytestring deserializable into a request object - appropriate for this method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def serialize_response(self, response): - """Serialize the given response object. - - Args: - response: A response object appropriate for this method. - """ - raise NotImplementedError() - - @abc.abstractmethod - def deserialize_response(self, serialized_response): - """Synthesize a response object from a given bytestring. - - Args: - serialized_response: A bytestring deserializable into a response object - appropriate for this method. - """ - raise NotImplementedError() - - -class Implementation(six.with_metaclass(abc.ABCMeta)): - """Specifies an implementation of the Face layer.""" - - @abc.abstractmethod - def instantiate(self, methods, method_implementations, - multi_method_implementation): - """Instantiates the Face layer implementation to be used in a test. - - Args: - methods: A sequence of Method objects describing the methods available to - be called during the test. - method_implementations: A dictionary from group-name pair to - face.MethodImplementation object specifying implementation of a method. - multi_method_implementation: A face.MultiMethodImplementation or None. - - Returns: - A sequence of length three the first element of which is a - face.GenericStub, the second element of which is dictionary from groups - to face.DynamicStubs affording invocation of the group's methods, and - the third element of which is an arbitrary memo object to be kept and - passed to destantiate at the conclusion of the test. The returned stubs - must be backed by the provided implementations. - """ - raise NotImplementedError() - - @abc.abstractmethod - def destantiate(self, memo): - """Destroys the Face layer implementation under test. - - Args: - memo: The object from the third position of the return value of a call to - instantiate. - """ - raise NotImplementedError() - - @abc.abstractmethod - def invocation_metadata(self): - """Provides the metadata to be used when invoking a test RPC. - - Returns: - An object to use as the supplied-at-invocation-time metadata in a test - RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def initial_metadata(self): - """Provides the metadata for use as a test RPC's first servicer metadata. - - Returns: - An object to use as the from-the-servicer-before-responses metadata in a - test RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def terminal_metadata(self): - """Provides the metadata for use as a test RPC's second servicer metadata. - - Returns: - An object to use as the from-the-servicer-after-all-responses metadata in - a test RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def code(self): - """Provides the value for use as a test RPC's code. - - Returns: - An object to use as the from-the-servicer code in a test RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def details(self): - """Provides the value for use as a test RPC's details. - - Returns: - An object to use as the from-the-servicer details in a test RPC. - """ - raise NotImplementedError() - - @abc.abstractmethod - def metadata_transmitted(self, original_metadata, transmitted_metadata): - """Identifies whether or not metadata was properly transmitted. - - Args: - original_metadata: A metadata value passed to the Face interface - implementation under test. - transmitted_metadata: The same metadata value after having been - transmitted via an RPC performed by the Face interface implementation - under test. - - Returns: - Whether or not the metadata was properly transmitted by the Face interface - implementation under test. - """ - raise NotImplementedError() |