aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/tests/unit/framework/interfaces
diff options
context:
space:
mode:
authorGravatar Vijay Pai <vpai@google.com>2016-01-27 13:46:00 -0800
committerGravatar Vijay Pai <vpai@google.com>2016-01-27 13:46:00 -0800
commit7a5e89570c5ae242ec3e841fe1061f82e4ed5dad (patch)
treea5a7e7ffc844fbc16bf1210cceeaf3ec1bc57d75 /src/python/grpcio/tests/unit/framework/interfaces
parent1d71a0e084122599d760f550b6f97bb201f771e6 (diff)
parent26e4f5b1b6478809edc576646d3b4cd24562d549 (diff)
Merge branch 'master' into dynamic_sizing2
Diffstat (limited to 'src/python/grpcio/tests/unit/framework/interfaces')
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py50
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py55
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py6
3 files changed, 86 insertions, 25 deletions
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
index 3bcefa601d..c8a3a1bc74 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -30,9 +30,12 @@
"""Test code for the Face layer of RPC Framework."""
import abc
+import itertools
import unittest
+from concurrent import futures
# test_interfaces is referenced from specification in this module.
+from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.face import face
from tests.unit.framework.common import test_constants
from tests.unit.framework.common import test_control
@@ -139,13 +142,50 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
test_messages.verify(second_request, second_response, self)
- @unittest.skip('Parallel invocations impossible with blocking control flow!')
def testParallelInvocations(self):
- raise NotImplementedError()
+ pool = logging_pool.pool(test_constants.PARALLELISM)
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures = []
+ for _ in range(test_constants.PARALLELISM):
+ request = test_messages.request()
+ response_future = pool.submit(
+ self._invoker.blocking(group, method), request,
+ test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures.append(response_future)
+
+ responses = [
+ response_future.result() for response_future in response_futures]
+
+ for request, response in zip(requests, responses):
+ test_messages.verify(request, response, self)
+ pool.shutdown(wait=True)
- @unittest.skip('Parallel invocations impossible with blocking control flow!')
def testWaitingForSomeButNotAllParallelInvocations(self):
- raise NotImplementedError()
+ pool = logging_pool.pool(test_constants.PARALLELISM)
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures_to_indices = {}
+ for index in range(test_constants.PARALLELISM):
+ request = test_messages.request()
+ response_future = pool.submit(
+ self._invoker.blocking(group, method), request,
+ test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures_to_indices[response_future] = index
+
+ some_completed_response_futures_iterator = itertools.islice(
+ futures.as_completed(response_futures_to_indices),
+ test_constants.PARALLELISM / 2)
+ for response_future in some_completed_response_futures_iterator:
+ index = response_futures_to_indices[response_future]
+ test_messages.verify(requests[index], response_future.result(), self)
+ pool.shutdown(wait=True)
@unittest.skip('Cancellation impossible with blocking control flow!')
def testCancelledUnaryRequestUnaryResponse(self):
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
index fc8daa992f..1d36a931e8 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
@@ -31,8 +31,10 @@
import abc
import contextlib
+import itertools
import threading
import unittest
+from concurrent import futures
# test_interfaces is referenced from specification in this module.
from grpc.framework.foundation import logging_pool
@@ -219,6 +221,23 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
test_messages.verify(second_request, second_response, self)
+ def testParallelInvocations(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self._invoker.future(group, method)(
+ first_request, test_constants.LONG_TIMEOUT)
+ second_response_future = self._invoker.future(group, method)(
+ second_request, test_constants.LONG_TIMEOUT)
+ first_response = first_response_future.result()
+ second_response = second_response_future.result()
+
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
for (group, method), test_messages_sequence in (
self._digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
@@ -237,26 +256,28 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
for request, response in zip(requests, responses):
test_messages.verify(request, response, self)
- def testParallelInvocations(self):
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ pool = logging_pool.pool(test_constants.PARALLELISM)
for (group, method), test_messages_sequence in (
self._digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- first_response_future = self._invoker.future(group, method)(
- first_request, test_constants.LONG_TIMEOUT)
- second_response_future = self._invoker.future(group, method)(
- second_request, test_constants.LONG_TIMEOUT)
- first_response = first_response_future.result()
- second_response = second_response_future.result()
-
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- @unittest.skip('TODO(nathaniel): implement.')
- def testWaitingForSomeButNotAllParallelInvocations(self):
- raise NotImplementedError()
+ requests = []
+ response_futures_to_indices = {}
+ for index in range(test_constants.PARALLELISM):
+ request = test_messages.request()
+ inner_response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ outer_response_future = pool.submit(inner_response_future.result)
+ requests.append(request)
+ response_futures_to_indices[outer_response_future] = index
+
+ some_completed_response_futures_iterator = itertools.islice(
+ futures.as_completed(response_futures_to_indices),
+ test_constants.PARALLELISM / 2)
+ for response_future in some_completed_response_futures_iterator:
+ index = response_futures_to_indices[response_future]
+ test_messages.verify(requests[index], response_future.result(), self)
+ pool.shutdown(wait=True)
def testCancelledUnaryRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
index 2e444ff09d..42a7f4e3b8 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -76,7 +76,7 @@ class Receiver(face.ResponseReceiver):
def unary_response(self):
with self._condition:
if self._abortion is not None:
- raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+ raise AssertionError('Aborted: "{}"!'.format(self._abortion))
elif len(self._responses) != 1:
raise AssertionError(
'%d responses received, not exactly one!', len(self._responses))
@@ -88,7 +88,7 @@ class Receiver(face.ResponseReceiver):
if self._abortion is None:
return list(self._responses)
else:
- raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+ raise AssertionError('Aborted: "{}"!'.format(self._abortion))
def abortion(self):
with self._condition: