diff options
Diffstat (limited to 'src/python/grpcio_test/grpc_test')
11 files changed, 2811 insertions, 0 deletions
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/__init__.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py new file mode 100644 index 0000000000..857ad5cf3e --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_blocking_invocation_inline_service.py @@ -0,0 +1,250 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test code for the Face layer of RPC Framework.""" + +import abc +import unittest + +# test_interfaces is referenced from specification in this module. +from grpc.framework.interfaces.face import face +from grpc_test.framework.common import test_constants +from grpc_test.framework.common import test_control +from grpc_test.framework.common import test_coverage +from grpc_test.framework.interfaces.face import _digest +from grpc_test.framework.interfaces.face import _stock_service +from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import + + +class TestCase(test_coverage.Coverage, unittest.TestCase): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must have an "implementation" attribute of type + test_interfaces.Implementation and an "invoker_constructor" attribute of type + _invocation.InvokerConstructor. + """ + __metaclass__ = abc.ABCMeta + + NAME = 'BlockingInvocationInlineServiceTest' + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self._control = test_control.PauseFailControl() + self._digest = _digest.digest( + _stock_service.STOCK_TEST_SERVICE, self._control, None) + + generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( + self._digest.methods, self._digest.inline_method_implementations, None) + self._invoker = self.invoker_constructor.construct_invoker( + generic_stub, dynamic_stubs, self._digest.methods) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self.implementation.destantiate(self._memo) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response = self._invoker.blocking(group, method)( + request, test_constants.LONG_TIMEOUT) + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_iterator = self._invoker.blocking(group, method)( + request, test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + response = self._invoker.blocking(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + response_iterator = self._invoker.blocking(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response = self._invoker.blocking(group, method)( + first_request, test_constants.LONG_TIMEOUT) + + test_messages.verify(first_request, first_response, self) + + second_response = self._invoker.blocking(group, method)( + second_request, test_constants.LONG_TIMEOUT) + + test_messages.verify(second_request, second_response, self) + + @unittest.skip('Parallel invocations impossible with blocking control flow!') + def testParallelInvocations(self): + raise NotImplementedError() + + @unittest.skip('Parallel invocations impossible with blocking control flow!') + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + @unittest.skip('Cancellation impossible with blocking control flow!') + def testCancelledUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @unittest.skip('Cancellation impossible with blocking control flow!') + def testCancelledUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @unittest.skip('Cancellation impossible with blocking control flow!') + def testCancelledStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @unittest.skip('Cancellation impossible with blocking control flow!') + def testCancelledStreamRequestStreamResponse(self): + raise NotImplementedError() + + def testExpiredUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(), self.assertRaises( + face.ExpirationError): + self._invoker.blocking(group, method)( + request, test_constants.SHORT_TIMEOUT) + + def testExpiredUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(), self.assertRaises( + face.ExpirationError): + response_iterator = self._invoker.blocking(group, method)( + request, test_constants.SHORT_TIMEOUT) + list(response_iterator) + + def testExpiredStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(), self.assertRaises( + face.ExpirationError): + self._invoker.blocking(group, method)( + iter(requests), test_constants.SHORT_TIMEOUT) + + def testExpiredStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(), self.assertRaises( + face.ExpirationError): + response_iterator = self._invoker.blocking(group, method)( + iter(requests), test_constants.SHORT_TIMEOUT) + list(response_iterator) + + def testFailedUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.fail(), self.assertRaises(face.RemoteError): + self._invoker.blocking(group, method)( + request, test_constants.LONG_TIMEOUT) + + def testFailedUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.fail(), self.assertRaises(face.RemoteError): + response_iterator = self._invoker.blocking(group, method)( + request, test_constants.LONG_TIMEOUT) + list(response_iterator) + + def testFailedStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.fail(), self.assertRaises(face.RemoteError): + self._invoker.blocking(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + + def testFailedStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.fail(), self.assertRaises(face.RemoteError): + response_iterator = self._invoker.blocking(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + list(response_iterator) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_digest.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_digest.py new file mode 100644 index 0000000000..da56ed7b27 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_digest.py @@ -0,0 +1,444 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Code for making a service.TestService more amenable to use in tests.""" + +import collections +import threading + +# test_control, _service, and test_interfaces are referenced from specification +# in this module. +from grpc.framework.common import cardinality +from grpc.framework.common import style +from grpc.framework.foundation import stream +from grpc.framework.foundation import stream_util +from grpc.framework.interfaces.face import face +from grpc_test.framework.common import test_control # pylint: disable=unused-import +from grpc_test.framework.interfaces.face import _service # pylint: disable=unused-import +from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import + +_IDENTITY = lambda x: x + + +class TestServiceDigest( + collections.namedtuple( + 'TestServiceDigest', + ('methods', + 'inline_method_implementations', + 'event_method_implementations', + 'multi_method_implementation', + 'unary_unary_messages_sequences', + 'unary_stream_messages_sequences', + 'stream_unary_messages_sequences', + 'stream_stream_messages_sequences',))): + """A transformation of a service.TestService. + + Attributes: + methods: A dict from method group-name pair to test_interfaces.Method object + describing the RPC methods that may be called during the test. + inline_method_implementations: A dict from method group-name pair to + face.MethodImplementation object to be used in tests of in-line calls to + behaviors under test. + event_method_implementations: A dict from method group-name pair to + face.MethodImplementation object to be used in tests of event-driven calls + to behaviors under test. + multi_method_implementation: A face.MultiMethodImplementation to be used in + tests of generic calls to behaviors under test. + unary_unary_messages_sequences: A dict from method group-name pair to + sequence of service.UnaryUnaryTestMessages objects to be used to test the + identified method. + unary_stream_messages_sequences: A dict from method group-name pair to + sequence of service.UnaryStreamTestMessages objects to be used to test the + identified method. + stream_unary_messages_sequences: A dict from method group-name pair to + sequence of service.StreamUnaryTestMessages objects to be used to test the + identified method. + stream_stream_messages_sequences: A dict from method group-name pair to + sequence of service.StreamStreamTestMessages objects to be used to test + the identified method. + """ + + +class _BufferingConsumer(stream.Consumer): + """A trivial Consumer that dumps what it consumes in a user-mutable buffer.""" + + def __init__(self): + self.consumed = [] + self.terminated = False + + def consume(self, value): + self.consumed.append(value) + + def terminate(self): + self.terminated = True + + def consume_and_terminate(self, value): + self.consumed.append(value) + self.terminated = True + + +class _InlineUnaryUnaryMethod(face.MethodImplementation): + + def __init__(self, unary_unary_test_method, control): + self._test_method = unary_unary_test_method + self._control = control + + self.cardinality = cardinality.Cardinality.UNARY_UNARY + self.style = style.Service.INLINE + + def unary_unary_inline(self, request, context): + response_list = [] + self._test_method.service( + request, response_list.append, context, self._control) + return response_list.pop(0) + + +class _EventUnaryUnaryMethod(face.MethodImplementation): + + def __init__(self, unary_unary_test_method, control, pool): + self._test_method = unary_unary_test_method + self._control = control + self._pool = pool + + self.cardinality = cardinality.Cardinality.UNARY_UNARY + self.style = style.Service.EVENT + + def unary_unary_event(self, request, response_callback, context): + if self._pool is None: + self._test_method.service( + request, response_callback, context, self._control) + else: + self._pool.submit( + self._test_method.service, request, response_callback, context, + self._control) + + +class _InlineUnaryStreamMethod(face.MethodImplementation): + + def __init__(self, unary_stream_test_method, control): + self._test_method = unary_stream_test_method + self._control = control + + self.cardinality = cardinality.Cardinality.UNARY_STREAM + self.style = style.Service.INLINE + + def unary_stream_inline(self, request, context): + response_consumer = _BufferingConsumer() + self._test_method.service( + request, response_consumer, context, self._control) + for response in response_consumer.consumed: + yield response + + +class _EventUnaryStreamMethod(face.MethodImplementation): + + def __init__(self, unary_stream_test_method, control, pool): + self._test_method = unary_stream_test_method + self._control = control + self._pool = pool + + self.cardinality = cardinality.Cardinality.UNARY_STREAM + self.style = style.Service.EVENT + + def unary_stream_event(self, request, response_consumer, context): + if self._pool is None: + self._test_method.service( + request, response_consumer, context, self._control) + else: + self._pool.submit( + self._test_method.service, request, response_consumer, context, + self._control) + + +class _InlineStreamUnaryMethod(face.MethodImplementation): + + def __init__(self, stream_unary_test_method, control): + self._test_method = stream_unary_test_method + self._control = control + + self.cardinality = cardinality.Cardinality.STREAM_UNARY + self.style = style.Service.INLINE + + def stream_unary_inline(self, request_iterator, context): + response_list = [] + request_consumer = self._test_method.service( + response_list.append, context, self._control) + for request in request_iterator: + request_consumer.consume(request) + request_consumer.terminate() + return response_list.pop(0) + + +class _EventStreamUnaryMethod(face.MethodImplementation): + + def __init__(self, stream_unary_test_method, control, pool): + self._test_method = stream_unary_test_method + self._control = control + self._pool = pool + + self.cardinality = cardinality.Cardinality.STREAM_UNARY + self.style = style.Service.EVENT + + def stream_unary_event(self, response_callback, context): + request_consumer = self._test_method.service( + response_callback, context, self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _InlineStreamStreamMethod(face.MethodImplementation): + + def __init__(self, stream_stream_test_method, control): + self._test_method = stream_stream_test_method + self._control = control + + self.cardinality = cardinality.Cardinality.STREAM_STREAM + self.style = style.Service.INLINE + + def stream_stream_inline(self, request_iterator, context): + response_consumer = _BufferingConsumer() + request_consumer = self._test_method.service( + response_consumer, context, self._control) + + for request in request_iterator: + request_consumer.consume(request) + while response_consumer.consumed: + yield response_consumer.consumed.pop(0) + response_consumer.terminate() + + +class _EventStreamStreamMethod(face.MethodImplementation): + + def __init__(self, stream_stream_test_method, control, pool): + self._test_method = stream_stream_test_method + self._control = control + self._pool = pool + + self.cardinality = cardinality.Cardinality.STREAM_STREAM + self.style = style.Service.EVENT + + def stream_stream_event(self, response_consumer, context): + request_consumer = self._test_method.service( + response_consumer, context, self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _UnaryConsumer(stream.Consumer): + """A Consumer that only allows consumption of exactly one value.""" + + def __init__(self, action): + self._lock = threading.Lock() + self._action = action + self._consumed = False + self._terminated = False + + def consume(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + + self._action(value) + + def terminate(self): + with self._lock: + if not self._consumed: + raise ValueError('Unary consumer hasn\'t yet consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._terminated = True + + def consume_and_terminate(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + self._terminated = True + + self._action(value) + + +class _UnaryUnaryAdaptation(object): + + def __init__(self, unary_unary_test_method): + self._method = unary_unary_test_method + + def service(self, response_consumer, context, control): + def action(request): + self._method.service( + request, response_consumer.consume_and_terminate, context, control) + return _UnaryConsumer(action) + + +class _UnaryStreamAdaptation(object): + + def __init__(self, unary_stream_test_method): + self._method = unary_stream_test_method + + def service(self, response_consumer, context, control): + def action(request): + self._method.service(request, response_consumer, context, control) + return _UnaryConsumer(action) + + +class _StreamUnaryAdaptation(object): + + def __init__(self, stream_unary_test_method): + self._method = stream_unary_test_method + + def service(self, response_consumer, context, control): + return self._method.service( + response_consumer.consume_and_terminate, context, control) + + +class _MultiMethodImplementation(face.MultiMethodImplementation): + + def __init__(self, methods, control, pool): + self._methods = methods + self._control = control + self._pool = pool + + def service(self, group, name, response_consumer, context): + method = self._methods.get(group, name, None) + if method is None: + raise face.NoSuchMethodError(group, name) + elif self._pool is None: + return method(response_consumer, context, self._control) + else: + request_consumer = method(response_consumer, context, self._control) + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _Assembly( + collections.namedtuple( + '_Assembly', + ['methods', 'inlines', 'events', 'adaptations', 'messages'])): + """An intermediate structure created when creating a TestServiceDigest.""" + + +def _assemble( + scenarios, identifiers, inline_method_constructor, event_method_constructor, + adapter, control, pool): + """Creates an _Assembly from the given scenarios.""" + methods = {} + inlines = {} + events = {} + adaptations = {} + messages = {} + for identifier, scenario in scenarios.iteritems(): + if identifier in identifiers: + raise ValueError('Repeated identifier "(%s, %s)"!' % identifier) + + test_method = scenario[0] + inline_method = inline_method_constructor(test_method, control) + event_method = event_method_constructor(test_method, control, pool) + adaptation = adapter(test_method) + + methods[identifier] = test_method + inlines[identifier] = inline_method + events[identifier] = event_method + adaptations[identifier] = adaptation + messages[identifier] = scenario[1] + + return _Assembly(methods, inlines, events, adaptations, messages) + + +def digest(service, control, pool): + """Creates a TestServiceDigest from a TestService. + + Args: + service: A _service.TestService. + control: A test_control.Control. + pool: If RPC methods should be serviced in a separate thread, a thread pool. + None if RPC methods should be serviced in the thread belonging to the + run-time that calls for their service. + + Returns: + A TestServiceDigest synthesized from the given service.TestService. + """ + identifiers = set() + + unary_unary = _assemble( + service.unary_unary_scenarios(), identifiers, _InlineUnaryUnaryMethod, + _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool) + identifiers.update(unary_unary.inlines) + + unary_stream = _assemble( + service.unary_stream_scenarios(), identifiers, _InlineUnaryStreamMethod, + _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool) + identifiers.update(unary_stream.inlines) + + stream_unary = _assemble( + service.stream_unary_scenarios(), identifiers, _InlineStreamUnaryMethod, + _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool) + identifiers.update(stream_unary.inlines) + + stream_stream = _assemble( + service.stream_stream_scenarios(), identifiers, _InlineStreamStreamMethod, + _EventStreamStreamMethod, _IDENTITY, control, pool) + identifiers.update(stream_stream.inlines) + + methods = dict(unary_unary.methods) + methods.update(unary_stream.methods) + methods.update(stream_unary.methods) + methods.update(stream_stream.methods) + adaptations = dict(unary_unary.adaptations) + adaptations.update(unary_stream.adaptations) + adaptations.update(stream_unary.adaptations) + adaptations.update(stream_stream.adaptations) + inlines = dict(unary_unary.inlines) + inlines.update(unary_stream.inlines) + inlines.update(stream_unary.inlines) + inlines.update(stream_stream.inlines) + events = dict(unary_unary.events) + events.update(unary_stream.events) + events.update(stream_unary.events) + events.update(stream_stream.events) + + return TestServiceDigest( + methods, + inlines, + events, + _MultiMethodImplementation(adaptations, control, pool), + unary_unary.messages, + unary_stream.messages, + stream_unary.messages, + stream_stream.messages) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py new file mode 100644 index 0000000000..ea5cdeaea3 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_event_invocation_synchronous_event_service.py @@ -0,0 +1,377 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test code for the Face layer of RPC Framework.""" + +import abc +import unittest + +# test_interfaces is referenced from specification in this module. +from grpc.framework.interfaces.face import face +from grpc_test.framework.common import test_constants +from grpc_test.framework.common import test_control +from grpc_test.framework.common import test_coverage +from grpc_test.framework.interfaces.face import _digest +from grpc_test.framework.interfaces.face import _receiver +from grpc_test.framework.interfaces.face import _stock_service +from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import + + +class TestCase(test_coverage.Coverage, unittest.TestCase): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must have an "implementation" attribute of type + test_interfaces.Implementation and an "invoker_constructor" attribute of type + _invocation.InvokerConstructor. + """ + __metaclass__ = abc.ABCMeta + + NAME = 'EventInvocationSynchronousEventServiceTest' + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self._control = test_control.PauseFailControl() + self._digest = _digest.digest( + _stock_service.STOCK_TEST_SERVICE, self._control, None) + + generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( + self._digest.methods, self._digest.event_method_implementations, None) + self._invoker = self.invoker_constructor.construct_invoker( + generic_stub, dynamic_stubs, self._digest.methods) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self.implementation.destantiate(self._memo) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) + receiver.block_until_terminated() + response = receiver.unary_response() + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) + receiver.block_until_terminated() + responses = receiver.stream_responses() + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + receiver = _receiver.Receiver() + + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.LONG_TIMEOUT) + for request in requests: + call_consumer.consume(request) + call_consumer.terminate() + receiver.block_until_terminated() + response = receiver.unary_response() + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + receiver = _receiver.Receiver() + + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.LONG_TIMEOUT) + for request in requests: + call_consumer.consume(request) + call_consumer.terminate() + receiver.block_until_terminated() + responses = receiver.stream_responses() + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + # pylint: disable=cell-var-from-loop + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + second_receiver = _receiver.Receiver() + + def make_second_invocation(): + self._invoker.event(group, method)( + second_request, second_receiver, second_receiver.abort, + test_constants.LONG_TIMEOUT) + + class FirstReceiver(_receiver.Receiver): + + def complete(self, terminal_metadata, code, details): + super(FirstReceiver, self).complete( + terminal_metadata, code, details) + make_second_invocation() + + first_receiver = FirstReceiver() + + self._invoker.event(group, method)( + first_request, first_receiver, first_receiver.abort, + test_constants.LONG_TIMEOUT) + second_receiver.block_until_terminated() + + first_response = first_receiver.unary_response() + second_response = second_receiver.unary_response() + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + def testParallelInvocations(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + first_receiver = _receiver.Receiver() + second_request = test_messages.request() + second_receiver = _receiver.Receiver() + + self._invoker.event(group, method)( + first_request, first_receiver, first_receiver.abort, + test_constants.LONG_TIMEOUT) + self._invoker.event(group, method)( + second_request, second_receiver, second_receiver.abort, + test_constants.LONG_TIMEOUT) + first_receiver.block_until_terminated() + second_receiver.block_until_terminated() + + first_response = first_receiver.unary_response() + second_response = second_receiver.unary_response() + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + @unittest.skip('TODO(nathaniel): implement.') + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + def testCancelledUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + with self._control.pause(): + call = self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) + call.cancel() + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) + + def testCancelledUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + call = self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) + call.cancel() + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) + + def testCancelledStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + receiver = _receiver.Receiver() + + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.LONG_TIMEOUT) + for request in requests: + call_consumer.consume(request) + call_consumer.cancel() + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) + + def testCancelledStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for unused_test_messages in test_messages_sequence: + receiver = _receiver.Receiver() + + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.LONG_TIMEOUT) + call_consumer.cancel() + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) + + def testExpiredUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + with self._control.pause(): + self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.SHORT_TIMEOUT) + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) + + def testExpiredUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + with self._control.pause(): + self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.SHORT_TIMEOUT) + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) + + def testExpiredStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for unused_test_messages in test_messages_sequence: + receiver = _receiver.Receiver() + + self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.SHORT_TIMEOUT) + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) + + def testExpiredStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + receiver = _receiver.Receiver() + + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.SHORT_TIMEOUT) + for request in requests: + call_consumer.consume(request) + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) + + def testFailedUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + with self._control.fail(): + self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) + receiver.block_until_terminated() + + self.assertIs( + face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) + + def testFailedUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + with self._control.fail(): + self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) + receiver.block_until_terminated() + + self.assertIs( + face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) + + def testFailedStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + receiver = _receiver.Receiver() + + with self._control.fail(): + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.LONG_TIMEOUT) + for request in requests: + call_consumer.consume(request) + call_consumer.terminate() + receiver.block_until_terminated() + + self.assertIs( + face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) + + def testFailedStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + receiver = _receiver.Receiver() + + with self._control.fail(): + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.LONG_TIMEOUT) + for request in requests: + call_consumer.consume(request) + call_consumer.terminate() + receiver.block_until_terminated() + + self.assertIs( + face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py new file mode 100644 index 0000000000..a649362cef --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -0,0 +1,378 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test code for the Face layer of RPC Framework.""" + +import abc +import contextlib +import threading +import unittest + +# test_interfaces is referenced from specification in this module. +from grpc.framework.foundation import logging_pool +from grpc.framework.interfaces.face import face +from grpc_test.framework.common import test_constants +from grpc_test.framework.common import test_control +from grpc_test.framework.common import test_coverage +from grpc_test.framework.interfaces.face import _digest +from grpc_test.framework.interfaces.face import _stock_service +from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import + + +class _PauseableIterator(object): + + def __init__(self, upstream): + self._upstream = upstream + self._condition = threading.Condition() + self._paused = False + + @contextlib.contextmanager + def pause(self): + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() + + def __iter__(self): + return self + + def next(self): + with self._condition: + while self._paused: + self._condition.wait() + return next(self._upstream) + + +class TestCase(test_coverage.Coverage, unittest.TestCase): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must have an "implementation" attribute of type + test_interfaces.Implementation and an "invoker_constructor" attribute of type + _invocation.InvokerConstructor. + """ + __metaclass__ = abc.ABCMeta + + NAME = 'FutureInvocationAsynchronousEventServiceTest' + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self._control = test_control.PauseFailControl() + self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE) + self._digest = _digest.digest( + _stock_service.STOCK_TEST_SERVICE, self._control, self._digest_pool) + + generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( + self._digest.methods, self._digest.event_method_implementations, None) + self._invoker = self.invoker_constructor.construct_invoker( + generic_stub, dynamic_stubs, self._digest.methods) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self.implementation.destantiate(self._memo) + self._digest_pool.shutdown(wait=True) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + response = response_future.result() + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_iterator = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + request_iterator = _PauseableIterator(iter(requests)) + + # Use of a paused iterator of requests allows us to test that control is + # returned to calling code before the iterator yields any requests. + with request_iterator.pause(): + response_future = self._invoker.future(group, method)( + request_iterator, test_constants.LONG_TIMEOUT) + response = response_future.result() + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + request_iterator = _PauseableIterator(iter(requests)) + + # Use of a paused iterator of requests allows us to test that control is + # returned to calling code before the iterator yields any requests. + with request_iterator.pause(): + response_iterator = self._invoker.future(group, method)( + request_iterator, test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self._invoker.future(group, method)( + first_request, test_constants.LONG_TIMEOUT) + first_response = first_response_future.result() + + test_messages.verify(first_request, first_response, self) + + second_response_future = self._invoker.future(group, method)( + second_request, test_constants.LONG_TIMEOUT) + second_response = second_response_future.result() + + test_messages.verify(second_request, second_response, self) + + def testParallelInvocations(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self._invoker.future(group, method)( + first_request, test_constants.LONG_TIMEOUT) + second_response_future = self._invoker.future(group, method)( + second_request, test_constants.LONG_TIMEOUT) + first_response = first_response_future.result() + second_response = second_response_future.result() + + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + @unittest.skip('TODO(nathaniel): implement.') + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + def testCancelledUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(): + response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + cancel_method_return_value = response_future.cancel() + + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) + + def testCancelledUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(): + response_iterator = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + response_iterator.cancel() + + with self.assertRaises(face.CancellationError): + next(response_iterator) + + def testCancelledStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(): + response_future = self._invoker.future(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + cancel_method_return_value = response_future.cancel() + + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) + + def testCancelledStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(): + response_iterator = self._invoker.future(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + response_iterator.cancel() + + with self.assertRaises(face.CancellationError): + next(response_iterator) + + def testExpiredUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(): + response_future = self._invoker.future( + group, method)(request, test_constants.SHORT_TIMEOUT) + self.assertIsInstance( + response_future.exception(), face.ExpirationError) + with self.assertRaises(face.ExpirationError): + response_future.result() + + def testExpiredUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(): + response_iterator = self._invoker.future(group, method)( + request, test_constants.SHORT_TIMEOUT) + with self.assertRaises(face.ExpirationError): + list(response_iterator) + + def testExpiredStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(): + response_future = self._invoker.future(group, method)( + iter(requests), test_constants.SHORT_TIMEOUT) + self.assertIsInstance( + response_future.exception(), face.ExpirationError) + with self.assertRaises(face.ExpirationError): + response_future.result() + + def testExpiredStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(): + response_iterator = self._invoker.future(group, method)( + iter(requests), test_constants.SHORT_TIMEOUT) + with self.assertRaises(face.ExpirationError): + list(response_iterator) + + def testFailedUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.fail(): + response_future = self._invoker.future(group, method)( + request, test_constants.SHORT_TIMEOUT) + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance( + response_future.exception(), face.ExpirationError) + with self.assertRaises(face.ExpirationError): + response_future.result() + + def testFailedUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_consumer before the + # expiration of the RPC. + with self._control.fail(), self.assertRaises(face.ExpirationError): + response_iterator = self._invoker.future(group, method)( + request, test_constants.SHORT_TIMEOUT) + list(response_iterator) + + def testFailedStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.fail(): + response_future = self._invoker.future(group, method)( + iter(requests), test_constants.SHORT_TIMEOUT) + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance( + response_future.exception(), face.ExpirationError) + with self.assertRaises(face.ExpirationError): + response_future.result() + + def testFailedStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_consumer before the + # expiration of the RPC. + with self._control.fail(), self.assertRaises(face.ExpirationError): + response_iterator = self._invoker.future(group, method)( + iter(requests), test_constants.SHORT_TIMEOUT) + list(response_iterator) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_invocation.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_invocation.py new file mode 100644 index 0000000000..448e845a08 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_invocation.py @@ -0,0 +1,213 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Coverage across the Face layer's generic-to-dynamic range for invocation.""" + +import abc + +from grpc.framework.common import cardinality + +_CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR = { + cardinality.Cardinality.UNARY_UNARY: 'blocking_unary_unary', + cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream', + cardinality.Cardinality.STREAM_UNARY: 'blocking_stream_unary', + cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream', +} + +_CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR = { + cardinality.Cardinality.UNARY_UNARY: 'future_unary_unary', + cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream', + cardinality.Cardinality.STREAM_UNARY: 'future_stream_unary', + cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream', +} + +_CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR = { + cardinality.Cardinality.UNARY_UNARY: 'event_unary_unary', + cardinality.Cardinality.UNARY_STREAM: 'event_unary_stream', + cardinality.Cardinality.STREAM_UNARY: 'event_stream_unary', + cardinality.Cardinality.STREAM_STREAM: 'event_stream_stream', +} + +_CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE = { + cardinality.Cardinality.UNARY_UNARY: 'unary_unary', + cardinality.Cardinality.UNARY_STREAM: 'unary_stream', + cardinality.Cardinality.STREAM_UNARY: 'stream_unary', + cardinality.Cardinality.STREAM_STREAM: 'stream_stream', +} + + +class Invoker(object): + """A type used to invoke test RPCs.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def blocking(self, group, name): + """Invokes an RPC with blocking control flow.""" + raise NotImplementedError() + + @abc.abstractmethod + def future(self, group, name): + """Invokes an RPC with future control flow.""" + raise NotImplementedError() + + @abc.abstractmethod + def event(self, group, name): + """Invokes an RPC with event control flow.""" + raise NotImplementedError() + + +class InvokerConstructor(object): + """A type used to create Invokers.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def name(self): + """Specifies the name of the Invoker constructed by this object.""" + raise NotImplementedError() + + @abc.abstractmethod + def construct_invoker(self, generic_stub, dynamic_stubs, methods): + """Constructs an Invoker for the given stubs and methods.""" + raise NotImplementedError() + + +class _GenericInvoker(Invoker): + + def __init__(self, generic_stub, methods): + self._stub = generic_stub + self._methods = methods + + def _behavior(self, group, name, cardinality_to_generic_method): + method_cardinality = self._methods[group, name].cardinality() + behavior = getattr( + self._stub, cardinality_to_generic_method[method_cardinality]) + return lambda *args, **kwargs: behavior(group, name, *args, **kwargs) + + def blocking(self, group, name): + return self._behavior( + group, name, _CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR) + + def future(self, group, name): + return self._behavior(group, name, _CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR) + + def event(self, group, name): + return self._behavior(group, name, _CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR) + + +class _GenericInvokerConstructor(InvokerConstructor): + + def name(self): + return 'GenericInvoker' + + def construct_invoker(self, generic_stub, dynamic_stub, methods): + return _GenericInvoker(generic_stub, methods) + + +class _MultiCallableInvoker(Invoker): + + def __init__(self, generic_stub, methods): + self._stub = generic_stub + self._methods = methods + + def _multi_callable(self, group, name): + method_cardinality = self._methods[group, name].cardinality() + behavior = getattr( + self._stub, + _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) + return behavior(group, name) + + def blocking(self, group, name): + return self._multi_callable(group, name) + + def future(self, group, name): + method_cardinality = self._methods[group, name].cardinality() + behavior = getattr( + self._stub, + _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) + if method_cardinality in ( + cardinality.Cardinality.UNARY_UNARY, + cardinality.Cardinality.STREAM_UNARY): + return behavior(group, name).future + else: + return behavior(group, name) + + def event(self, group, name): + return self._multi_callable(group, name).event + + +class _MultiCallableInvokerConstructor(InvokerConstructor): + + def name(self): + return 'MultiCallableInvoker' + + def construct_invoker(self, generic_stub, dynamic_stub, methods): + return _MultiCallableInvoker(generic_stub, methods) + + +class _DynamicInvoker(Invoker): + + def __init__(self, dynamic_stubs, methods): + self._stubs = dynamic_stubs + self._methods = methods + + def blocking(self, group, name): + return getattr(self._stubs[group], name) + + def future(self, group, name): + if self._methods[group, name].cardinality() in ( + cardinality.Cardinality.UNARY_UNARY, + cardinality.Cardinality.STREAM_UNARY): + return getattr(self._stubs[group], name).future + else: + return getattr(self._stubs[group], name) + + def event(self, group, name): + return getattr(self._stubs[group], name).event + + +class _DynamicInvokerConstructor(InvokerConstructor): + + def name(self): + return 'DynamicInvoker' + + def construct_invoker(self, generic_stub, dynamic_stubs, methods): + return _DynamicInvoker(dynamic_stubs, methods) + + +def invoker_constructors(): + """Creates a sequence of InvokerConstructors to use in tests of RPCs. + + Returns: + A sequence of InvokerConstructors. + """ + return ( + _GenericInvokerConstructor(), + _MultiCallableInvokerConstructor(), + _DynamicInvokerConstructor(), + ) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_receiver.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_receiver.py new file mode 100644 index 0000000000..2e444ff09d --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_receiver.py @@ -0,0 +1,95 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A utility useful in tests of asynchronous, event-driven interfaces.""" + +import threading + +from grpc.framework.interfaces.face import face + + +class Receiver(face.ResponseReceiver): + """A utility object useful in tests of asynchronous code.""" + + def __init__(self): + self._condition = threading.Condition() + self._initial_metadata = None + self._responses = [] + self._terminal_metadata = None + self._code = None + self._details = None + self._completed = False + self._abortion = None + + def abort(self, abortion): + with self._condition: + self._abortion = abortion + self._condition.notify_all() + + def initial_metadata(self, initial_metadata): + with self._condition: + self._initial_metadata = initial_metadata + + def response(self, response): + with self._condition: + self._responses.append(response) + + def complete(self, terminal_metadata, code, details): + with self._condition: + self._terminal_metadata = terminal_metadata + self._code = code + self._details = details + self._completed = True + self._condition.notify_all() + + def block_until_terminated(self): + with self._condition: + while self._abortion is None and not self._completed: + self._condition.wait() + + def unary_response(self): + with self._condition: + if self._abortion is not None: + raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + elif len(self._responses) != 1: + raise AssertionError( + '%d responses received, not exactly one!', len(self._responses)) + else: + return self._responses[0] + + def stream_responses(self): + with self._condition: + if self._abortion is None: + return list(self._responses) + else: + raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + + def abortion(self): + with self._condition: + return self._abortion diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_service.py new file mode 100644 index 0000000000..e25b8a038c --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_service.py @@ -0,0 +1,332 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Private interfaces implemented by data sets used in Face-layer tests.""" + +import abc + +# face is referenced from specification in this module. +from grpc.framework.interfaces.face import face # pylint: disable=unused-import +from grpc_test.framework.interfaces.face import test_interfaces + + +class UnaryUnaryTestMethodImplementation(test_interfaces.Method): + """A controllable implementation of a unary-unary method.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_callback, context, control): + """Services an RPC that accepts one message and produces one message. + + Args: + request: The single request message for the RPC. + response_callback: A callback to be called to accept the response message + of the RPC. + context: An face.ServicerContext object. + control: A test_control.Control to control execution of this method. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class UnaryUnaryTestMessages(object): + """A type for unary-request-unary-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def request(self): + """Affords a request message. + + Implementations of this method should return a different message with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A request message. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, request, response, test_case): + """Verifies that the computed response matches the given request. + + Args: + request: A request message. + response: A response message. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the request and response do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class UnaryStreamTestMethodImplementation(test_interfaces.Method): + """A controllable implementation of a unary-stream method.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_consumer, context, control): + """Services an RPC that takes one message and produces a stream of messages. + + Args: + request: The single request message for the RPC. + response_consumer: A stream.Consumer to be called to accept the response + messages of the RPC. + context: A face.ServicerContext object. + control: A test_control.Control to control execution of this method. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class UnaryStreamTestMessages(object): + """A type for unary-request-stream-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def request(self): + """Affords a request message. + + Implementations of this method should return a different message with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A request message. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, request, responses, test_case): + """Verifies that the computed responses match the given request. + + Args: + request: A request message. + responses: A sequence of response messages. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the request and responses do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class StreamUnaryTestMethodImplementation(test_interfaces.Method): + """A controllable implementation of a stream-unary method.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_callback, context, control): + """Services an RPC that takes a stream of messages and produces one message. + + Args: + response_callback: A callback to be called to accept the response message + of the RPC. + context: A face.ServicerContext object. + control: A test_control.Control to control execution of this method. + + Returns: + A stream.Consumer with which to accept the request messages of the RPC. + The consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing messages to this object. Implementations must not assume that + this object will be called to completion of the request stream or even + called at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class StreamUnaryTestMessages(object): + """A type for stream-request-unary-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def requests(self): + """Affords a sequence of request messages. + + Implementations of this method should return a different sequences with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A sequence of request messages. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, requests, response, test_case): + """Verifies that the computed response matches the given requests. + + Args: + requests: A sequence of request messages. + response: A response message. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the requests and response do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class StreamStreamTestMethodImplementation(test_interfaces.Method): + """A controllable implementation of a stream-stream method.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_consumer, context, control): + """Services an RPC that accepts and produces streams of messages. + + Args: + response_consumer: A stream.Consumer to be called to accept the response + messages of the RPC. + context: A face.ServicerContext object. + control: A test_control.Control to control execution of this method. + + Returns: + A stream.Consumer with which to accept the request messages of the RPC. + The consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing messages to this object. Implementations must not assume that + this object will be called to completion of the request stream or even + called at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class StreamStreamTestMessages(object): + """A type for stream-request-stream-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def requests(self): + """Affords a sequence of request messages. + + Implementations of this method should return a different sequences with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A sequence of request messages. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, requests, responses, test_case): + """Verifies that the computed response matches the given requests. + + Args: + requests: A sequence of request messages. + responses: A sequence of response messages. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the requests and responses do not match, indicating + that there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class TestService(object): + """A specification of implemented methods to use in tests.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def unary_unary_scenarios(self): + """Affords unary-request-unary-response test methods and their messages. + + Returns: + A dict from method group-name pair to implementation/messages pair. The + first element of the pair is a UnaryUnaryTestMethodImplementation object + and the second element is a sequence of UnaryUnaryTestMethodMessages + objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def unary_stream_scenarios(self): + """Affords unary-request-stream-response test methods and their messages. + + Returns: + A dict from method group-name pair to implementation/messages pair. The + first element of the pair is a UnaryStreamTestMethodImplementation + object and the second element is a sequence of + UnaryStreamTestMethodMessages objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stream_unary_scenarios(self): + """Affords stream-request-unary-response test methods and their messages. + + Returns: + A dict from method group-name pair to implementation/messages pair. The + first element of the pair is a StreamUnaryTestMethodImplementation + object and the second element is a sequence of + StreamUnaryTestMethodMessages objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stream_stream_scenarios(self): + """Affords stream-request-stream-response test methods and their messages. + + Returns: + A dict from method group-name pair to implementation/messages pair. The + first element of the pair is a StreamStreamTestMethodImplementation + object and the second element is a sequence of + StreamStreamTestMethodMessages objects. + """ + raise NotImplementedError() diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py new file mode 100644 index 0000000000..1dd2ec3633 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/_stock_service.py @@ -0,0 +1,396 @@ +B# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Examples of Python implementations of the stock.proto Stock service.""" + +from grpc.framework.common import cardinality +from grpc.framework.foundation import abandonment +from grpc.framework.foundation import stream +from grpc_test.framework.common import test_constants +from grpc_test.framework.interfaces.face import _service +from grpc_test._junkdrawer import stock_pb2 + +_STOCK_GROUP_NAME = 'Stock' +_SYMBOL_FORMAT = 'test symbol:%03d' + +# A test-appropriate security-pricing function. :-P +_price = lambda symbol_name: float(hash(symbol_name) % 4096) + + +def _get_last_trade_price(stock_request, stock_reply_callback, control, active): + """A unary-request, unary-response test method.""" + control.control() + if active(): + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=_price(stock_request.symbol))) + else: + raise abandonment.Abandoned() + + +def _get_last_trade_price_multiple(stock_reply_consumer, control, active): + """A stream-request, stream-response test method.""" + def stock_reply_for_stock_request(stock_request): + control.control() + if active(): + return stock_pb2.StockReply( + symbol=stock_request.symbol, price=_price(stock_request.symbol)) + else: + raise abandonment.Abandoned() + + class StockRequestConsumer(stream.Consumer): + + def consume(self, stock_request): + stock_reply_consumer.consume(stock_reply_for_stock_request(stock_request)) + + def terminate(self): + control.control() + stock_reply_consumer.terminate() + + def consume_and_terminate(self, stock_request): + stock_reply_consumer.consume_and_terminate( + stock_reply_for_stock_request(stock_request)) + + return StockRequestConsumer() + + +def _watch_future_trades(stock_request, stock_reply_consumer, control, active): + """A unary-request, stream-response test method.""" + base_price = _price(stock_request.symbol) + for index in range(stock_request.num_trades_to_watch): + control.control() + if active(): + stock_reply_consumer.consume( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=base_price + index)) + else: + raise abandonment.Abandoned() + stock_reply_consumer.terminate() + + +def _get_highest_trade_price(stock_reply_callback, control, active): + """A stream-request, unary-response test method.""" + + class StockRequestConsumer(stream.Consumer): + """Keeps an ongoing record of the most valuable symbol yet consumed.""" + + def __init__(self): + self._symbol = None + self._price = None + + def consume(self, stock_request): + control.control() + if active(): + if self._price is None: + self._symbol = stock_request.symbol + self._price = _price(stock_request.symbol) + else: + candidate_price = _price(stock_request.symbol) + if self._price < candidate_price: + self._symbol = stock_request.symbol + self._price = candidate_price + + def terminate(self): + control.control() + if active(): + if self._symbol is None: + raise ValueError() + else: + stock_reply_callback( + stock_pb2.StockReply(symbol=self._symbol, price=self._price)) + self._symbol = None + self._price = None + + def consume_and_terminate(self, stock_request): + control.control() + if active(): + if self._price is None: + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, + price=_price(stock_request.symbol))) + else: + candidate_price = _price(stock_request.symbol) + if self._price < candidate_price: + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=candidate_price)) + else: + stock_reply_callback( + stock_pb2.StockReply( + symbol=self._symbol, price=self._price)) + + self._symbol = None + self._price = None + + return StockRequestConsumer() + + +class GetLastTradePrice(_service.UnaryUnaryTestMethodImplementation): + """GetLastTradePrice for use in tests.""" + + def group(self): + return _STOCK_GROUP_NAME + + def name(self): + return 'GetLastTradePrice' + + def cardinality(self): + return cardinality.Cardinality.UNARY_UNARY + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, request, response_callback, context, control): + _get_last_trade_price( + request, response_callback, control, context.is_active) + + +class GetLastTradePriceMessages(_service.UnaryUnaryTestMessages): + + def __init__(self): + self._index = 0 + + def request(self): + symbol = _SYMBOL_FORMAT % self._index + self._index += 1 + return stock_pb2.StockRequest(symbol=symbol) + + def verify(self, request, response, test_case): + test_case.assertEqual(request.symbol, response.symbol) + test_case.assertEqual(_price(request.symbol), response.price) + + +class GetLastTradePriceMultiple(_service.StreamStreamTestMethodImplementation): + """GetLastTradePriceMultiple for use in tests.""" + + def group(self): + return _STOCK_GROUP_NAME + + def name(self): + return 'GetLastTradePriceMultiple' + + def cardinality(self): + return cardinality.Cardinality.STREAM_STREAM + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, response_consumer, context, control): + return _get_last_trade_price_multiple( + response_consumer, control, context.is_active) + + +class GetLastTradePriceMultipleMessages(_service.StreamStreamTestMessages): + """Pairs of message streams for use with GetLastTradePriceMultiple.""" + + def __init__(self): + self._index = 0 + + def requests(self): + base_index = self._index + self._index += 1 + return [ + stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index)) + for index in range(test_constants.STREAM_LENGTH)] + + def verify(self, requests, responses, test_case): + test_case.assertEqual(len(requests), len(responses)) + for stock_request, stock_reply in zip(requests, responses): + test_case.assertEqual(stock_request.symbol, stock_reply.symbol) + test_case.assertEqual(_price(stock_request.symbol), stock_reply.price) + + +class WatchFutureTrades(_service.UnaryStreamTestMethodImplementation): + """WatchFutureTrades for use in tests.""" + + def group(self): + return _STOCK_GROUP_NAME + + def name(self): + return 'WatchFutureTrades' + + def cardinality(self): + return cardinality.Cardinality.UNARY_STREAM + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, request, response_consumer, context, control): + _watch_future_trades(request, response_consumer, control, context.is_active) + + +class WatchFutureTradesMessages(_service.UnaryStreamTestMessages): + """Pairs of a single request message and a sequence of response messages.""" + + def __init__(self): + self._index = 0 + + def request(self): + symbol = _SYMBOL_FORMAT % self._index + self._index += 1 + return stock_pb2.StockRequest( + symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH) + + def verify(self, request, responses, test_case): + test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses)) + base_price = _price(request.symbol) + for index, response in enumerate(responses): + test_case.assertEqual(base_price + index, response.price) + + +class GetHighestTradePrice(_service.StreamUnaryTestMethodImplementation): + """GetHighestTradePrice for use in tests.""" + + def group(self): + return _STOCK_GROUP_NAME + + def name(self): + return 'GetHighestTradePrice' + + def cardinality(self): + return cardinality.Cardinality.STREAM_UNARY + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, response_callback, context, control): + return _get_highest_trade_price( + response_callback, control, context.is_active) + + +class GetHighestTradePriceMessages(_service.StreamUnaryTestMessages): + + def requests(self): + return [ + stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index) + for index in range(test_constants.STREAM_LENGTH)] + + def verify(self, requests, response, test_case): + price = None + symbol = None + for stock_request in requests: + current_symbol = stock_request.symbol + current_price = _price(current_symbol) + if price is None or price < current_price: + price = current_price + symbol = current_symbol + test_case.assertEqual(price, response.price) + test_case.assertEqual(symbol, response.symbol) + + +class StockTestService(_service.TestService): + """A corpus of test data with one method of each RPC cardinality.""" + + def unary_unary_scenarios(self): + return { + (_STOCK_GROUP_NAME, 'GetLastTradePrice'): ( + GetLastTradePrice(), [GetLastTradePriceMessages()]), + } + + def unary_stream_scenarios(self): + return { + (_STOCK_GROUP_NAME, 'WatchFutureTrades'): ( + WatchFutureTrades(), [WatchFutureTradesMessages()]), + } + + def stream_unary_scenarios(self): + return { + (_STOCK_GROUP_NAME, 'GetHighestTradePrice'): ( + GetHighestTradePrice(), [GetHighestTradePriceMessages()]) + } + + def stream_stream_scenarios(self): + return { + (_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'): ( + GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]), + } + + +STOCK_TEST_SERVICE = StockTestService() diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_cases.py new file mode 100644 index 0000000000..ca623662f7 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_cases.py @@ -0,0 +1,67 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tools for creating tests of implementations of the Face layer.""" + +# unittest is referenced from specification in this module. +import unittest # pylint: disable=unused-import + +# test_interfaces is referenced from specification in this module. +from grpc_test.framework.interfaces.face import _blocking_invocation_inline_service +from grpc_test.framework.interfaces.face import _event_invocation_synchronous_event_service +from grpc_test.framework.interfaces.face import _future_invocation_asynchronous_event_service +from grpc_test.framework.interfaces.face import _invocation +from grpc_test.framework.interfaces.face import test_interfaces # pylint: disable=unused-import + +_TEST_CASE_SUPERCLASSES = ( + _blocking_invocation_inline_service.TestCase, + _event_invocation_synchronous_event_service.TestCase, + _future_invocation_asynchronous_event_service.TestCase, +) + + +def test_cases(implementation): + """Creates unittest.TestCase classes for a given Face layer implementation. + + Args: + implementation: A test_interfaces.Implementation specifying creation and + destruction of a given Face layer implementation. + + Returns: + A sequence of subclasses of unittest.TestCase defining tests of the + specified Face layer implementation. + """ + test_case_classes = [] + for invoker_constructor in _invocation.invoker_constructors(): + for super_class in _TEST_CASE_SUPERCLASSES: + test_case_classes.append( + type(invoker_constructor.name() + super_class.NAME, (super_class,), + {'implementation': implementation, + 'invoker_constructor': invoker_constructor})) + return test_case_classes diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_interfaces.py b/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_interfaces.py new file mode 100644 index 0000000000..b2b5c10fa6 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/face/test_interfaces.py @@ -0,0 +1,229 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces used in tests of implementations of the Face layer.""" + +import abc + +from grpc.framework.common import cardinality # pylint: disable=unused-import +from grpc.framework.interfaces.face import face # pylint: disable=unused-import + + +class Method(object): + """Specifies a method to be used in tests.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def group(self): + """Identify the group of the method. + + Returns: + The group of the method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def name(self): + """Identify the name of the method. + + Returns: + The name of the method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cardinality(self): + """Identify the cardinality of the method. + + Returns: + A cardinality.Cardinality value describing the streaming semantics of the + method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def request_class(self): + """Identify the class used for the method's request objects. + + Returns: + The class object of the class to which the method's request objects + belong. + """ + raise NotImplementedError() + + @abc.abstractmethod + def response_class(self): + """Identify the class used for the method's response objects. + + Returns: + The class object of the class to which the method's response objects + belong. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_request(self, request): + """Serialize the given request object. + + Args: + request: A request object appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_request(self, serialized_request): + """Synthesize a request object from a given bytestring. + + Args: + serialized_request: A bytestring deserializable into a request object + appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_response(self, response): + """Serialize the given response object. + + Args: + response: A response object appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_response(self, serialized_response): + """Synthesize a response object from a given bytestring. + + Args: + serialized_response: A bytestring deserializable into a response object + appropriate for this method. + """ + raise NotImplementedError() + + +class Implementation(object): + """Specifies an implementation of the Face layer.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def instantiate( + self, methods, method_implementations, + multi_method_implementation): + """Instantiates the Face layer implementation to be used in a test. + + Args: + methods: A sequence of Method objects describing the methods available to + be called during the test. + method_implementations: A dictionary from group-name pair to + face.MethodImplementation object specifying implementation of a method. + multi_method_implementation: A face.MultiMethodImplementation or None. + + Returns: + A sequence of length three the first element of which is a + face.GenericStub, the second element of which is dictionary from groups + to face.DynamicStubs affording invocation of the group's methods, and + the third element of which is an arbitrary memo object to be kept and + passed to destantiate at the conclusion of the test. The returned stubs + must be backed by the provided implementations. + """ + raise NotImplementedError() + + @abc.abstractmethod + def destantiate(self, memo): + """Destroys the Face layer implementation under test. + + Args: + memo: The object from the third position of the return value of a call to + instantiate. + """ + raise NotImplementedError() + + @abc.abstractmethod + def invocation_metadata(self): + """Provides the metadata to be used when invoking a test RPC. + + Returns: + An object to use as the supplied-at-invocation-time metadata in a test + RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def initial_metadata(self): + """Provides the metadata for use as a test RPC's first servicer metadata. + + Returns: + An object to use as the from-the-servicer-before-responses metadata in a + test RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def terminal_metadata(self): + """Provides the metadata for use as a test RPC's second servicer metadata. + + Returns: + An object to use as the from-the-servicer-after-all-responses metadata in + a test RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def code(self): + """Provides the value for use as a test RPC's code. + + Returns: + An object to use as the from-the-servicer code in a test RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def details(self): + """Provides the value for use as a test RPC's details. + + Returns: + An object to use as the from-the-servicer details in a test RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def metadata_transmitted(self, original_metadata, transmitted_metadata): + """Identifies whether or not metadata was properly transmitted. + + Args: + original_metadata: A metadata value passed to the Face interface + implementation under test. + transmitted_metadata: The same metadata value after having been + transmitted via an RPC performed by the Face interface implementation + under test. + + Returns: + Whether or not the metadata was properly transmitted by the Face interface + implementation under test. + """ + raise NotImplementedError() |