diff options
Diffstat (limited to 'src/python/grpcio_tests/tests/unit/framework/interfaces')
11 files changed, 1477 insertions, 1455 deletions
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py index 7086519106..b89398809f 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py @@ -26,5 +26,3 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py index 1ea356c0bf..2aec25c9ef 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """A test constant working around issue 3069.""" # test_constants is referenced from specification in this module. diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py index 7086519106..b89398809f 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py @@ -26,5 +26,3 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py index e338aaa396..a79834f96f 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Test code for the Face layer of RPC Framework.""" from __future__ import division @@ -50,246 +49,254 @@ from tests.unit.framework.interfaces.face import _stock_service from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import -class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.TestCase)): - """A test of the Face layer of RPC Framework. +class TestCase( + six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, + unittest.TestCase)): + """A test of the Face layer of RPC Framework. Concrete subclasses must have an "implementation" attribute of type test_interfaces.Implementation and an "invoker_constructor" attribute of type _invocation.InvokerConstructor. """ - NAME = 'BlockingInvocationInlineServiceTest' + NAME = 'BlockingInvocationInlineServiceTest' - def setUp(self): - """See unittest.TestCase.setUp for full specification. + def setUp(self): + """See unittest.TestCase.setUp for full specification. Overriding implementations must call this implementation. """ - self._control = test_control.PauseFailControl() - self._digest = _digest.digest( - _stock_service.STOCK_TEST_SERVICE, self._control, None) + self._control = test_control.PauseFailControl() + self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE, + self._control, None) - generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( - self._digest.methods, self._digest.inline_method_implementations, None) - self._invoker = self.invoker_constructor.construct_invoker( - generic_stub, dynamic_stubs, self._digest.methods) + generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( + self._digest.methods, self._digest.inline_method_implementations, + None) + self._invoker = self.invoker_constructor.construct_invoker( + generic_stub, dynamic_stubs, self._digest.methods) - def tearDown(self): - """See unittest.TestCase.tearDown for full specification. + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. Overriding implementations must call this implementation. """ - self._invoker = None - self.implementation.destantiate(self._memo) - - def testSuccessfulUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - response, call = self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT, with_call=True) - - test_messages.verify(request, response, self) - - def testSuccessfulUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - response_iterator = self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(request, responses, self) - - def testSuccessfulStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - response, call = self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT, with_call=True) - - test_messages.verify(requests, response, self) - - def testSuccessfulStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - response_iterator = self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(requests, responses, self) - - def testSequentialInvocations(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response = self._invoker.blocking(group, method)( - first_request, test_constants.LONG_TIMEOUT) - - test_messages.verify(first_request, first_response, self) - - second_response = self._invoker.blocking(group, method)( - second_request, test_constants.LONG_TIMEOUT) - - test_messages.verify(second_request, second_response, self) - - def testParallelInvocations(self): - pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures = [] - for _ in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - response_future = pool.submit( - self._invoker.blocking(group, method), request, - test_constants.LONG_TIMEOUT) - requests.append(request) - response_futures.append(response_future) - - responses = [ - response_future.result() for response_future in response_futures] - - for request, response in zip(requests, responses): - test_messages.verify(request, response, self) - pool.shutdown(wait=True) - - def testWaitingForSomeButNotAllParallelInvocations(self): - pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures_to_indices = {} - for index in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - response_future = pool.submit( - self._invoker.blocking(group, method), request, - test_constants.LONG_TIMEOUT) - requests.append(request) - response_futures_to_indices[response_future] = index - - some_completed_response_futures_iterator = itertools.islice( - futures.as_completed(response_futures_to_indices), - test_constants.THREAD_CONCURRENCY // 2) - for response_future in some_completed_response_futures_iterator: - index = response_futures_to_indices[response_future] - test_messages.verify(requests[index], response_future.result(), self) - pool.shutdown(wait=True) - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledUnaryRequestUnaryResponse(self): - raise NotImplementedError() - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledUnaryRequestStreamResponse(self): - raise NotImplementedError() - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledStreamRequestUnaryResponse(self): - raise NotImplementedError() - - @unittest.skip('Cancellation impossible with blocking control flow!') - def testCancelledStreamRequestStreamResponse(self): - raise NotImplementedError() - - def testExpiredUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - self._invoker.blocking(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - - def testExpiredUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.blocking(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) - - def testExpiredStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - self._invoker.blocking(group, method)( - iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) - - def testExpiredStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(), self.assertRaises( - face.ExpirationError): - response_iterator = self._invoker.blocking(group, method)( - iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) - - def testFailedUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.fail(), self.assertRaises(face.RemoteError): - self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT) - - def testFailedUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.fail(), self.assertRaises(face.RemoteError): - response_iterator = self._invoker.blocking(group, method)( - request, test_constants.LONG_TIMEOUT) - list(response_iterator) - - def testFailedStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.fail(), self.assertRaises(face.RemoteError): - self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - - def testFailedStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.fail(), self.assertRaises(face.RemoteError): - response_iterator = self._invoker.blocking(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - list(response_iterator) + self._invoker = None + self.implementation.destantiate(self._memo) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response, call = self._invoker.blocking(group, method)( + request, test_constants.LONG_TIMEOUT, with_call=True) + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_stream_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_iterator = self._invoker.blocking(group, method)( + request, test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_unary_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + response, call = self._invoker.blocking(group, method)( + iter(requests), test_constants.LONG_TIMEOUT, with_call=True) + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_stream_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + response_iterator = self._invoker.blocking(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response = self._invoker.blocking(group, method)( + first_request, test_constants.LONG_TIMEOUT) + + test_messages.verify(first_request, first_response, self) + + second_response = self._invoker.blocking(group, method)( + second_request, test_constants.LONG_TIMEOUT) + + test_messages.verify(second_request, second_response, self) + + def testParallelInvocations(self): + pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + requests = [] + response_futures = [] + for _ in range(test_constants.THREAD_CONCURRENCY): + request = test_messages.request() + response_future = pool.submit( + self._invoker.blocking(group, method), request, + test_constants.LONG_TIMEOUT) + requests.append(request) + response_futures.append(response_future) + + responses = [ + response_future.result() + for response_future in response_futures + ] + + for request, response in zip(requests, responses): + test_messages.verify(request, response, self) + pool.shutdown(wait=True) + + def testWaitingForSomeButNotAllParallelInvocations(self): + pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + requests = [] + response_futures_to_indices = {} + for index in range(test_constants.THREAD_CONCURRENCY): + request = test_messages.request() + response_future = pool.submit( + self._invoker.blocking(group, method), request, + test_constants.LONG_TIMEOUT) + requests.append(request) + response_futures_to_indices[response_future] = index + + some_completed_response_futures_iterator = itertools.islice( + futures.as_completed(response_futures_to_indices), + test_constants.THREAD_CONCURRENCY // 2) + for response_future in some_completed_response_futures_iterator: + index = response_futures_to_indices[response_future] + test_messages.verify(requests[index], + response_future.result(), self) + pool.shutdown(wait=True) + + @unittest.skip('Cancellation impossible with blocking control flow!') + def testCancelledUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @unittest.skip('Cancellation impossible with blocking control flow!') + def testCancelledUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @unittest.skip('Cancellation impossible with blocking control flow!') + def testCancelledStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @unittest.skip('Cancellation impossible with blocking control flow!') + def testCancelledStreamRequestStreamResponse(self): + raise NotImplementedError() + + def testExpiredUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(), self.assertRaises( + face.ExpirationError): + self._invoker.blocking(group, method)( + request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + + def testExpiredUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_stream_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(), self.assertRaises( + face.ExpirationError): + response_iterator = self._invoker.blocking(group, method)( + request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + list(response_iterator) + + def testExpiredStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_unary_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(), self.assertRaises( + face.ExpirationError): + self._invoker.blocking(group, method)( + iter(requests), + _3069_test_constant.REALLY_SHORT_TIMEOUT) + + def testExpiredStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_stream_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(), self.assertRaises( + face.ExpirationError): + response_iterator = self._invoker.blocking(group, method)( + iter(requests), + _3069_test_constant.REALLY_SHORT_TIMEOUT) + list(response_iterator) + + def testFailedUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.fail(), self.assertRaises(face.RemoteError): + self._invoker.blocking(group, method)( + request, test_constants.LONG_TIMEOUT) + + def testFailedUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_stream_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.fail(), self.assertRaises(face.RemoteError): + response_iterator = self._invoker.blocking(group, method)( + request, test_constants.LONG_TIMEOUT) + list(response_iterator) + + def testFailedStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_unary_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.fail(), self.assertRaises(face.RemoteError): + self._invoker.blocking(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + + def testFailedStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_stream_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.fail(), self.assertRaises(face.RemoteError): + response_iterator = self._invoker.blocking(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + list(response_iterator) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py index f0befb0b27..0411da0a66 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Code for making a service.TestService more amenable to use in tests.""" import collections @@ -49,17 +48,16 @@ _IDENTITY = lambda x: x class TestServiceDigest( - collections.namedtuple( - 'TestServiceDigest', - ('methods', - 'inline_method_implementations', - 'event_method_implementations', - 'multi_method_implementation', - 'unary_unary_messages_sequences', - 'unary_stream_messages_sequences', - 'stream_unary_messages_sequences', - 'stream_stream_messages_sequences',))): - """A transformation of a service.TestService. + collections.namedtuple('TestServiceDigest', ( + 'methods', + 'inline_method_implementations', + 'event_method_implementations', + 'multi_method_implementation', + 'unary_unary_messages_sequences', + 'unary_stream_messages_sequences', + 'stream_unary_messages_sequences', + 'stream_stream_messages_sequences',))): + """A transformation of a service.TestService. Attributes: methods: A dict from method group-name pair to test_interfaces.Method object @@ -88,303 +86,308 @@ class TestServiceDigest( class _BufferingConsumer(stream.Consumer): - """A trivial Consumer that dumps what it consumes in a user-mutable buffer.""" + """A trivial Consumer that dumps what it consumes in a user-mutable buffer.""" - def __init__(self): - self.consumed = [] - self.terminated = False + def __init__(self): + self.consumed = [] + self.terminated = False - def consume(self, value): - self.consumed.append(value) + def consume(self, value): + self.consumed.append(value) - def terminate(self): - self.terminated = True + def terminate(self): + self.terminated = True - def consume_and_terminate(self, value): - self.consumed.append(value) - self.terminated = True + def consume_and_terminate(self, value): + self.consumed.append(value) + self.terminated = True class _InlineUnaryUnaryMethod(face.MethodImplementation): - def __init__(self, unary_unary_test_method, control): - self._test_method = unary_unary_test_method - self._control = control + def __init__(self, unary_unary_test_method, control): + self._test_method = unary_unary_test_method + self._control = control - self.cardinality = cardinality.Cardinality.UNARY_UNARY - self.style = style.Service.INLINE + self.cardinality = cardinality.Cardinality.UNARY_UNARY + self.style = style.Service.INLINE - def unary_unary_inline(self, request, context): - response_list = [] - self._test_method.service( - request, response_list.append, context, self._control) - return response_list.pop(0) + def unary_unary_inline(self, request, context): + response_list = [] + self._test_method.service(request, response_list.append, context, + self._control) + return response_list.pop(0) class _EventUnaryUnaryMethod(face.MethodImplementation): - def __init__(self, unary_unary_test_method, control, pool): - self._test_method = unary_unary_test_method - self._control = control - self._pool = pool + def __init__(self, unary_unary_test_method, control, pool): + self._test_method = unary_unary_test_method + self._control = control + self._pool = pool - self.cardinality = cardinality.Cardinality.UNARY_UNARY - self.style = style.Service.EVENT + self.cardinality = cardinality.Cardinality.UNARY_UNARY + self.style = style.Service.EVENT - def unary_unary_event(self, request, response_callback, context): - if self._pool is None: - self._test_method.service( - request, response_callback, context, self._control) - else: - self._pool.submit( - self._test_method.service, request, response_callback, context, - self._control) + def unary_unary_event(self, request, response_callback, context): + if self._pool is None: + self._test_method.service(request, response_callback, context, + self._control) + else: + self._pool.submit(self._test_method.service, request, + response_callback, context, self._control) class _InlineUnaryStreamMethod(face.MethodImplementation): - def __init__(self, unary_stream_test_method, control): - self._test_method = unary_stream_test_method - self._control = control + def __init__(self, unary_stream_test_method, control): + self._test_method = unary_stream_test_method + self._control = control - self.cardinality = cardinality.Cardinality.UNARY_STREAM - self.style = style.Service.INLINE + self.cardinality = cardinality.Cardinality.UNARY_STREAM + self.style = style.Service.INLINE - def unary_stream_inline(self, request, context): - response_consumer = _BufferingConsumer() - self._test_method.service( - request, response_consumer, context, self._control) - for response in response_consumer.consumed: - yield response + def unary_stream_inline(self, request, context): + response_consumer = _BufferingConsumer() + self._test_method.service(request, response_consumer, context, + self._control) + for response in response_consumer.consumed: + yield response class _EventUnaryStreamMethod(face.MethodImplementation): - def __init__(self, unary_stream_test_method, control, pool): - self._test_method = unary_stream_test_method - self._control = control - self._pool = pool + def __init__(self, unary_stream_test_method, control, pool): + self._test_method = unary_stream_test_method + self._control = control + self._pool = pool - self.cardinality = cardinality.Cardinality.UNARY_STREAM - self.style = style.Service.EVENT + self.cardinality = cardinality.Cardinality.UNARY_STREAM + self.style = style.Service.EVENT - def unary_stream_event(self, request, response_consumer, context): - if self._pool is None: - self._test_method.service( - request, response_consumer, context, self._control) - else: - self._pool.submit( - self._test_method.service, request, response_consumer, context, - self._control) + def unary_stream_event(self, request, response_consumer, context): + if self._pool is None: + self._test_method.service(request, response_consumer, context, + self._control) + else: + self._pool.submit(self._test_method.service, request, + response_consumer, context, self._control) class _InlineStreamUnaryMethod(face.MethodImplementation): - def __init__(self, stream_unary_test_method, control): - self._test_method = stream_unary_test_method - self._control = control + def __init__(self, stream_unary_test_method, control): + self._test_method = stream_unary_test_method + self._control = control - self.cardinality = cardinality.Cardinality.STREAM_UNARY - self.style = style.Service.INLINE + self.cardinality = cardinality.Cardinality.STREAM_UNARY + self.style = style.Service.INLINE - def stream_unary_inline(self, request_iterator, context): - response_list = [] - request_consumer = self._test_method.service( - response_list.append, context, self._control) - for request in request_iterator: - request_consumer.consume(request) - request_consumer.terminate() - return response_list.pop(0) + def stream_unary_inline(self, request_iterator, context): + response_list = [] + request_consumer = self._test_method.service(response_list.append, + context, self._control) + for request in request_iterator: + request_consumer.consume(request) + request_consumer.terminate() + return response_list.pop(0) class _EventStreamUnaryMethod(face.MethodImplementation): - def __init__(self, stream_unary_test_method, control, pool): - self._test_method = stream_unary_test_method - self._control = control - self._pool = pool + def __init__(self, stream_unary_test_method, control, pool): + self._test_method = stream_unary_test_method + self._control = control + self._pool = pool - self.cardinality = cardinality.Cardinality.STREAM_UNARY - self.style = style.Service.EVENT + self.cardinality = cardinality.Cardinality.STREAM_UNARY + self.style = style.Service.EVENT - def stream_unary_event(self, response_callback, context): - request_consumer = self._test_method.service( - response_callback, context, self._control) - if self._pool is None: - return request_consumer - else: - return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + def stream_unary_event(self, response_callback, context): + request_consumer = self._test_method.service(response_callback, context, + self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, + self._pool) class _InlineStreamStreamMethod(face.MethodImplementation): - def __init__(self, stream_stream_test_method, control): - self._test_method = stream_stream_test_method - self._control = control + def __init__(self, stream_stream_test_method, control): + self._test_method = stream_stream_test_method + self._control = control - self.cardinality = cardinality.Cardinality.STREAM_STREAM - self.style = style.Service.INLINE + self.cardinality = cardinality.Cardinality.STREAM_STREAM + self.style = style.Service.INLINE - def stream_stream_inline(self, request_iterator, context): - response_consumer = _BufferingConsumer() - request_consumer = self._test_method.service( - response_consumer, context, self._control) + def stream_stream_inline(self, request_iterator, context): + response_consumer = _BufferingConsumer() + request_consumer = self._test_method.service(response_consumer, context, + self._control) - for request in request_iterator: - request_consumer.consume(request) - while response_consumer.consumed: - yield response_consumer.consumed.pop(0) - response_consumer.terminate() + for request in request_iterator: + request_consumer.consume(request) + while response_consumer.consumed: + yield response_consumer.consumed.pop(0) + response_consumer.terminate() class _EventStreamStreamMethod(face.MethodImplementation): - def __init__(self, stream_stream_test_method, control, pool): - self._test_method = stream_stream_test_method - self._control = control - self._pool = pool + def __init__(self, stream_stream_test_method, control, pool): + self._test_method = stream_stream_test_method + self._control = control + self._pool = pool - self.cardinality = cardinality.Cardinality.STREAM_STREAM - self.style = style.Service.EVENT + self.cardinality = cardinality.Cardinality.STREAM_STREAM + self.style = style.Service.EVENT - def stream_stream_event(self, response_consumer, context): - request_consumer = self._test_method.service( - response_consumer, context, self._control) - if self._pool is None: - return request_consumer - else: - return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + def stream_stream_event(self, response_consumer, context): + request_consumer = self._test_method.service(response_consumer, context, + self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, + self._pool) class _UnaryConsumer(stream.Consumer): - """A Consumer that only allows consumption of exactly one value.""" - - def __init__(self, action): - self._lock = threading.Lock() - self._action = action - self._consumed = False - self._terminated = False - - def consume(self, value): - with self._lock: - if self._consumed: - raise ValueError('Unary consumer already consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._consumed = True - - self._action(value) - - def terminate(self): - with self._lock: - if not self._consumed: - raise ValueError('Unary consumer hasn\'t yet consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._terminated = True - - def consume_and_terminate(self, value): - with self._lock: - if self._consumed: - raise ValueError('Unary consumer already consumed!') - elif self._terminated: - raise ValueError('Unary consumer already terminated!') - else: - self._consumed = True - self._terminated = True - - self._action(value) + """A Consumer that only allows consumption of exactly one value.""" + + def __init__(self, action): + self._lock = threading.Lock() + self._action = action + self._consumed = False + self._terminated = False + + def consume(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + + self._action(value) + + def terminate(self): + with self._lock: + if not self._consumed: + raise ValueError('Unary consumer hasn\'t yet consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._terminated = True + + def consume_and_terminate(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + self._terminated = True + + self._action(value) class _UnaryUnaryAdaptation(object): - def __init__(self, unary_unary_test_method): - self._method = unary_unary_test_method + def __init__(self, unary_unary_test_method): + self._method = unary_unary_test_method + + def service(self, response_consumer, context, control): + + def action(request): + self._method.service(request, + response_consumer.consume_and_terminate, + context, control) - def service(self, response_consumer, context, control): - def action(request): - self._method.service( - request, response_consumer.consume_and_terminate, context, control) - return _UnaryConsumer(action) + return _UnaryConsumer(action) class _UnaryStreamAdaptation(object): - def __init__(self, unary_stream_test_method): - self._method = unary_stream_test_method + def __init__(self, unary_stream_test_method): + self._method = unary_stream_test_method + + def service(self, response_consumer, context, control): + + def action(request): + self._method.service(request, response_consumer, context, control) - def service(self, response_consumer, context, control): - def action(request): - self._method.service(request, response_consumer, context, control) - return _UnaryConsumer(action) + return _UnaryConsumer(action) class _StreamUnaryAdaptation(object): - def __init__(self, stream_unary_test_method): - self._method = stream_unary_test_method + def __init__(self, stream_unary_test_method): + self._method = stream_unary_test_method - def service(self, response_consumer, context, control): - return self._method.service( - response_consumer.consume_and_terminate, context, control) + def service(self, response_consumer, context, control): + return self._method.service(response_consumer.consume_and_terminate, + context, control) class _MultiMethodImplementation(face.MultiMethodImplementation): - def __init__(self, methods, control, pool): - self._methods = methods - self._control = control - self._pool = pool + def __init__(self, methods, control, pool): + self._methods = methods + self._control = control + self._pool = pool - def service(self, group, name, response_consumer, context): - method = self._methods.get(group, name, None) - if method is None: - raise face.NoSuchMethodError(group, name) - elif self._pool is None: - return method(response_consumer, context, self._control) - else: - request_consumer = method(response_consumer, context, self._control) - return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + def service(self, group, name, response_consumer, context): + method = self._methods.get(group, name, None) + if method is None: + raise face.NoSuchMethodError(group, name) + elif self._pool is None: + return method(response_consumer, context, self._control) + else: + request_consumer = method(response_consumer, context, self._control) + return stream_util.ThreadSwitchingConsumer(request_consumer, + self._pool) class _Assembly( - collections.namedtuple( - '_Assembly', - ['methods', 'inlines', 'events', 'adaptations', 'messages'])): - """An intermediate structure created when creating a TestServiceDigest.""" - - -def _assemble( - scenarios, identifiers, inline_method_constructor, event_method_constructor, - adapter, control, pool): - """Creates an _Assembly from the given scenarios.""" - methods = {} - inlines = {} - events = {} - adaptations = {} - messages = {} - for identifier, scenario in six.iteritems(scenarios): - if identifier in identifiers: - raise ValueError('Repeated identifier "(%s, %s)"!' % identifier) - - test_method = scenario[0] - inline_method = inline_method_constructor(test_method, control) - event_method = event_method_constructor(test_method, control, pool) - adaptation = adapter(test_method) - - methods[identifier] = test_method - inlines[identifier] = inline_method - events[identifier] = event_method - adaptations[identifier] = adaptation - messages[identifier] = scenario[1] - - return _Assembly(methods, inlines, events, adaptations, messages) + collections.namedtuple( + '_Assembly', + ['methods', 'inlines', 'events', 'adaptations', 'messages'])): + """An intermediate structure created when creating a TestServiceDigest.""" + + +def _assemble(scenarios, identifiers, inline_method_constructor, + event_method_constructor, adapter, control, pool): + """Creates an _Assembly from the given scenarios.""" + methods = {} + inlines = {} + events = {} + adaptations = {} + messages = {} + for identifier, scenario in six.iteritems(scenarios): + if identifier in identifiers: + raise ValueError('Repeated identifier "(%s, %s)"!' % identifier) + + test_method = scenario[0] + inline_method = inline_method_constructor(test_method, control) + event_method = event_method_constructor(test_method, control, pool) + adaptation = adapter(test_method) + + methods[identifier] = test_method + inlines[identifier] = inline_method + events[identifier] = event_method + adaptations[identifier] = adaptation + messages[identifier] = scenario[1] + + return _Assembly(methods, inlines, events, adaptations, messages) def digest(service, control, pool): - """Creates a TestServiceDigest from a TestService. + """Creates a TestServiceDigest from a TestService. Args: service: A _service.TestService. @@ -396,51 +399,48 @@ def digest(service, control, pool): Returns: A TestServiceDigest synthesized from the given service.TestService. """ - identifiers = set() - - unary_unary = _assemble( - service.unary_unary_scenarios(), identifiers, _InlineUnaryUnaryMethod, - _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool) - identifiers.update(unary_unary.inlines) - - unary_stream = _assemble( - service.unary_stream_scenarios(), identifiers, _InlineUnaryStreamMethod, - _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool) - identifiers.update(unary_stream.inlines) - - stream_unary = _assemble( - service.stream_unary_scenarios(), identifiers, _InlineStreamUnaryMethod, - _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool) - identifiers.update(stream_unary.inlines) - - stream_stream = _assemble( - service.stream_stream_scenarios(), identifiers, _InlineStreamStreamMethod, - _EventStreamStreamMethod, _IDENTITY, control, pool) - identifiers.update(stream_stream.inlines) - - methods = dict(unary_unary.methods) - methods.update(unary_stream.methods) - methods.update(stream_unary.methods) - methods.update(stream_stream.methods) - adaptations = dict(unary_unary.adaptations) - adaptations.update(unary_stream.adaptations) - adaptations.update(stream_unary.adaptations) - adaptations.update(stream_stream.adaptations) - inlines = dict(unary_unary.inlines) - inlines.update(unary_stream.inlines) - inlines.update(stream_unary.inlines) - inlines.update(stream_stream.inlines) - events = dict(unary_unary.events) - events.update(unary_stream.events) - events.update(stream_unary.events) - events.update(stream_stream.events) - - return TestServiceDigest( - methods, - inlines, - events, - _MultiMethodImplementation(adaptations, control, pool), - unary_unary.messages, - unary_stream.messages, - stream_unary.messages, - stream_stream.messages) + identifiers = set() + + unary_unary = _assemble(service.unary_unary_scenarios(), identifiers, + _InlineUnaryUnaryMethod, _EventUnaryUnaryMethod, + _UnaryUnaryAdaptation, control, pool) + identifiers.update(unary_unary.inlines) + + unary_stream = _assemble(service.unary_stream_scenarios(), identifiers, + _InlineUnaryStreamMethod, _EventUnaryStreamMethod, + _UnaryStreamAdaptation, control, pool) + identifiers.update(unary_stream.inlines) + + stream_unary = _assemble(service.stream_unary_scenarios(), identifiers, + _InlineStreamUnaryMethod, _EventStreamUnaryMethod, + _StreamUnaryAdaptation, control, pool) + identifiers.update(stream_unary.inlines) + + stream_stream = _assemble(service.stream_stream_scenarios(), identifiers, + _InlineStreamStreamMethod, + _EventStreamStreamMethod, _IDENTITY, control, + pool) + identifiers.update(stream_stream.inlines) + + methods = dict(unary_unary.methods) + methods.update(unary_stream.methods) + methods.update(stream_unary.methods) + methods.update(stream_stream.methods) + adaptations = dict(unary_unary.adaptations) + adaptations.update(unary_stream.adaptations) + adaptations.update(stream_unary.adaptations) + adaptations.update(stream_stream.adaptations) + inlines = dict(unary_unary.inlines) + inlines.update(unary_stream.inlines) + inlines.update(stream_unary.inlines) + inlines.update(stream_stream.inlines) + events = dict(unary_unary.events) + events.update(unary_stream.events) + events.update(stream_unary.events) + events.update(stream_stream.events) + + return TestServiceDigest( + methods, inlines, events, + _MultiMethodImplementation(adaptations, control, pool), + unary_unary.messages, unary_stream.messages, stream_unary.messages, + stream_stream.messages) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py index df620b19ba..703eef3a82 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Test code for the Face layer of RPC Framework.""" from __future__ import division @@ -55,457 +54,470 @@ from tests.unit.framework.interfaces.face import test_interfaces # pylint: disa class _PauseableIterator(object): - def __init__(self, upstream): - self._upstream = upstream - self._condition = threading.Condition() - self._paused = False + def __init__(self, upstream): + self._upstream = upstream + self._condition = threading.Condition() + self._paused = False - @contextlib.contextmanager - def pause(self): - with self._condition: - self._paused = True - yield - with self._condition: - self._paused = False - self._condition.notify_all() + @contextlib.contextmanager + def pause(self): + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() - def __iter__(self): - return self + def __iter__(self): + return self - def __next__(self): - return self.next() + def __next__(self): + return self.next() - def next(self): - with self._condition: - while self._paused: - self._condition.wait() - return next(self._upstream) + def next(self): + with self._condition: + while self._paused: + self._condition.wait() + return next(self._upstream) class _Callback(object): - def __init__(self): - self._condition = threading.Condition() - self._called = False - self._passed_future = None - self._passed_other_stuff = None - - def __call__(self, *args, **kwargs): - with self._condition: - self._called = True - if args: - self._passed_future = args[0] - if 1 < len(args) or kwargs: - self._passed_other_stuff = tuple(args[1:]), dict(kwargs) - self._condition.notify_all() - - def future(self): - with self._condition: - while True: - if self._passed_other_stuff is not None: - raise ValueError( - 'Test callback passed unexpected values: %s', - self._passed_other_stuff) - elif self._called: - return self._passed_future - else: - self._condition.wait() - - -class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.TestCase)): - """A test of the Face layer of RPC Framework. + def __init__(self): + self._condition = threading.Condition() + self._called = False + self._passed_future = None + self._passed_other_stuff = None + + def __call__(self, *args, **kwargs): + with self._condition: + self._called = True + if args: + self._passed_future = args[0] + if 1 < len(args) or kwargs: + self._passed_other_stuff = tuple(args[1:]), dict(kwargs) + self._condition.notify_all() + + def future(self): + with self._condition: + while True: + if self._passed_other_stuff is not None: + raise ValueError( + 'Test callback passed unexpected values: %s', + self._passed_other_stuff) + elif self._called: + return self._passed_future + else: + self._condition.wait() + + +class TestCase( + six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, + unittest.TestCase)): + """A test of the Face layer of RPC Framework. Concrete subclasses must have an "implementation" attribute of type test_interfaces.Implementation and an "invoker_constructor" attribute of type _invocation.InvokerConstructor. """ - NAME = 'FutureInvocationAsynchronousEventServiceTest' + NAME = 'FutureInvocationAsynchronousEventServiceTest' - def setUp(self): - """See unittest.TestCase.setUp for full specification. + def setUp(self): + """See unittest.TestCase.setUp for full specification. Overriding implementations must call this implementation. """ - self._control = test_control.PauseFailControl() - self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE) - self._digest = _digest.digest( - _stock_service.STOCK_TEST_SERVICE, self._control, self._digest_pool) + self._control = test_control.PauseFailControl() + self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE) + self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE, + self._control, self._digest_pool) - generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( - self._digest.methods, self._digest.event_method_implementations, None) - self._invoker = self.invoker_constructor.construct_invoker( - generic_stub, dynamic_stubs, self._digest.methods) + generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( + self._digest.methods, self._digest.event_method_implementations, + None) + self._invoker = self.invoker_constructor.construct_invoker( + generic_stub, dynamic_stubs, self._digest.methods) - def tearDown(self): - """See unittest.TestCase.tearDown for full specification. + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. Overriding implementations must call this implementation. """ - self._invoker = None - self.implementation.destantiate(self._memo) - self._digest_pool.shutdown(wait=True) - - def testSuccessfulUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - - response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - response = response_future.result() - - test_messages.verify(request, response, self) - self.assertIs(callback.future(), response_future) - self.assertIsNone(response_future.exception()) - self.assertIsNone(response_future.traceback()) - - def testSuccessfulUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - response_iterator = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(request, responses, self) - - def testSuccessfulStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - request_iterator = _PauseableIterator(iter(requests)) - callback = _Callback() - - # Use of a paused iterator of requests allows us to test that control is - # returned to calling code before the iterator yields any requests. - with request_iterator.pause(): - response_future = self._invoker.future(group, method)( - request_iterator, test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - future_passed_to_callback = callback.future() - response = future_passed_to_callback.result() - - test_messages.verify(requests, response, self) - self.assertIs(future_passed_to_callback, response_future) - self.assertIsNone(response_future.exception()) - self.assertIsNone(response_future.traceback()) - - def testSuccessfulStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - request_iterator = _PauseableIterator(iter(requests)) - - # Use of a paused iterator of requests allows us to test that control is - # returned to calling code before the iterator yields any requests. - with request_iterator.pause(): - response_iterator = self._invoker.future(group, method)( - request_iterator, test_constants.LONG_TIMEOUT) - responses = list(response_iterator) - - test_messages.verify(requests, responses, self) - - def testSequentialInvocations(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response_future = self._invoker.future(group, method)( - first_request, test_constants.LONG_TIMEOUT) - first_response = first_response_future.result() - - test_messages.verify(first_request, first_response, self) - - second_response_future = self._invoker.future(group, method)( - second_request, test_constants.LONG_TIMEOUT) - second_response = second_response_future.result() - - test_messages.verify(second_request, second_response, self) - - def testParallelInvocations(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response_future = self._invoker.future(group, method)( - first_request, test_constants.LONG_TIMEOUT) - second_response_future = self._invoker.future(group, method)( - second_request, test_constants.LONG_TIMEOUT) - first_response = first_response_future.result() - second_response = second_response_future.result() - - test_messages.verify(first_request, first_response, self) - test_messages.verify(second_request, second_response, self) - - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures = [] - for _ in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - requests.append(request) - response_futures.append(response_future) - - responses = [ - response_future.result() for response_future in response_futures] - - for request, response in zip(requests, responses): - test_messages.verify(request, response, self) - - def testWaitingForSomeButNotAllParallelInvocations(self): - pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = [] - response_futures_to_indices = {} - for index in range(test_constants.THREAD_CONCURRENCY): - request = test_messages.request() - inner_response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - outer_response_future = pool.submit(inner_response_future.result) - requests.append(request) - response_futures_to_indices[outer_response_future] = index - - some_completed_response_futures_iterator = itertools.islice( - futures.as_completed(response_futures_to_indices), - test_constants.THREAD_CONCURRENCY // 2) - for response_future in some_completed_response_futures_iterator: - index = response_futures_to_indices[response_future] - test_messages.verify(requests[index], response_future.result(), self) - pool.shutdown(wait=True) - - def testCancelledUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - cancel_method_return_value = response_future.cancel() - - self.assertIs(callback.future(), response_future) - self.assertFalse(cancel_method_return_value) - self.assertTrue(response_future.cancelled()) - with self.assertRaises(future.CancelledError): - response_future.result() - with self.assertRaises(future.CancelledError): - response_future.exception() - with self.assertRaises(future.CancelledError): - response_future.traceback() - - def testCancelledUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - request, test_constants.LONG_TIMEOUT) - response_iterator.cancel() - - with self.assertRaises(face.CancellationError): - next(response_iterator) - - def testCancelledStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - response_future.add_done_callback(callback) - cancel_method_return_value = response_future.cancel() - - self.assertIs(callback.future(), response_future) - self.assertFalse(cancel_method_return_value) - self.assertTrue(response_future.cancelled()) - with self.assertRaises(future.CancelledError): - response_future.result() - with self.assertRaises(future.CancelledError): - response_future.exception() - with self.assertRaises(future.CancelledError): - response_future.traceback() - - def testCancelledStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - iter(requests), test_constants.LONG_TIMEOUT) - response_iterator.cancel() - - with self.assertRaises(face.CancellationError): - next(response_iterator) - - def testExpiredUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future( - group, method)(request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - self.assertIs(callback.future(), response_future) - self.assertIsInstance( - response_future.exception(), face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsInstance( - response_future.exception(), face.AbortionError) - self.assertIsNotNone(response_future.traceback()) - - def testExpiredUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - with self.assertRaises(face.ExpirationError): - list(response_iterator) - - def testExpiredStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - callback = _Callback() - - with self._control.pause(): - response_future = self._invoker.future(group, method)( - iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - self.assertIs(callback.future(), response_future) - self.assertIsInstance( - response_future.exception(), face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsInstance( - response_future.exception(), face.AbortionError) - self.assertIsNotNone(response_future.traceback()) - - def testExpiredStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - with self._control.pause(): - response_iterator = self._invoker.future(group, method)( - iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) - with self.assertRaises(face.ExpirationError): - list(response_iterator) - - def testFailedUnaryRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_unary_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - callback = _Callback() - abortion_callback = _Callback() - - with self._control.fail(): - response_future = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - response_future.add_abortion_callback(abortion_callback) - - self.assertIs(callback.future(), response_future) - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is - # indistinguishable from simply not having called its - # response_callback before the expiration of the RPC. - self.assertIsInstance( - response_future.exception(), face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsNotNone(response_future.traceback()) - self.assertIsNotNone(abortion_callback.future()) - - def testFailedUnaryRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.unary_stream_messages_sequences)): - for test_messages in test_messages_sequence: - request = test_messages.request() - - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_consumer before the - # expiration of the RPC. - with self._control.fail(), self.assertRaises(face.ExpirationError): - response_iterator = self._invoker.future(group, method)( - request, _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) - - def testFailedStreamRequestUnaryResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_unary_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - callback = _Callback() - abortion_callback = _Callback() - - with self._control.fail(): - response_future = self._invoker.future(group, method)( - iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) - response_future.add_done_callback(callback) - response_future.add_abortion_callback(abortion_callback) - - self.assertIs(callback.future(), response_future) - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is - # indistinguishable from simply not having called its - # response_callback before the expiration of the RPC. - self.assertIsInstance( - response_future.exception(), face.ExpirationError) - with self.assertRaises(face.ExpirationError): - response_future.result() - self.assertIsNotNone(response_future.traceback()) - self.assertIsNotNone(abortion_callback.future()) - - def testFailedStreamRequestStreamResponse(self): - for (group, method), test_messages_sequence in ( - six.iteritems(self._digest.stream_stream_messages_sequences)): - for test_messages in test_messages_sequence: - requests = test_messages.requests() - - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_consumer before the - # expiration of the RPC. - with self._control.fail(), self.assertRaises(face.ExpirationError): - response_iterator = self._invoker.future(group, method)( - iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) - list(response_iterator) + self._invoker = None + self.implementation.destantiate(self._memo) + self._digest_pool.shutdown(wait=True) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = _Callback() + + response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + response_future.add_done_callback(callback) + response = response_future.result() + + test_messages.verify(request, response, self) + self.assertIs(callback.future(), response_future) + self.assertIsNone(response_future.exception()) + self.assertIsNone(response_future.traceback()) + + def testSuccessfulUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_stream_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_iterator = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_unary_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + request_iterator = _PauseableIterator(iter(requests)) + callback = _Callback() + + # Use of a paused iterator of requests allows us to test that control is + # returned to calling code before the iterator yields any requests. + with request_iterator.pause(): + response_future = self._invoker.future(group, method)( + request_iterator, test_constants.LONG_TIMEOUT) + response_future.add_done_callback(callback) + future_passed_to_callback = callback.future() + response = future_passed_to_callback.result() + + test_messages.verify(requests, response, self) + self.assertIs(future_passed_to_callback, response_future) + self.assertIsNone(response_future.exception()) + self.assertIsNone(response_future.traceback()) + + def testSuccessfulStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_stream_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + request_iterator = _PauseableIterator(iter(requests)) + + # Use of a paused iterator of requests allows us to test that control is + # returned to calling code before the iterator yields any requests. + with request_iterator.pause(): + response_iterator = self._invoker.future(group, method)( + request_iterator, test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self._invoker.future(group, method)( + first_request, test_constants.LONG_TIMEOUT) + first_response = first_response_future.result() + + test_messages.verify(first_request, first_response, self) + + second_response_future = self._invoker.future(group, method)( + second_request, test_constants.LONG_TIMEOUT) + second_response = second_response_future.result() + + test_messages.verify(second_request, second_response, self) + + def testParallelInvocations(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self._invoker.future(group, method)( + first_request, test_constants.LONG_TIMEOUT) + second_response_future = self._invoker.future(group, method)( + second_request, test_constants.LONG_TIMEOUT) + first_response = first_response_future.result() + second_response = second_response_future.result() + + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + requests = [] + response_futures = [] + for _ in range(test_constants.THREAD_CONCURRENCY): + request = test_messages.request() + response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + requests.append(request) + response_futures.append(response_future) + + responses = [ + response_future.result() + for response_future in response_futures + ] + + for request, response in zip(requests, responses): + test_messages.verify(request, response, self) + + def testWaitingForSomeButNotAllParallelInvocations(self): + pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + requests = [] + response_futures_to_indices = {} + for index in range(test_constants.THREAD_CONCURRENCY): + request = test_messages.request() + inner_response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + outer_response_future = pool.submit( + inner_response_future.result) + requests.append(request) + response_futures_to_indices[outer_response_future] = index + + some_completed_response_futures_iterator = itertools.islice( + futures.as_completed(response_futures_to_indices), + test_constants.THREAD_CONCURRENCY // 2) + for response_future in some_completed_response_futures_iterator: + index = response_futures_to_indices[response_future] + test_messages.verify(requests[index], + response_future.result(), self) + pool.shutdown(wait=True) + + def testCancelledUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = _Callback() + + with self._control.pause(): + response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + response_future.add_done_callback(callback) + cancel_method_return_value = response_future.cancel() + + self.assertIs(callback.future(), response_future) + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) + with self.assertRaises(future.CancelledError): + response_future.result() + with self.assertRaises(future.CancelledError): + response_future.exception() + with self.assertRaises(future.CancelledError): + response_future.traceback() + + def testCancelledUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_stream_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(): + response_iterator = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + response_iterator.cancel() + + with self.assertRaises(face.CancellationError): + next(response_iterator) + + def testCancelledStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_unary_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = _Callback() + + with self._control.pause(): + response_future = self._invoker.future(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + response_future.add_done_callback(callback) + cancel_method_return_value = response_future.cancel() + + self.assertIs(callback.future(), response_future) + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) + with self.assertRaises(future.CancelledError): + response_future.result() + with self.assertRaises(future.CancelledError): + response_future.exception() + with self.assertRaises(future.CancelledError): + response_future.traceback() + + def testCancelledStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_stream_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(): + response_iterator = self._invoker.future(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + response_iterator.cancel() + + with self.assertRaises(face.CancellationError): + next(response_iterator) + + def testExpiredUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = _Callback() + + with self._control.pause(): + response_future = self._invoker.future(group, method)( + request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future.add_done_callback(callback) + self.assertIs(callback.future(), response_future) + self.assertIsInstance(response_future.exception(), + face.ExpirationError) + with self.assertRaises(face.ExpirationError): + response_future.result() + self.assertIsInstance(response_future.exception(), + face.AbortionError) + self.assertIsNotNone(response_future.traceback()) + + def testExpiredUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_stream_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(): + response_iterator = self._invoker.future(group, method)( + request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + with self.assertRaises(face.ExpirationError): + list(response_iterator) + + def testExpiredStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_unary_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = _Callback() + + with self._control.pause(): + response_future = self._invoker.future(group, method)( + iter(requests), + _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future.add_done_callback(callback) + self.assertIs(callback.future(), response_future) + self.assertIsInstance(response_future.exception(), + face.ExpirationError) + with self.assertRaises(face.ExpirationError): + response_future.result() + self.assertIsInstance(response_future.exception(), + face.AbortionError) + self.assertIsNotNone(response_future.traceback()) + + def testExpiredStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_stream_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(): + response_iterator = self._invoker.future(group, method)( + iter(requests), + _3069_test_constant.REALLY_SHORT_TIMEOUT) + with self.assertRaises(face.ExpirationError): + list(response_iterator) + + def testFailedUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_unary_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = _Callback() + abortion_callback = _Callback() + + with self._control.fail(): + response_future = self._invoker.future(group, method)( + request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future.add_done_callback(callback) + response_future.add_abortion_callback(abortion_callback) + + self.assertIs(callback.future(), response_future) + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance(response_future.exception(), + face.ExpirationError) + with self.assertRaises(face.ExpirationError): + response_future.result() + self.assertIsNotNone(response_future.traceback()) + self.assertIsNotNone(abortion_callback.future()) + + def testFailedUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.unary_stream_messages_sequences)): + for test_messages in test_messages_sequence: + request = test_messages.request() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_consumer before the + # expiration of the RPC. + with self._control.fail(), self.assertRaises( + face.ExpirationError): + response_iterator = self._invoker.future(group, method)( + request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + list(response_iterator) + + def testFailedStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_unary_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = _Callback() + abortion_callback = _Callback() + + with self._control.fail(): + response_future = self._invoker.future(group, method)( + iter(requests), + _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future.add_done_callback(callback) + response_future.add_abortion_callback(abortion_callback) + + self.assertIs(callback.future(), response_future) + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance(response_future.exception(), + face.ExpirationError) + with self.assertRaises(face.ExpirationError): + response_future.result() + self.assertIsNotNone(response_future.traceback()) + self.assertIsNotNone(abortion_callback.future()) + + def testFailedStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + six.iteritems(self._digest.stream_stream_messages_sequences)): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_consumer before the + # expiration of the RPC. + with self._control.fail(), self.assertRaises( + face.ExpirationError): + response_iterator = self._invoker.future(group, method)( + iter(requests), + _3069_test_constant.REALLY_SHORT_TIMEOUT) + list(response_iterator) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py index ac487bed4f..4e144a3635 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Coverage across the Face layer's generic-to-dynamic range for invocation.""" import abc @@ -65,149 +64,149 @@ _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE = { class Invoker(six.with_metaclass(abc.ABCMeta)): - """A type used to invoke test RPCs.""" + """A type used to invoke test RPCs.""" - @abc.abstractmethod - def blocking(self, group, name): - """Invokes an RPC with blocking control flow.""" - raise NotImplementedError() + @abc.abstractmethod + def blocking(self, group, name): + """Invokes an RPC with blocking control flow.""" + raise NotImplementedError() - @abc.abstractmethod - def future(self, group, name): - """Invokes an RPC with future control flow.""" - raise NotImplementedError() + @abc.abstractmethod + def future(self, group, name): + """Invokes an RPC with future control flow.""" + raise NotImplementedError() - @abc.abstractmethod - def event(self, group, name): - """Invokes an RPC with event control flow.""" - raise NotImplementedError() + @abc.abstractmethod + def event(self, group, name): + """Invokes an RPC with event control flow.""" + raise NotImplementedError() class InvokerConstructor(six.with_metaclass(abc.ABCMeta)): - """A type used to create Invokers.""" + """A type used to create Invokers.""" - @abc.abstractmethod - def name(self): - """Specifies the name of the Invoker constructed by this object.""" - raise NotImplementedError() + @abc.abstractmethod + def name(self): + """Specifies the name of the Invoker constructed by this object.""" + raise NotImplementedError() - @abc.abstractmethod - def construct_invoker(self, generic_stub, dynamic_stubs, methods): - """Constructs an Invoker for the given stubs and methods.""" - raise NotImplementedError() + @abc.abstractmethod + def construct_invoker(self, generic_stub, dynamic_stubs, methods): + """Constructs an Invoker for the given stubs and methods.""" + raise NotImplementedError() class _GenericInvoker(Invoker): - def __init__(self, generic_stub, methods): - self._stub = generic_stub - self._methods = methods + def __init__(self, generic_stub, methods): + self._stub = generic_stub + self._methods = methods - def _behavior(self, group, name, cardinality_to_generic_method): - method_cardinality = self._methods[group, name].cardinality() - behavior = getattr( - self._stub, cardinality_to_generic_method[method_cardinality]) - return lambda *args, **kwargs: behavior(group, name, *args, **kwargs) + def _behavior(self, group, name, cardinality_to_generic_method): + method_cardinality = self._methods[group, name].cardinality() + behavior = getattr(self._stub, + cardinality_to_generic_method[method_cardinality]) + return lambda *args, **kwargs: behavior(group, name, *args, **kwargs) - def blocking(self, group, name): - return self._behavior( - group, name, _CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR) + def blocking(self, group, name): + return self._behavior(group, name, + _CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR) - def future(self, group, name): - return self._behavior(group, name, _CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR) + def future(self, group, name): + return self._behavior(group, name, + _CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR) - def event(self, group, name): - return self._behavior(group, name, _CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR) + def event(self, group, name): + return self._behavior(group, name, + _CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR) class _GenericInvokerConstructor(InvokerConstructor): - def name(self): - return 'GenericInvoker' + def name(self): + return 'GenericInvoker' - def construct_invoker(self, generic_stub, dynamic_stub, methods): - return _GenericInvoker(generic_stub, methods) + def construct_invoker(self, generic_stub, dynamic_stub, methods): + return _GenericInvoker(generic_stub, methods) class _MultiCallableInvoker(Invoker): - def __init__(self, generic_stub, methods): - self._stub = generic_stub - self._methods = methods + def __init__(self, generic_stub, methods): + self._stub = generic_stub + self._methods = methods - def _multi_callable(self, group, name): - method_cardinality = self._methods[group, name].cardinality() - behavior = getattr( - self._stub, - _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) - return behavior(group, name) + def _multi_callable(self, group, name): + method_cardinality = self._methods[group, name].cardinality() + behavior = getattr( + self._stub, + _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) + return behavior(group, name) - def blocking(self, group, name): - return self._multi_callable(group, name) + def blocking(self, group, name): + return self._multi_callable(group, name) - def future(self, group, name): - method_cardinality = self._methods[group, name].cardinality() - behavior = getattr( - self._stub, - _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) - if method_cardinality in ( - cardinality.Cardinality.UNARY_UNARY, - cardinality.Cardinality.STREAM_UNARY): - return behavior(group, name).future - else: - return behavior(group, name) + def future(self, group, name): + method_cardinality = self._methods[group, name].cardinality() + behavior = getattr( + self._stub, + _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) + if method_cardinality in (cardinality.Cardinality.UNARY_UNARY, + cardinality.Cardinality.STREAM_UNARY): + return behavior(group, name).future + else: + return behavior(group, name) - def event(self, group, name): - return self._multi_callable(group, name).event + def event(self, group, name): + return self._multi_callable(group, name).event class _MultiCallableInvokerConstructor(InvokerConstructor): - def name(self): - return 'MultiCallableInvoker' + def name(self): + return 'MultiCallableInvoker' - def construct_invoker(self, generic_stub, dynamic_stub, methods): - return _MultiCallableInvoker(generic_stub, methods) + def construct_invoker(self, generic_stub, dynamic_stub, methods): + return _MultiCallableInvoker(generic_stub, methods) class _DynamicInvoker(Invoker): - def __init__(self, dynamic_stubs, methods): - self._stubs = dynamic_stubs - self._methods = methods + def __init__(self, dynamic_stubs, methods): + self._stubs = dynamic_stubs + self._methods = methods - def blocking(self, group, name): - return getattr(self._stubs[group], name) + def blocking(self, group, name): + return getattr(self._stubs[group], name) - def future(self, group, name): - if self._methods[group, name].cardinality() in ( - cardinality.Cardinality.UNARY_UNARY, - cardinality.Cardinality.STREAM_UNARY): - return getattr(self._stubs[group], name).future - else: - return getattr(self._stubs[group], name) + def future(self, group, name): + if self._methods[group, name].cardinality() in ( + cardinality.Cardinality.UNARY_UNARY, + cardinality.Cardinality.STREAM_UNARY): + return getattr(self._stubs[group], name).future + else: + return getattr(self._stubs[group], name) - def event(self, group, name): - return getattr(self._stubs[group], name).event + def event(self, group, name): + return getattr(self._stubs[group], name).event class _DynamicInvokerConstructor(InvokerConstructor): - def name(self): - return 'DynamicInvoker' + def name(self): + return 'DynamicInvoker' - def construct_invoker(self, generic_stub, dynamic_stubs, methods): - return _DynamicInvoker(dynamic_stubs, methods) + def construct_invoker(self, generic_stub, dynamic_stubs, methods): + return _DynamicInvoker(dynamic_stubs, methods) def invoker_constructors(): - """Creates a sequence of InvokerConstructors to use in tests of RPCs. + """Creates a sequence of InvokerConstructors to use in tests of RPCs. Returns: A sequence of InvokerConstructors. """ - return ( - _GenericInvokerConstructor(), - _MultiCallableInvokerConstructor(), - _DynamicInvokerConstructor(), - ) + return ( + _GenericInvokerConstructor(), + _MultiCallableInvokerConstructor(), + _DynamicInvokerConstructor(),) diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py index f13dff0558..f14ac6a987 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Private interfaces implemented by data sets used in Face-layer tests.""" import abc @@ -38,12 +37,13 @@ from grpc.framework.interfaces.face import face # pylint: disable=unused-import from tests.unit.framework.interfaces.face import test_interfaces -class UnaryUnaryTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a unary-unary method.""" +class UnaryUnaryTestMethodImplementation( + six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): + """A controllable implementation of a unary-unary method.""" - @abc.abstractmethod - def service(self, request, response_callback, context, control): - """Services an RPC that accepts one message and produces one message. + @abc.abstractmethod + def service(self, request, response_callback, context, control): + """Services an RPC that accepts one message and produces one message. Args: request: The single request message for the RPC. @@ -56,15 +56,15 @@ class UnaryUnaryTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_in abandonment.Abandoned: May or may not be raised when the RPC has been aborted. """ - raise NotImplementedError() + raise NotImplementedError() class UnaryUnaryTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for unary-request-unary-response message pairings.""" + """A type for unary-request-unary-response message pairings.""" - @abc.abstractmethod - def request(self): - """Affords a request message. + @abc.abstractmethod + def request(self): + """Affords a request message. Implementations of this method should return a different message with each call so that multiple test executions of the test method may be made with @@ -73,11 +73,11 @@ class UnaryUnaryTestMessages(six.with_metaclass(abc.ABCMeta)): Returns: A request message. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def verify(self, request, response, test_case): - """Verifies that the computed response matches the given request. + @abc.abstractmethod + def verify(self, request, response, test_case): + """Verifies that the computed response matches the given request. Args: request: A request message. @@ -88,15 +88,16 @@ class UnaryUnaryTestMessages(six.with_metaclass(abc.ABCMeta)): AssertionError: If the request and response do not match, indicating that there was some problem executing the RPC under test. """ - raise NotImplementedError() + raise NotImplementedError() -class UnaryStreamTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a unary-stream method.""" +class UnaryStreamTestMethodImplementation( + six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): + """A controllable implementation of a unary-stream method.""" - @abc.abstractmethod - def service(self, request, response_consumer, context, control): - """Services an RPC that takes one message and produces a stream of messages. + @abc.abstractmethod + def service(self, request, response_consumer, context, control): + """Services an RPC that takes one message and produces a stream of messages. Args: request: The single request message for the RPC. @@ -109,15 +110,15 @@ class UnaryStreamTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_i abandonment.Abandoned: May or may not be raised when the RPC has been aborted. """ - raise NotImplementedError() + raise NotImplementedError() class UnaryStreamTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for unary-request-stream-response message pairings.""" + """A type for unary-request-stream-response message pairings.""" - @abc.abstractmethod - def request(self): - """Affords a request message. + @abc.abstractmethod + def request(self): + """Affords a request message. Implementations of this method should return a different message with each call so that multiple test executions of the test method may be made with @@ -126,11 +127,11 @@ class UnaryStreamTestMessages(six.with_metaclass(abc.ABCMeta)): Returns: A request message. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def verify(self, request, responses, test_case): - """Verifies that the computed responses match the given request. + @abc.abstractmethod + def verify(self, request, responses, test_case): + """Verifies that the computed responses match the given request. Args: request: A request message. @@ -141,15 +142,16 @@ class UnaryStreamTestMessages(six.with_metaclass(abc.ABCMeta)): AssertionError: If the request and responses do not match, indicating that there was some problem executing the RPC under test. """ - raise NotImplementedError() + raise NotImplementedError() -class StreamUnaryTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a stream-unary method.""" +class StreamUnaryTestMethodImplementation( + six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): + """A controllable implementation of a stream-unary method.""" - @abc.abstractmethod - def service(self, response_callback, context, control): - """Services an RPC that takes a stream of messages and produces one message. + @abc.abstractmethod + def service(self, response_callback, context, control): + """Services an RPC that takes a stream of messages and produces one message. Args: response_callback: A callback to be called to accept the response message @@ -169,15 +171,15 @@ class StreamUnaryTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_i abandonment.Abandoned: May or may not be raised when the RPC has been aborted. """ - raise NotImplementedError() + raise NotImplementedError() class StreamUnaryTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for stream-request-unary-response message pairings.""" + """A type for stream-request-unary-response message pairings.""" - @abc.abstractmethod - def requests(self): - """Affords a sequence of request messages. + @abc.abstractmethod + def requests(self): + """Affords a sequence of request messages. Implementations of this method should return a different sequences with each call so that multiple test executions of the test method may be made with @@ -186,11 +188,11 @@ class StreamUnaryTestMessages(six.with_metaclass(abc.ABCMeta)): Returns: A sequence of request messages. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def verify(self, requests, response, test_case): - """Verifies that the computed response matches the given requests. + @abc.abstractmethod + def verify(self, requests, response, test_case): + """Verifies that the computed response matches the given requests. Args: requests: A sequence of request messages. @@ -201,15 +203,16 @@ class StreamUnaryTestMessages(six.with_metaclass(abc.ABCMeta)): AssertionError: If the requests and response do not match, indicating that there was some problem executing the RPC under test. """ - raise NotImplementedError() + raise NotImplementedError() -class StreamStreamTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): - """A controllable implementation of a stream-stream method.""" +class StreamStreamTestMethodImplementation( + six.with_metaclass(abc.ABCMeta, test_interfaces.Method)): + """A controllable implementation of a stream-stream method.""" - @abc.abstractmethod - def service(self, response_consumer, context, control): - """Services an RPC that accepts and produces streams of messages. + @abc.abstractmethod + def service(self, response_consumer, context, control): + """Services an RPC that accepts and produces streams of messages. Args: response_consumer: A stream.Consumer to be called to accept the response @@ -229,15 +232,15 @@ class StreamStreamTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_ abandonment.Abandoned: May or may not be raised when the RPC has been aborted. """ - raise NotImplementedError() + raise NotImplementedError() class StreamStreamTestMessages(six.with_metaclass(abc.ABCMeta)): - """A type for stream-request-stream-response message pairings.""" + """A type for stream-request-stream-response message pairings.""" - @abc.abstractmethod - def requests(self): - """Affords a sequence of request messages. + @abc.abstractmethod + def requests(self): + """Affords a sequence of request messages. Implementations of this method should return a different sequences with each call so that multiple test executions of the test method may be made with @@ -246,11 +249,11 @@ class StreamStreamTestMessages(six.with_metaclass(abc.ABCMeta)): Returns: A sequence of request messages. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def verify(self, requests, responses, test_case): - """Verifies that the computed response matches the given requests. + @abc.abstractmethod + def verify(self, requests, responses, test_case): + """Verifies that the computed response matches the given requests. Args: requests: A sequence of request messages. @@ -261,15 +264,15 @@ class StreamStreamTestMessages(six.with_metaclass(abc.ABCMeta)): AssertionError: If the requests and responses do not match, indicating that there was some problem executing the RPC under test. """ - raise NotImplementedError() + raise NotImplementedError() class TestService(six.with_metaclass(abc.ABCMeta)): - """A specification of implemented methods to use in tests.""" + """A specification of implemented methods to use in tests.""" - @abc.abstractmethod - def unary_unary_scenarios(self): - """Affords unary-request-unary-response test methods and their messages. + @abc.abstractmethod + def unary_unary_scenarios(self): + """Affords unary-request-unary-response test methods and their messages. Returns: A dict from method group-name pair to implementation/messages pair. The @@ -277,11 +280,11 @@ class TestService(six.with_metaclass(abc.ABCMeta)): and the second element is a sequence of UnaryUnaryTestMethodMessages objects. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def unary_stream_scenarios(self): - """Affords unary-request-stream-response test methods and their messages. + @abc.abstractmethod + def unary_stream_scenarios(self): + """Affords unary-request-stream-response test methods and their messages. Returns: A dict from method group-name pair to implementation/messages pair. The @@ -289,11 +292,11 @@ class TestService(six.with_metaclass(abc.ABCMeta)): object and the second element is a sequence of UnaryStreamTestMethodMessages objects. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def stream_unary_scenarios(self): - """Affords stream-request-unary-response test methods and their messages. + @abc.abstractmethod + def stream_unary_scenarios(self): + """Affords stream-request-unary-response test methods and their messages. Returns: A dict from method group-name pair to implementation/messages pair. The @@ -301,11 +304,11 @@ class TestService(six.with_metaclass(abc.ABCMeta)): object and the second element is a sequence of StreamUnaryTestMethodMessages objects. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def stream_stream_scenarios(self): - """Affords stream-request-stream-response test methods and their messages. + @abc.abstractmethod + def stream_stream_scenarios(self): + """Affords stream-request-stream-response test methods and their messages. Returns: A dict from method group-name pair to implementation/messages pair. The @@ -313,4 +316,4 @@ class TestService(six.with_metaclass(abc.ABCMeta)): object and the second element is a sequence of StreamStreamTestMethodMessages objects. """ - raise NotImplementedError() + raise NotImplementedError() diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py index 5299655bb3..41a55c13f4 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Examples of Python implementations of the stock.proto Stock service.""" from grpc.framework.common import cardinality @@ -44,353 +43,363 @@ _price = lambda symbol_name: float(hash(symbol_name) % 4096) def _get_last_trade_price(stock_request, stock_reply_callback, control, active): - """A unary-request, unary-response test method.""" - control.control() - if active(): - stock_reply_callback( - stock_pb2.StockReply( - symbol=stock_request.symbol, price=_price(stock_request.symbol))) - else: - raise abandonment.Abandoned() - - -def _get_last_trade_price_multiple(stock_reply_consumer, control, active): - """A stream-request, stream-response test method.""" - def stock_reply_for_stock_request(stock_request): + """A unary-request, unary-response test method.""" control.control() if active(): - return stock_pb2.StockReply( - symbol=stock_request.symbol, price=_price(stock_request.symbol)) + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=_price( + stock_request.symbol))) else: - raise abandonment.Abandoned() - - class StockRequestConsumer(stream.Consumer): + raise abandonment.Abandoned() - def consume(self, stock_request): - stock_reply_consumer.consume(stock_reply_for_stock_request(stock_request)) - def terminate(self): - control.control() - stock_reply_consumer.terminate() +def _get_last_trade_price_multiple(stock_reply_consumer, control, active): + """A stream-request, stream-response test method.""" - def consume_and_terminate(self, stock_request): - stock_reply_consumer.consume_and_terminate( - stock_reply_for_stock_request(stock_request)) + def stock_reply_for_stock_request(stock_request): + control.control() + if active(): + return stock_pb2.StockReply( + symbol=stock_request.symbol, price=_price(stock_request.symbol)) + else: + raise abandonment.Abandoned() - return StockRequestConsumer() + class StockRequestConsumer(stream.Consumer): + def consume(self, stock_request): + stock_reply_consumer.consume( + stock_reply_for_stock_request(stock_request)) -def _watch_future_trades(stock_request, stock_reply_consumer, control, active): - """A unary-request, stream-response test method.""" - base_price = _price(stock_request.symbol) - for index in range(stock_request.num_trades_to_watch): - control.control() - if active(): - stock_reply_consumer.consume( - stock_pb2.StockReply( - symbol=stock_request.symbol, price=base_price + index)) - else: - raise abandonment.Abandoned() - stock_reply_consumer.terminate() + def terminate(self): + control.control() + stock_reply_consumer.terminate() + def consume_and_terminate(self, stock_request): + stock_reply_consumer.consume_and_terminate( + stock_reply_for_stock_request(stock_request)) -def _get_highest_trade_price(stock_reply_callback, control, active): - """A stream-request, unary-response test method.""" + return StockRequestConsumer() - class StockRequestConsumer(stream.Consumer): - """Keeps an ongoing record of the most valuable symbol yet consumed.""" - def __init__(self): - self._symbol = None - self._price = None - - def consume(self, stock_request): - control.control() - if active(): - if self._price is None: - self._symbol = stock_request.symbol - self._price = _price(stock_request.symbol) - else: - candidate_price = _price(stock_request.symbol) - if self._price < candidate_price: - self._symbol = stock_request.symbol - self._price = candidate_price - - def terminate(self): - control.control() - if active(): - if self._symbol is None: - raise ValueError() - else: - stock_reply_callback( - stock_pb2.StockReply(symbol=self._symbol, price=self._price)) - self._symbol = None - self._price = None - - def consume_and_terminate(self, stock_request): - control.control() - if active(): - if self._price is None: - stock_reply_callback( - stock_pb2.StockReply( - symbol=stock_request.symbol, - price=_price(stock_request.symbol))) - else: - candidate_price = _price(stock_request.symbol) - if self._price < candidate_price: - stock_reply_callback( - stock_pb2.StockReply( - symbol=stock_request.symbol, price=candidate_price)) - else: - stock_reply_callback( +def _watch_future_trades(stock_request, stock_reply_consumer, control, active): + """A unary-request, stream-response test method.""" + base_price = _price(stock_request.symbol) + for index in range(stock_request.num_trades_to_watch): + control.control() + if active(): + stock_reply_consumer.consume( stock_pb2.StockReply( - symbol=self._symbol, price=self._price)) + symbol=stock_request.symbol, price=base_price + index)) + else: + raise abandonment.Abandoned() + stock_reply_consumer.terminate() - self._symbol = None - self._price = None - return StockRequestConsumer() +def _get_highest_trade_price(stock_reply_callback, control, active): + """A stream-request, unary-response test method.""" + + class StockRequestConsumer(stream.Consumer): + """Keeps an ongoing record of the most valuable symbol yet consumed.""" + + def __init__(self): + self._symbol = None + self._price = None + + def consume(self, stock_request): + control.control() + if active(): + if self._price is None: + self._symbol = stock_request.symbol + self._price = _price(stock_request.symbol) + else: + candidate_price = _price(stock_request.symbol) + if self._price < candidate_price: + self._symbol = stock_request.symbol + self._price = candidate_price + + def terminate(self): + control.control() + if active(): + if self._symbol is None: + raise ValueError() + else: + stock_reply_callback( + stock_pb2.StockReply( + symbol=self._symbol, price=self._price)) + self._symbol = None + self._price = None + + def consume_and_terminate(self, stock_request): + control.control() + if active(): + if self._price is None: + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, + price=_price(stock_request.symbol))) + else: + candidate_price = _price(stock_request.symbol) + if self._price < candidate_price: + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, + price=candidate_price)) + else: + stock_reply_callback( + stock_pb2.StockReply( + symbol=self._symbol, price=self._price)) + + self._symbol = None + self._price = None + + return StockRequestConsumer() class GetLastTradePrice(_service.UnaryUnaryTestMethodImplementation): - """GetLastTradePrice for use in tests.""" + """GetLastTradePrice for use in tests.""" - def group(self): - return _STOCK_GROUP_NAME + def group(self): + return _STOCK_GROUP_NAME - def name(self): - return 'GetLastTradePrice' + def name(self): + return 'GetLastTradePrice' - def cardinality(self): - return cardinality.Cardinality.UNARY_UNARY + def cardinality(self): + return cardinality.Cardinality.UNARY_UNARY - def request_class(self): - return stock_pb2.StockRequest + def request_class(self): + return stock_pb2.StockRequest - def response_class(self): - return stock_pb2.StockReply + def response_class(self): + return stock_pb2.StockReply - def serialize_request(self, request): - return request.SerializeToString() + def serialize_request(self, request): + return request.SerializeToString() - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) - def serialize_response(self, response): - return response.SerializeToString() + def serialize_response(self, response): + return response.SerializeToString() - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) - def service(self, request, response_callback, context, control): - _get_last_trade_price( - request, response_callback, control, context.is_active) + def service(self, request, response_callback, context, control): + _get_last_trade_price(request, response_callback, control, + context.is_active) class GetLastTradePriceMessages(_service.UnaryUnaryTestMessages): - def __init__(self): - self._index = 0 + def __init__(self): + self._index = 0 - def request(self): - symbol = _SYMBOL_FORMAT % self._index - self._index += 1 - return stock_pb2.StockRequest(symbol=symbol) + def request(self): + symbol = _SYMBOL_FORMAT % self._index + self._index += 1 + return stock_pb2.StockRequest(symbol=symbol) - def verify(self, request, response, test_case): - test_case.assertEqual(request.symbol, response.symbol) - test_case.assertEqual(_price(request.symbol), response.price) + def verify(self, request, response, test_case): + test_case.assertEqual(request.symbol, response.symbol) + test_case.assertEqual(_price(request.symbol), response.price) class GetLastTradePriceMultiple(_service.StreamStreamTestMethodImplementation): - """GetLastTradePriceMultiple for use in tests.""" + """GetLastTradePriceMultiple for use in tests.""" - def group(self): - return _STOCK_GROUP_NAME + def group(self): + return _STOCK_GROUP_NAME - def name(self): - return 'GetLastTradePriceMultiple' + def name(self): + return 'GetLastTradePriceMultiple' - def cardinality(self): - return cardinality.Cardinality.STREAM_STREAM + def cardinality(self): + return cardinality.Cardinality.STREAM_STREAM - def request_class(self): - return stock_pb2.StockRequest + def request_class(self): + return stock_pb2.StockRequest - def response_class(self): - return stock_pb2.StockReply + def response_class(self): + return stock_pb2.StockReply - def serialize_request(self, request): - return request.SerializeToString() + def serialize_request(self, request): + return request.SerializeToString() - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) - def serialize_response(self, response): - return response.SerializeToString() + def serialize_response(self, response): + return response.SerializeToString() - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) - def service(self, response_consumer, context, control): - return _get_last_trade_price_multiple( - response_consumer, control, context.is_active) + def service(self, response_consumer, context, control): + return _get_last_trade_price_multiple(response_consumer, control, + context.is_active) class GetLastTradePriceMultipleMessages(_service.StreamStreamTestMessages): - """Pairs of message streams for use with GetLastTradePriceMultiple.""" + """Pairs of message streams for use with GetLastTradePriceMultiple.""" - def __init__(self): - self._index = 0 + def __init__(self): + self._index = 0 - def requests(self): - base_index = self._index - self._index += 1 - return [ - stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index)) - for index in range(test_constants.STREAM_LENGTH)] + def requests(self): + base_index = self._index + self._index += 1 + return [ + stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index)) + for index in range(test_constants.STREAM_LENGTH) + ] - def verify(self, requests, responses, test_case): - test_case.assertEqual(len(requests), len(responses)) - for stock_request, stock_reply in zip(requests, responses): - test_case.assertEqual(stock_request.symbol, stock_reply.symbol) - test_case.assertEqual(_price(stock_request.symbol), stock_reply.price) + def verify(self, requests, responses, test_case): + test_case.assertEqual(len(requests), len(responses)) + for stock_request, stock_reply in zip(requests, responses): + test_case.assertEqual(stock_request.symbol, stock_reply.symbol) + test_case.assertEqual( + _price(stock_request.symbol), stock_reply.price) class WatchFutureTrades(_service.UnaryStreamTestMethodImplementation): - """WatchFutureTrades for use in tests.""" + """WatchFutureTrades for use in tests.""" - def group(self): - return _STOCK_GROUP_NAME + def group(self): + return _STOCK_GROUP_NAME - def name(self): - return 'WatchFutureTrades' + def name(self): + return 'WatchFutureTrades' - def cardinality(self): - return cardinality.Cardinality.UNARY_STREAM + def cardinality(self): + return cardinality.Cardinality.UNARY_STREAM - def request_class(self): - return stock_pb2.StockRequest + def request_class(self): + return stock_pb2.StockRequest - def response_class(self): - return stock_pb2.StockReply + def response_class(self): + return stock_pb2.StockReply - def serialize_request(self, request): - return request.SerializeToString() + def serialize_request(self, request): + return request.SerializeToString() - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) - def serialize_response(self, response): - return response.SerializeToString() + def serialize_response(self, response): + return response.SerializeToString() - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) - def service(self, request, response_consumer, context, control): - _watch_future_trades(request, response_consumer, control, context.is_active) + def service(self, request, response_consumer, context, control): + _watch_future_trades(request, response_consumer, control, + context.is_active) class WatchFutureTradesMessages(_service.UnaryStreamTestMessages): - """Pairs of a single request message and a sequence of response messages.""" + """Pairs of a single request message and a sequence of response messages.""" - def __init__(self): - self._index = 0 + def __init__(self): + self._index = 0 - def request(self): - symbol = _SYMBOL_FORMAT % self._index - self._index += 1 - return stock_pb2.StockRequest( - symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH) + def request(self): + symbol = _SYMBOL_FORMAT % self._index + self._index += 1 + return stock_pb2.StockRequest( + symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH) - def verify(self, request, responses, test_case): - test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses)) - base_price = _price(request.symbol) - for index, response in enumerate(responses): - test_case.assertEqual(base_price + index, response.price) + def verify(self, request, responses, test_case): + test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses)) + base_price = _price(request.symbol) + for index, response in enumerate(responses): + test_case.assertEqual(base_price + index, response.price) class GetHighestTradePrice(_service.StreamUnaryTestMethodImplementation): - """GetHighestTradePrice for use in tests.""" + """GetHighestTradePrice for use in tests.""" - def group(self): - return _STOCK_GROUP_NAME + def group(self): + return _STOCK_GROUP_NAME - def name(self): - return 'GetHighestTradePrice' + def name(self): + return 'GetHighestTradePrice' - def cardinality(self): - return cardinality.Cardinality.STREAM_UNARY + def cardinality(self): + return cardinality.Cardinality.STREAM_UNARY - def request_class(self): - return stock_pb2.StockRequest + def request_class(self): + return stock_pb2.StockRequest - def response_class(self): - return stock_pb2.StockReply + def response_class(self): + return stock_pb2.StockReply - def serialize_request(self, request): - return request.SerializeToString() + def serialize_request(self, request): + return request.SerializeToString() - def deserialize_request(self, serialized_request): - return stock_pb2.StockRequest.FromString(serialized_request) + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) - def serialize_response(self, response): - return response.SerializeToString() + def serialize_response(self, response): + return response.SerializeToString() - def deserialize_response(self, serialized_response): - return stock_pb2.StockReply.FromString(serialized_response) + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) - def service(self, response_callback, context, control): - return _get_highest_trade_price( - response_callback, control, context.is_active) + def service(self, response_callback, context, control): + return _get_highest_trade_price(response_callback, control, + context.is_active) class GetHighestTradePriceMessages(_service.StreamUnaryTestMessages): - def requests(self): - return [ - stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index) - for index in range(test_constants.STREAM_LENGTH)] - - def verify(self, requests, response, test_case): - price = None - symbol = None - for stock_request in requests: - current_symbol = stock_request.symbol - current_price = _price(current_symbol) - if price is None or price < current_price: - price = current_price - symbol = current_symbol - test_case.assertEqual(price, response.price) - test_case.assertEqual(symbol, response.symbol) + def requests(self): + return [ + stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index) + for index in range(test_constants.STREAM_LENGTH) + ] + + def verify(self, requests, response, test_case): + price = None + symbol = None + for stock_request in requests: + current_symbol = stock_request.symbol + current_price = _price(current_symbol) + if price is None or price < current_price: + price = current_price + symbol = current_symbol + test_case.assertEqual(price, response.price) + test_case.assertEqual(symbol, response.symbol) class StockTestService(_service.TestService): - """A corpus of test data with one method of each RPC cardinality.""" - - def unary_unary_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'GetLastTradePrice'): ( - GetLastTradePrice(), [GetLastTradePriceMessages()]), - } - - def unary_stream_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'WatchFutureTrades'): ( - WatchFutureTrades(), [WatchFutureTradesMessages()]), - } - - def stream_unary_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'GetHighestTradePrice'): ( - GetHighestTradePrice(), [GetHighestTradePriceMessages()]) - } - - def stream_stream_scenarios(self): - return { - (_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'): ( - GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]), - } + """A corpus of test data with one method of each RPC cardinality.""" + + def unary_unary_scenarios(self): + return { + (_STOCK_GROUP_NAME, 'GetLastTradePrice'): + (GetLastTradePrice(), [GetLastTradePriceMessages()]), + } + + def unary_stream_scenarios(self): + return { + (_STOCK_GROUP_NAME, 'WatchFutureTrades'): + (WatchFutureTrades(), [WatchFutureTradesMessages()]), + } + + def stream_unary_scenarios(self): + return { + (_STOCK_GROUP_NAME, 'GetHighestTradePrice'): + (GetHighestTradePrice(), [GetHighestTradePriceMessages()]) + } + + def stream_stream_scenarios(self): + return { + (_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'): + (GetLastTradePriceMultiple(), + [GetLastTradePriceMultipleMessages()]), + } STOCK_TEST_SERVICE = StockTestService() diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py index 71de9d835e..d84e1fc136 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Tools for creating tests of implementations of the Face layer.""" # unittest is referenced from specification in this module. @@ -40,12 +39,11 @@ from tests.unit.framework.interfaces.face import test_interfaces # pylint: disa _TEST_CASE_SUPERCLASSES = ( _blocking_invocation_inline_service.TestCase, - _future_invocation_asynchronous_event_service.TestCase, -) + _future_invocation_asynchronous_event_service.TestCase,) def test_cases(implementation): - """Creates unittest.TestCase classes for a given Face layer implementation. + """Creates unittest.TestCase classes for a given Face layer implementation. Args: implementation: A test_interfaces.Implementation specifying creation and @@ -55,13 +53,14 @@ def test_cases(implementation): A sequence of subclasses of unittest.TestCase defining tests of the specified Face layer implementation. """ - test_case_classes = [] - for invoker_constructor in _invocation.invoker_constructors(): - for super_class in _TEST_CASE_SUPERCLASSES: - test_case_classes.append( - type(invoker_constructor.name() + super_class.NAME, (super_class,), - {'implementation': implementation, - 'invoker_constructor': invoker_constructor, - '__module__': implementation.__module__, - })) - return test_case_classes + test_case_classes = [] + for invoker_constructor in _invocation.invoker_constructors(): + for super_class in _TEST_CASE_SUPERCLASSES: + test_case_classes.append( + type(invoker_constructor.name() + super_class.NAME, ( + super_class,), { + 'implementation': implementation, + 'invoker_constructor': invoker_constructor, + '__module__': implementation.__module__, + })) + return test_case_classes diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py index 40f38e68ba..a789d435b4 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py @@ -26,7 +26,6 @@ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - """Interfaces used in tests of implementations of the Face layer.""" import abc @@ -38,103 +37,102 @@ from grpc.framework.interfaces.face import face # pylint: disable=unused-import class Method(six.with_metaclass(abc.ABCMeta)): - """Specifies a method to be used in tests.""" + """Specifies a method to be used in tests.""" - @abc.abstractmethod - def group(self): - """Identify the group of the method. + @abc.abstractmethod + def group(self): + """Identify the group of the method. Returns: The group of the method. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def name(self): - """Identify the name of the method. + @abc.abstractmethod + def name(self): + """Identify the name of the method. Returns: The name of the method. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def cardinality(self): - """Identify the cardinality of the method. + @abc.abstractmethod + def cardinality(self): + """Identify the cardinality of the method. Returns: A cardinality.Cardinality value describing the streaming semantics of the method. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def request_class(self): - """Identify the class used for the method's request objects. + @abc.abstractmethod + def request_class(self): + """Identify the class used for the method's request objects. Returns: The class object of the class to which the method's request objects belong. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def response_class(self): - """Identify the class used for the method's response objects. + @abc.abstractmethod + def response_class(self): + """Identify the class used for the method's response objects. Returns: The class object of the class to which the method's response objects belong. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def serialize_request(self, request): - """Serialize the given request object. + @abc.abstractmethod + def serialize_request(self, request): + """Serialize the given request object. Args: request: A request object appropriate for this method. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def deserialize_request(self, serialized_request): - """Synthesize a request object from a given bytestring. + @abc.abstractmethod + def deserialize_request(self, serialized_request): + """Synthesize a request object from a given bytestring. Args: serialized_request: A bytestring deserializable into a request object appropriate for this method. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def serialize_response(self, response): - """Serialize the given response object. + @abc.abstractmethod + def serialize_response(self, response): + """Serialize the given response object. Args: response: A response object appropriate for this method. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def deserialize_response(self, serialized_response): - """Synthesize a response object from a given bytestring. + @abc.abstractmethod + def deserialize_response(self, serialized_response): + """Synthesize a response object from a given bytestring. Args: serialized_response: A bytestring deserializable into a response object appropriate for this method. """ - raise NotImplementedError() + raise NotImplementedError() class Implementation(six.with_metaclass(abc.ABCMeta)): - """Specifies an implementation of the Face layer.""" + """Specifies an implementation of the Face layer.""" - @abc.abstractmethod - def instantiate( - self, methods, method_implementations, - multi_method_implementation): - """Instantiates the Face layer implementation to be used in a test. + @abc.abstractmethod + def instantiate(self, methods, method_implementations, + multi_method_implementation): + """Instantiates the Face layer implementation to be used in a test. Args: methods: A sequence of Method objects describing the methods available to @@ -151,69 +149,69 @@ class Implementation(six.with_metaclass(abc.ABCMeta)): passed to destantiate at the conclusion of the test. The returned stubs must be backed by the provided implementations. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def destantiate(self, memo): - """Destroys the Face layer implementation under test. + @abc.abstractmethod + def destantiate(self, memo): + """Destroys the Face layer implementation under test. Args: memo: The object from the third position of the return value of a call to instantiate. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def invocation_metadata(self): - """Provides the metadata to be used when invoking a test RPC. + @abc.abstractmethod + def invocation_metadata(self): + """Provides the metadata to be used when invoking a test RPC. Returns: An object to use as the supplied-at-invocation-time metadata in a test RPC. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def initial_metadata(self): - """Provides the metadata for use as a test RPC's first servicer metadata. + @abc.abstractmethod + def initial_metadata(self): + """Provides the metadata for use as a test RPC's first servicer metadata. Returns: An object to use as the from-the-servicer-before-responses metadata in a test RPC. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def terminal_metadata(self): - """Provides the metadata for use as a test RPC's second servicer metadata. + @abc.abstractmethod + def terminal_metadata(self): + """Provides the metadata for use as a test RPC's second servicer metadata. Returns: An object to use as the from-the-servicer-after-all-responses metadata in a test RPC. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def code(self): - """Provides the value for use as a test RPC's code. + @abc.abstractmethod + def code(self): + """Provides the value for use as a test RPC's code. Returns: An object to use as the from-the-servicer code in a test RPC. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def details(self): - """Provides the value for use as a test RPC's details. + @abc.abstractmethod + def details(self): + """Provides the value for use as a test RPC's details. Returns: An object to use as the from-the-servicer details in a test RPC. """ - raise NotImplementedError() + raise NotImplementedError() - @abc.abstractmethod - def metadata_transmitted(self, original_metadata, transmitted_metadata): - """Identifies whether or not metadata was properly transmitted. + @abc.abstractmethod + def metadata_transmitted(self, original_metadata, transmitted_metadata): + """Identifies whether or not metadata was properly transmitted. Args: original_metadata: A metadata value passed to the Face interface @@ -226,4 +224,4 @@ class Implementation(six.with_metaclass(abc.ABCMeta)): Whether or not the metadata was properly transmitted by the Face interface implementation under test. """ - raise NotImplementedError() + raise NotImplementedError() |