diff options
author | 2016-01-27 13:46:00 -0800 | |
---|---|---|
committer | 2016-01-27 13:46:00 -0800 | |
commit | 7a5e89570c5ae242ec3e841fe1061f82e4ed5dad (patch) | |
tree | a5a7e7ffc844fbc16bf1210cceeaf3ec1bc57d75 /src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py | |
parent | 1d71a0e084122599d760f550b6f97bb201f771e6 (diff) | |
parent | 26e4f5b1b6478809edc576646d3b4cd24562d549 (diff) |
Merge branch 'master' into dynamic_sizing2
Diffstat (limited to 'src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py')
-rw-r--r-- | src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py | 55 |
1 files changed, 38 insertions, 17 deletions
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py index fc8daa992f..1d36a931e8 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -31,8 +31,10 @@ import abc import contextlib +import itertools import threading import unittest +from concurrent import futures # test_interfaces is referenced from specification in this module. from grpc.framework.foundation import logging_pool @@ -219,6 +221,23 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): test_messages.verify(second_request, second_response, self) + def testParallelInvocations(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self._invoker.future(group, method)( + first_request, test_constants.LONG_TIMEOUT) + second_response_future = self._invoker.future(group, method)( + second_request, test_constants.LONG_TIMEOUT) + first_response = first_response_future.result() + second_response = second_response_future.result() + + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + for (group, method), test_messages_sequence in ( self._digest.unary_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: @@ -237,26 +256,28 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): for request, response in zip(requests, responses): test_messages.verify(request, response, self) - def testParallelInvocations(self): + def testWaitingForSomeButNotAllParallelInvocations(self): + pool = logging_pool.pool(test_constants.PARALLELISM) for (group, method), test_messages_sequence in ( self._digest.unary_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response_future = self._invoker.future(group, method)( - first_request, test_constants.LONG_TIMEOUT) - second_response_future = self._invoker.future(group, method)( - second_request, test_constants.LONG_TIMEOUT) - first_response = first_response_future.result() - second_response = second_response_future.result() - - test_messages.verify(first_request, first_response, self) - test_messages.verify(second_request, second_response, self) - - @unittest.skip('TODO(nathaniel): implement.') - def testWaitingForSomeButNotAllParallelInvocations(self): - raise NotImplementedError() + requests = [] + response_futures_to_indices = {} + for index in range(test_constants.PARALLELISM): + request = test_messages.request() + inner_response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + outer_response_future = pool.submit(inner_response_future.result) + requests.append(request) + response_futures_to_indices[outer_response_future] = index + + some_completed_response_futures_iterator = itertools.islice( + futures.as_completed(response_futures_to_indices), + test_constants.PARALLELISM / 2) + for response_future in some_completed_response_futures_iterator: + index = response_futures_to_indices[response_future] + test_messages.verify(requests[index], response_future.result(), self) + pool.shutdown(wait=True) def testCancelledUnaryRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( |