diff options
author | Vijay Pai <vpai@google.com> | 2016-01-27 13:46:00 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2016-01-27 13:46:00 -0800 |
commit | 7a5e89570c5ae242ec3e841fe1061f82e4ed5dad (patch) | |
tree | a5a7e7ffc844fbc16bf1210cceeaf3ec1bc57d75 /src/python/grpcio/tests/unit/framework/interfaces | |
parent | 1d71a0e084122599d760f550b6f97bb201f771e6 (diff) | |
parent | 26e4f5b1b6478809edc576646d3b4cd24562d549 (diff) |
Merge branch 'master' into dynamic_sizing2
Diffstat (limited to 'src/python/grpcio/tests/unit/framework/interfaces')
3 files changed, 86 insertions, 25 deletions
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py index 3bcefa601d..c8a3a1bc74 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -30,9 +30,12 @@ """Test code for the Face layer of RPC Framework.""" import abc +import itertools import unittest +from concurrent import futures # test_interfaces is referenced from specification in this module. +from grpc.framework.foundation import logging_pool from grpc.framework.interfaces.face import face from tests.unit.framework.common import test_constants from tests.unit.framework.common import test_control @@ -139,13 +142,50 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): test_messages.verify(second_request, second_response, self) - @unittest.skip('Parallel invocations impossible with blocking control flow!') def testParallelInvocations(self): - raise NotImplementedError() + pool = logging_pool.pool(test_constants.PARALLELISM) + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = [] + response_futures = [] + for _ in range(test_constants.PARALLELISM): + request = test_messages.request() + response_future = pool.submit( + self._invoker.blocking(group, method), request, + test_constants.LONG_TIMEOUT) + requests.append(request) + response_futures.append(response_future) + + responses = [ + response_future.result() for response_future in response_futures] + + for request, response in zip(requests, responses): + test_messages.verify(request, response, self) + pool.shutdown(wait=True) - @unittest.skip('Parallel invocations impossible with blocking control flow!') def testWaitingForSomeButNotAllParallelInvocations(self): - raise NotImplementedError() + pool = logging_pool.pool(test_constants.PARALLELISM) + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = [] + response_futures_to_indices = {} + for index in range(test_constants.PARALLELISM): + request = test_messages.request() + response_future = pool.submit( + self._invoker.blocking(group, method), request, + test_constants.LONG_TIMEOUT) + requests.append(request) + response_futures_to_indices[response_future] = index + + some_completed_response_futures_iterator = itertools.islice( + futures.as_completed(response_futures_to_indices), + test_constants.PARALLELISM / 2) + for response_future in some_completed_response_futures_iterator: + index = response_futures_to_indices[response_future] + test_messages.verify(requests[index], response_future.result(), self) + pool.shutdown(wait=True) @unittest.skip('Cancellation impossible with blocking control flow!') def testCancelledUnaryRequestUnaryResponse(self): diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py index fc8daa992f..1d36a931e8 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -31,8 +31,10 @@ import abc import contextlib +import itertools import threading import unittest +from concurrent import futures # test_interfaces is referenced from specification in this module. from grpc.framework.foundation import logging_pool @@ -219,6 +221,23 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): test_messages.verify(second_request, second_response, self) + def testParallelInvocations(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self._invoker.future(group, method)( + first_request, test_constants.LONG_TIMEOUT) + second_response_future = self._invoker.future(group, method)( + second_request, test_constants.LONG_TIMEOUT) + first_response = first_response_future.result() + second_response = second_response_future.result() + + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + for (group, method), test_messages_sequence in ( self._digest.unary_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: @@ -237,26 +256,28 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): for request, response in zip(requests, responses): test_messages.verify(request, response, self) - def testParallelInvocations(self): + def testWaitingForSomeButNotAllParallelInvocations(self): + pool = logging_pool.pool(test_constants.PARALLELISM) for (group, method), test_messages_sequence in ( self._digest.unary_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response_future = self._invoker.future(group, method)( - first_request, test_constants.LONG_TIMEOUT) - second_response_future = self._invoker.future(group, method)( - second_request, test_constants.LONG_TIMEOUT) - first_response = first_response_future.result() - second_response = second_response_future.result() - - test_messages.verify(first_request, first_response, self) - test_messages.verify(second_request, second_response, self) - - @unittest.skip('TODO(nathaniel): implement.') - def testWaitingForSomeButNotAllParallelInvocations(self): - raise NotImplementedError() + requests = [] + response_futures_to_indices = {} + for index in range(test_constants.PARALLELISM): + request = test_messages.request() + inner_response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + outer_response_future = pool.submit(inner_response_future.result) + requests.append(request) + response_futures_to_indices[outer_response_future] = index + + some_completed_response_futures_iterator = itertools.islice( + futures.as_completed(response_futures_to_indices), + test_constants.PARALLELISM / 2) + for response_future in some_completed_response_futures_iterator: + index = response_futures_to_indices[response_future] + test_messages.verify(requests[index], response_future.result(), self) + pool.shutdown(wait=True) def testCancelledUnaryRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py index 2e444ff09d..42a7f4e3b8 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -76,7 +76,7 @@ class Receiver(face.ResponseReceiver): def unary_response(self): with self._condition: if self._abortion is not None: - raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + raise AssertionError('Aborted: "{}"!'.format(self._abortion)) elif len(self._responses) != 1: raise AssertionError( '%d responses received, not exactly one!', len(self._responses)) @@ -88,7 +88,7 @@ class Receiver(face.ResponseReceiver): if self._abortion is None: return list(self._responses) else: - raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + raise AssertionError('Aborted: "{}"!'.format(self._abortion)) def abortion(self): with self._condition: |