diff options
author | Nathaniel Manista <nathaniel@google.com> | 2016-01-27 18:50:40 +0000 |
---|---|---|
committer | Nathaniel Manista <nathaniel@google.com> | 2016-01-27 18:50:40 +0000 |
commit | c1739ea8428b18943dd525ed877ff63b937907b6 (patch) | |
tree | b360ac0e3da56fee8715be33addbf072204589b1 /src/python/grpcio/tests/unit | |
parent | 6e1dd9ad065ce3b1c6635ad6d5b809730331289f (diff) |
Implement three missing face test methods
Diffstat (limited to 'src/python/grpcio/tests/unit')
2 files changed, 68 insertions, 7 deletions
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py index 3bcefa601d..c8a3a1bc74 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -30,9 +30,12 @@ """Test code for the Face layer of RPC Framework.""" import abc +import itertools import unittest +from concurrent import futures # test_interfaces is referenced from specification in this module. +from grpc.framework.foundation import logging_pool from grpc.framework.interfaces.face import face from tests.unit.framework.common import test_constants from tests.unit.framework.common import test_control @@ -139,13 +142,50 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): test_messages.verify(second_request, second_response, self) - @unittest.skip('Parallel invocations impossible with blocking control flow!') def testParallelInvocations(self): - raise NotImplementedError() + pool = logging_pool.pool(test_constants.PARALLELISM) + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = [] + response_futures = [] + for _ in range(test_constants.PARALLELISM): + request = test_messages.request() + response_future = pool.submit( + self._invoker.blocking(group, method), request, + test_constants.LONG_TIMEOUT) + requests.append(request) + response_futures.append(response_future) + + responses = [ + response_future.result() for response_future in response_futures] + + for request, response in zip(requests, responses): + test_messages.verify(request, response, self) + pool.shutdown(wait=True) - @unittest.skip('Parallel invocations impossible with blocking control flow!') def testWaitingForSomeButNotAllParallelInvocations(self): - raise NotImplementedError() + pool = logging_pool.pool(test_constants.PARALLELISM) + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = [] + response_futures_to_indices = {} + for index in range(test_constants.PARALLELISM): + request = test_messages.request() + response_future = pool.submit( + self._invoker.blocking(group, method), request, + test_constants.LONG_TIMEOUT) + requests.append(request) + response_futures_to_indices[response_future] = index + + some_completed_response_futures_iterator = itertools.islice( + futures.as_completed(response_futures_to_indices), + test_constants.PARALLELISM / 2) + for response_future in some_completed_response_futures_iterator: + index = response_futures_to_indices[response_future] + test_messages.verify(requests[index], response_future.result(), self) + pool.shutdown(wait=True) @unittest.skip('Cancellation impossible with blocking control flow!') def testCancelledUnaryRequestUnaryResponse(self): diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py index 00295ad5cc..1d36a931e8 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -31,8 +31,10 @@ import abc import contextlib +import itertools import threading import unittest +from concurrent import futures # test_interfaces is referenced from specification in this module. from grpc.framework.foundation import logging_pool @@ -254,9 +256,28 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): for request, response in zip(requests, responses): test_messages.verify(request, response, self) - @unittest.skip('TODO(nathaniel): implement.') def testWaitingForSomeButNotAllParallelInvocations(self): - raise NotImplementedError() + pool = logging_pool.pool(test_constants.PARALLELISM) + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = [] + response_futures_to_indices = {} + for index in range(test_constants.PARALLELISM): + request = test_messages.request() + inner_response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + outer_response_future = pool.submit(inner_response_future.result) + requests.append(request) + response_futures_to_indices[outer_response_future] = index + + some_completed_response_futures_iterator = itertools.islice( + futures.as_completed(response_futures_to_indices), + test_constants.PARALLELISM / 2) + for response_future in some_completed_response_futures_iterator: + index = response_futures_to_indices[response_future] + test_messages.verify(requests[index], response_future.result(), self) + pool.shutdown(wait=True) def testCancelledUnaryRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( |