aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio_tests/tests/unit/framework/interfaces
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio_tests/tests/unit/framework/interfaces')
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py2
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py1
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py2
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py465
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py550
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py876
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py187
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py159
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py519
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py27
-rw-r--r--src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py144
11 files changed, 1477 insertions, 1455 deletions
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py
index 7086519106..b89398809f 100644
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/__init__.py
@@ -26,5 +26,3 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py
index 1ea356c0bf..2aec25c9ef 100644
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_3069_test_constant.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""A test constant working around issue 3069."""
# test_constants is referenced from specification in this module.
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py
index 7086519106..b89398809f 100644
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/__init__.py
@@ -26,5 +26,3 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
index e338aaa396..a79834f96f 100644
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Test code for the Face layer of RPC Framework."""
from __future__ import division
@@ -50,246 +49,254 @@ from tests.unit.framework.interfaces.face import _stock_service
from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import
-class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.TestCase)):
- """A test of the Face layer of RPC Framework.
+class TestCase(
+ six.with_metaclass(abc.ABCMeta, test_coverage.Coverage,
+ unittest.TestCase)):
+ """A test of the Face layer of RPC Framework.
Concrete subclasses must have an "implementation" attribute of type
test_interfaces.Implementation and an "invoker_constructor" attribute of type
_invocation.InvokerConstructor.
"""
- NAME = 'BlockingInvocationInlineServiceTest'
+ NAME = 'BlockingInvocationInlineServiceTest'
- def setUp(self):
- """See unittest.TestCase.setUp for full specification.
+ def setUp(self):
+ """See unittest.TestCase.setUp for full specification.
Overriding implementations must call this implementation.
"""
- self._control = test_control.PauseFailControl()
- self._digest = _digest.digest(
- _stock_service.STOCK_TEST_SERVICE, self._control, None)
+ self._control = test_control.PauseFailControl()
+ self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE,
+ self._control, None)
- generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
- self._digest.methods, self._digest.inline_method_implementations, None)
- self._invoker = self.invoker_constructor.construct_invoker(
- generic_stub, dynamic_stubs, self._digest.methods)
+ generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
+ self._digest.methods, self._digest.inline_method_implementations,
+ None)
+ self._invoker = self.invoker_constructor.construct_invoker(
+ generic_stub, dynamic_stubs, self._digest.methods)
- def tearDown(self):
- """See unittest.TestCase.tearDown for full specification.
+ def tearDown(self):
+ """See unittest.TestCase.tearDown for full specification.
Overriding implementations must call this implementation.
"""
- self._invoker = None
- self.implementation.destantiate(self._memo)
-
- def testSuccessfulUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- response, call = self._invoker.blocking(group, method)(
- request, test_constants.LONG_TIMEOUT, with_call=True)
-
- test_messages.verify(request, response, self)
-
- def testSuccessfulUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- response_iterator = self._invoker.blocking(group, method)(
- request, test_constants.LONG_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(request, responses, self)
-
- def testSuccessfulStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- response, call = self._invoker.blocking(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT, with_call=True)
-
- test_messages.verify(requests, response, self)
-
- def testSuccessfulStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- response_iterator = self._invoker.blocking(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(requests, responses, self)
-
- def testSequentialInvocations(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- first_response = self._invoker.blocking(group, method)(
- first_request, test_constants.LONG_TIMEOUT)
-
- test_messages.verify(first_request, first_response, self)
-
- second_response = self._invoker.blocking(group, method)(
- second_request, test_constants.LONG_TIMEOUT)
-
- test_messages.verify(second_request, second_response, self)
-
- def testParallelInvocations(self):
- pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = []
- response_futures = []
- for _ in range(test_constants.THREAD_CONCURRENCY):
- request = test_messages.request()
- response_future = pool.submit(
- self._invoker.blocking(group, method), request,
- test_constants.LONG_TIMEOUT)
- requests.append(request)
- response_futures.append(response_future)
-
- responses = [
- response_future.result() for response_future in response_futures]
-
- for request, response in zip(requests, responses):
- test_messages.verify(request, response, self)
- pool.shutdown(wait=True)
-
- def testWaitingForSomeButNotAllParallelInvocations(self):
- pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = []
- response_futures_to_indices = {}
- for index in range(test_constants.THREAD_CONCURRENCY):
- request = test_messages.request()
- response_future = pool.submit(
- self._invoker.blocking(group, method), request,
- test_constants.LONG_TIMEOUT)
- requests.append(request)
- response_futures_to_indices[response_future] = index
-
- some_completed_response_futures_iterator = itertools.islice(
- futures.as_completed(response_futures_to_indices),
- test_constants.THREAD_CONCURRENCY // 2)
- for response_future in some_completed_response_futures_iterator:
- index = response_futures_to_indices[response_future]
- test_messages.verify(requests[index], response_future.result(), self)
- pool.shutdown(wait=True)
-
- @unittest.skip('Cancellation impossible with blocking control flow!')
- def testCancelledUnaryRequestUnaryResponse(self):
- raise NotImplementedError()
-
- @unittest.skip('Cancellation impossible with blocking control flow!')
- def testCancelledUnaryRequestStreamResponse(self):
- raise NotImplementedError()
-
- @unittest.skip('Cancellation impossible with blocking control flow!')
- def testCancelledStreamRequestUnaryResponse(self):
- raise NotImplementedError()
-
- @unittest.skip('Cancellation impossible with blocking control flow!')
- def testCancelledStreamRequestStreamResponse(self):
- raise NotImplementedError()
-
- def testExpiredUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.pause(), self.assertRaises(
- face.ExpirationError):
- self._invoker.blocking(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
-
- def testExpiredUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.pause(), self.assertRaises(
- face.ExpirationError):
- response_iterator = self._invoker.blocking(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- list(response_iterator)
-
- def testExpiredStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.pause(), self.assertRaises(
- face.ExpirationError):
- self._invoker.blocking(group, method)(
- iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
-
- def testExpiredStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.pause(), self.assertRaises(
- face.ExpirationError):
- response_iterator = self._invoker.blocking(group, method)(
- iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
- list(response_iterator)
-
- def testFailedUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.fail(), self.assertRaises(face.RemoteError):
- self._invoker.blocking(group, method)(
- request, test_constants.LONG_TIMEOUT)
-
- def testFailedUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.fail(), self.assertRaises(face.RemoteError):
- response_iterator = self._invoker.blocking(group, method)(
- request, test_constants.LONG_TIMEOUT)
- list(response_iterator)
-
- def testFailedStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.fail(), self.assertRaises(face.RemoteError):
- self._invoker.blocking(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT)
-
- def testFailedStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.fail(), self.assertRaises(face.RemoteError):
- response_iterator = self._invoker.blocking(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT)
- list(response_iterator)
+ self._invoker = None
+ self.implementation.destantiate(self._memo)
+
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response, call = self._invoker.blocking(group, method)(
+ request, test_constants.LONG_TIMEOUT, with_call=True)
+
+ test_messages.verify(request, response, self)
+
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response_iterator = self._invoker.blocking(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(request, responses, self)
+
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ response, call = self._invoker.blocking(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT, with_call=True)
+
+ test_messages.verify(requests, response, self)
+
+ def testSuccessfulStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ response_iterator = self._invoker.blocking(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(requests, responses, self)
+
+ def testSequentialInvocations(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response = self._invoker.blocking(group, method)(
+ first_request, test_constants.LONG_TIMEOUT)
+
+ test_messages.verify(first_request, first_response, self)
+
+ second_response = self._invoker.blocking(group, method)(
+ second_request, test_constants.LONG_TIMEOUT)
+
+ test_messages.verify(second_request, second_response, self)
+
+ def testParallelInvocations(self):
+ pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures = []
+ for _ in range(test_constants.THREAD_CONCURRENCY):
+ request = test_messages.request()
+ response_future = pool.submit(
+ self._invoker.blocking(group, method), request,
+ test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures.append(response_future)
+
+ responses = [
+ response_future.result()
+ for response_future in response_futures
+ ]
+
+ for request, response in zip(requests, responses):
+ test_messages.verify(request, response, self)
+ pool.shutdown(wait=True)
+
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures_to_indices = {}
+ for index in range(test_constants.THREAD_CONCURRENCY):
+ request = test_messages.request()
+ response_future = pool.submit(
+ self._invoker.blocking(group, method), request,
+ test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures_to_indices[response_future] = index
+
+ some_completed_response_futures_iterator = itertools.islice(
+ futures.as_completed(response_futures_to_indices),
+ test_constants.THREAD_CONCURRENCY // 2)
+ for response_future in some_completed_response_futures_iterator:
+ index = response_futures_to_indices[response_future]
+ test_messages.verify(requests[index],
+ response_future.result(), self)
+ pool.shutdown(wait=True)
+
+ @unittest.skip('Cancellation impossible with blocking control flow!')
+ def testCancelledUnaryRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Cancellation impossible with blocking control flow!')
+ def testCancelledUnaryRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Cancellation impossible with blocking control flow!')
+ def testCancelledStreamRequestUnaryResponse(self):
+ raise NotImplementedError()
+
+ @unittest.skip('Cancellation impossible with blocking control flow!')
+ def testCancelledStreamRequestStreamResponse(self):
+ raise NotImplementedError()
+
+ def testExpiredUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause(), self.assertRaises(
+ face.ExpirationError):
+ self._invoker.blocking(group, method)(
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
+
+ def testExpiredUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause(), self.assertRaises(
+ face.ExpirationError):
+ response_iterator = self._invoker.blocking(group, method)(
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ list(response_iterator)
+
+ def testExpiredStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause(), self.assertRaises(
+ face.ExpirationError):
+ self._invoker.blocking(group, method)(
+ iter(requests),
+ _3069_test_constant.REALLY_SHORT_TIMEOUT)
+
+ def testExpiredStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause(), self.assertRaises(
+ face.ExpirationError):
+ response_iterator = self._invoker.blocking(group, method)(
+ iter(requests),
+ _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ list(response_iterator)
+
+ def testFailedUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.fail(), self.assertRaises(face.RemoteError):
+ self._invoker.blocking(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+
+ def testFailedUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.fail(), self.assertRaises(face.RemoteError):
+ response_iterator = self._invoker.blocking(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ list(response_iterator)
+
+ def testFailedStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.fail(), self.assertRaises(face.RemoteError):
+ self._invoker.blocking(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+
+ def testFailedStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.fail(), self.assertRaises(face.RemoteError):
+ response_iterator = self._invoker.blocking(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+ list(response_iterator)
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py
index f0befb0b27..0411da0a66 100644
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_digest.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Code for making a service.TestService more amenable to use in tests."""
import collections
@@ -49,17 +48,16 @@ _IDENTITY = lambda x: x
class TestServiceDigest(
- collections.namedtuple(
- 'TestServiceDigest',
- ('methods',
- 'inline_method_implementations',
- 'event_method_implementations',
- 'multi_method_implementation',
- 'unary_unary_messages_sequences',
- 'unary_stream_messages_sequences',
- 'stream_unary_messages_sequences',
- 'stream_stream_messages_sequences',))):
- """A transformation of a service.TestService.
+ collections.namedtuple('TestServiceDigest', (
+ 'methods',
+ 'inline_method_implementations',
+ 'event_method_implementations',
+ 'multi_method_implementation',
+ 'unary_unary_messages_sequences',
+ 'unary_stream_messages_sequences',
+ 'stream_unary_messages_sequences',
+ 'stream_stream_messages_sequences',))):
+ """A transformation of a service.TestService.
Attributes:
methods: A dict from method group-name pair to test_interfaces.Method object
@@ -88,303 +86,308 @@ class TestServiceDigest(
class _BufferingConsumer(stream.Consumer):
- """A trivial Consumer that dumps what it consumes in a user-mutable buffer."""
+ """A trivial Consumer that dumps what it consumes in a user-mutable buffer."""
- def __init__(self):
- self.consumed = []
- self.terminated = False
+ def __init__(self):
+ self.consumed = []
+ self.terminated = False
- def consume(self, value):
- self.consumed.append(value)
+ def consume(self, value):
+ self.consumed.append(value)
- def terminate(self):
- self.terminated = True
+ def terminate(self):
+ self.terminated = True
- def consume_and_terminate(self, value):
- self.consumed.append(value)
- self.terminated = True
+ def consume_and_terminate(self, value):
+ self.consumed.append(value)
+ self.terminated = True
class _InlineUnaryUnaryMethod(face.MethodImplementation):
- def __init__(self, unary_unary_test_method, control):
- self._test_method = unary_unary_test_method
- self._control = control
+ def __init__(self, unary_unary_test_method, control):
+ self._test_method = unary_unary_test_method
+ self._control = control
- self.cardinality = cardinality.Cardinality.UNARY_UNARY
- self.style = style.Service.INLINE
+ self.cardinality = cardinality.Cardinality.UNARY_UNARY
+ self.style = style.Service.INLINE
- def unary_unary_inline(self, request, context):
- response_list = []
- self._test_method.service(
- request, response_list.append, context, self._control)
- return response_list.pop(0)
+ def unary_unary_inline(self, request, context):
+ response_list = []
+ self._test_method.service(request, response_list.append, context,
+ self._control)
+ return response_list.pop(0)
class _EventUnaryUnaryMethod(face.MethodImplementation):
- def __init__(self, unary_unary_test_method, control, pool):
- self._test_method = unary_unary_test_method
- self._control = control
- self._pool = pool
+ def __init__(self, unary_unary_test_method, control, pool):
+ self._test_method = unary_unary_test_method
+ self._control = control
+ self._pool = pool
- self.cardinality = cardinality.Cardinality.UNARY_UNARY
- self.style = style.Service.EVENT
+ self.cardinality = cardinality.Cardinality.UNARY_UNARY
+ self.style = style.Service.EVENT
- def unary_unary_event(self, request, response_callback, context):
- if self._pool is None:
- self._test_method.service(
- request, response_callback, context, self._control)
- else:
- self._pool.submit(
- self._test_method.service, request, response_callback, context,
- self._control)
+ def unary_unary_event(self, request, response_callback, context):
+ if self._pool is None:
+ self._test_method.service(request, response_callback, context,
+ self._control)
+ else:
+ self._pool.submit(self._test_method.service, request,
+ response_callback, context, self._control)
class _InlineUnaryStreamMethod(face.MethodImplementation):
- def __init__(self, unary_stream_test_method, control):
- self._test_method = unary_stream_test_method
- self._control = control
+ def __init__(self, unary_stream_test_method, control):
+ self._test_method = unary_stream_test_method
+ self._control = control
- self.cardinality = cardinality.Cardinality.UNARY_STREAM
- self.style = style.Service.INLINE
+ self.cardinality = cardinality.Cardinality.UNARY_STREAM
+ self.style = style.Service.INLINE
- def unary_stream_inline(self, request, context):
- response_consumer = _BufferingConsumer()
- self._test_method.service(
- request, response_consumer, context, self._control)
- for response in response_consumer.consumed:
- yield response
+ def unary_stream_inline(self, request, context):
+ response_consumer = _BufferingConsumer()
+ self._test_method.service(request, response_consumer, context,
+ self._control)
+ for response in response_consumer.consumed:
+ yield response
class _EventUnaryStreamMethod(face.MethodImplementation):
- def __init__(self, unary_stream_test_method, control, pool):
- self._test_method = unary_stream_test_method
- self._control = control
- self._pool = pool
+ def __init__(self, unary_stream_test_method, control, pool):
+ self._test_method = unary_stream_test_method
+ self._control = control
+ self._pool = pool
- self.cardinality = cardinality.Cardinality.UNARY_STREAM
- self.style = style.Service.EVENT
+ self.cardinality = cardinality.Cardinality.UNARY_STREAM
+ self.style = style.Service.EVENT
- def unary_stream_event(self, request, response_consumer, context):
- if self._pool is None:
- self._test_method.service(
- request, response_consumer, context, self._control)
- else:
- self._pool.submit(
- self._test_method.service, request, response_consumer, context,
- self._control)
+ def unary_stream_event(self, request, response_consumer, context):
+ if self._pool is None:
+ self._test_method.service(request, response_consumer, context,
+ self._control)
+ else:
+ self._pool.submit(self._test_method.service, request,
+ response_consumer, context, self._control)
class _InlineStreamUnaryMethod(face.MethodImplementation):
- def __init__(self, stream_unary_test_method, control):
- self._test_method = stream_unary_test_method
- self._control = control
+ def __init__(self, stream_unary_test_method, control):
+ self._test_method = stream_unary_test_method
+ self._control = control
- self.cardinality = cardinality.Cardinality.STREAM_UNARY
- self.style = style.Service.INLINE
+ self.cardinality = cardinality.Cardinality.STREAM_UNARY
+ self.style = style.Service.INLINE
- def stream_unary_inline(self, request_iterator, context):
- response_list = []
- request_consumer = self._test_method.service(
- response_list.append, context, self._control)
- for request in request_iterator:
- request_consumer.consume(request)
- request_consumer.terminate()
- return response_list.pop(0)
+ def stream_unary_inline(self, request_iterator, context):
+ response_list = []
+ request_consumer = self._test_method.service(response_list.append,
+ context, self._control)
+ for request in request_iterator:
+ request_consumer.consume(request)
+ request_consumer.terminate()
+ return response_list.pop(0)
class _EventStreamUnaryMethod(face.MethodImplementation):
- def __init__(self, stream_unary_test_method, control, pool):
- self._test_method = stream_unary_test_method
- self._control = control
- self._pool = pool
+ def __init__(self, stream_unary_test_method, control, pool):
+ self._test_method = stream_unary_test_method
+ self._control = control
+ self._pool = pool
- self.cardinality = cardinality.Cardinality.STREAM_UNARY
- self.style = style.Service.EVENT
+ self.cardinality = cardinality.Cardinality.STREAM_UNARY
+ self.style = style.Service.EVENT
- def stream_unary_event(self, response_callback, context):
- request_consumer = self._test_method.service(
- response_callback, context, self._control)
- if self._pool is None:
- return request_consumer
- else:
- return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+ def stream_unary_event(self, response_callback, context):
+ request_consumer = self._test_method.service(response_callback, context,
+ self._control)
+ if self._pool is None:
+ return request_consumer
+ else:
+ return stream_util.ThreadSwitchingConsumer(request_consumer,
+ self._pool)
class _InlineStreamStreamMethod(face.MethodImplementation):
- def __init__(self, stream_stream_test_method, control):
- self._test_method = stream_stream_test_method
- self._control = control
+ def __init__(self, stream_stream_test_method, control):
+ self._test_method = stream_stream_test_method
+ self._control = control
- self.cardinality = cardinality.Cardinality.STREAM_STREAM
- self.style = style.Service.INLINE
+ self.cardinality = cardinality.Cardinality.STREAM_STREAM
+ self.style = style.Service.INLINE
- def stream_stream_inline(self, request_iterator, context):
- response_consumer = _BufferingConsumer()
- request_consumer = self._test_method.service(
- response_consumer, context, self._control)
+ def stream_stream_inline(self, request_iterator, context):
+ response_consumer = _BufferingConsumer()
+ request_consumer = self._test_method.service(response_consumer, context,
+ self._control)
- for request in request_iterator:
- request_consumer.consume(request)
- while response_consumer.consumed:
- yield response_consumer.consumed.pop(0)
- response_consumer.terminate()
+ for request in request_iterator:
+ request_consumer.consume(request)
+ while response_consumer.consumed:
+ yield response_consumer.consumed.pop(0)
+ response_consumer.terminate()
class _EventStreamStreamMethod(face.MethodImplementation):
- def __init__(self, stream_stream_test_method, control, pool):
- self._test_method = stream_stream_test_method
- self._control = control
- self._pool = pool
+ def __init__(self, stream_stream_test_method, control, pool):
+ self._test_method = stream_stream_test_method
+ self._control = control
+ self._pool = pool
- self.cardinality = cardinality.Cardinality.STREAM_STREAM
- self.style = style.Service.EVENT
+ self.cardinality = cardinality.Cardinality.STREAM_STREAM
+ self.style = style.Service.EVENT
- def stream_stream_event(self, response_consumer, context):
- request_consumer = self._test_method.service(
- response_consumer, context, self._control)
- if self._pool is None:
- return request_consumer
- else:
- return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+ def stream_stream_event(self, response_consumer, context):
+ request_consumer = self._test_method.service(response_consumer, context,
+ self._control)
+ if self._pool is None:
+ return request_consumer
+ else:
+ return stream_util.ThreadSwitchingConsumer(request_consumer,
+ self._pool)
class _UnaryConsumer(stream.Consumer):
- """A Consumer that only allows consumption of exactly one value."""
-
- def __init__(self, action):
- self._lock = threading.Lock()
- self._action = action
- self._consumed = False
- self._terminated = False
-
- def consume(self, value):
- with self._lock:
- if self._consumed:
- raise ValueError('Unary consumer already consumed!')
- elif self._terminated:
- raise ValueError('Unary consumer already terminated!')
- else:
- self._consumed = True
-
- self._action(value)
-
- def terminate(self):
- with self._lock:
- if not self._consumed:
- raise ValueError('Unary consumer hasn\'t yet consumed!')
- elif self._terminated:
- raise ValueError('Unary consumer already terminated!')
- else:
- self._terminated = True
-
- def consume_and_terminate(self, value):
- with self._lock:
- if self._consumed:
- raise ValueError('Unary consumer already consumed!')
- elif self._terminated:
- raise ValueError('Unary consumer already terminated!')
- else:
- self._consumed = True
- self._terminated = True
-
- self._action(value)
+ """A Consumer that only allows consumption of exactly one value."""
+
+ def __init__(self, action):
+ self._lock = threading.Lock()
+ self._action = action
+ self._consumed = False
+ self._terminated = False
+
+ def consume(self, value):
+ with self._lock:
+ if self._consumed:
+ raise ValueError('Unary consumer already consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._consumed = True
+
+ self._action(value)
+
+ def terminate(self):
+ with self._lock:
+ if not self._consumed:
+ raise ValueError('Unary consumer hasn\'t yet consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._terminated = True
+
+ def consume_and_terminate(self, value):
+ with self._lock:
+ if self._consumed:
+ raise ValueError('Unary consumer already consumed!')
+ elif self._terminated:
+ raise ValueError('Unary consumer already terminated!')
+ else:
+ self._consumed = True
+ self._terminated = True
+
+ self._action(value)
class _UnaryUnaryAdaptation(object):
- def __init__(self, unary_unary_test_method):
- self._method = unary_unary_test_method
+ def __init__(self, unary_unary_test_method):
+ self._method = unary_unary_test_method
+
+ def service(self, response_consumer, context, control):
+
+ def action(request):
+ self._method.service(request,
+ response_consumer.consume_and_terminate,
+ context, control)
- def service(self, response_consumer, context, control):
- def action(request):
- self._method.service(
- request, response_consumer.consume_and_terminate, context, control)
- return _UnaryConsumer(action)
+ return _UnaryConsumer(action)
class _UnaryStreamAdaptation(object):
- def __init__(self, unary_stream_test_method):
- self._method = unary_stream_test_method
+ def __init__(self, unary_stream_test_method):
+ self._method = unary_stream_test_method
+
+ def service(self, response_consumer, context, control):
+
+ def action(request):
+ self._method.service(request, response_consumer, context, control)
- def service(self, response_consumer, context, control):
- def action(request):
- self._method.service(request, response_consumer, context, control)
- return _UnaryConsumer(action)
+ return _UnaryConsumer(action)
class _StreamUnaryAdaptation(object):
- def __init__(self, stream_unary_test_method):
- self._method = stream_unary_test_method
+ def __init__(self, stream_unary_test_method):
+ self._method = stream_unary_test_method
- def service(self, response_consumer, context, control):
- return self._method.service(
- response_consumer.consume_and_terminate, context, control)
+ def service(self, response_consumer, context, control):
+ return self._method.service(response_consumer.consume_and_terminate,
+ context, control)
class _MultiMethodImplementation(face.MultiMethodImplementation):
- def __init__(self, methods, control, pool):
- self._methods = methods
- self._control = control
- self._pool = pool
+ def __init__(self, methods, control, pool):
+ self._methods = methods
+ self._control = control
+ self._pool = pool
- def service(self, group, name, response_consumer, context):
- method = self._methods.get(group, name, None)
- if method is None:
- raise face.NoSuchMethodError(group, name)
- elif self._pool is None:
- return method(response_consumer, context, self._control)
- else:
- request_consumer = method(response_consumer, context, self._control)
- return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
+ def service(self, group, name, response_consumer, context):
+ method = self._methods.get(group, name, None)
+ if method is None:
+ raise face.NoSuchMethodError(group, name)
+ elif self._pool is None:
+ return method(response_consumer, context, self._control)
+ else:
+ request_consumer = method(response_consumer, context, self._control)
+ return stream_util.ThreadSwitchingConsumer(request_consumer,
+ self._pool)
class _Assembly(
- collections.namedtuple(
- '_Assembly',
- ['methods', 'inlines', 'events', 'adaptations', 'messages'])):
- """An intermediate structure created when creating a TestServiceDigest."""
-
-
-def _assemble(
- scenarios, identifiers, inline_method_constructor, event_method_constructor,
- adapter, control, pool):
- """Creates an _Assembly from the given scenarios."""
- methods = {}
- inlines = {}
- events = {}
- adaptations = {}
- messages = {}
- for identifier, scenario in six.iteritems(scenarios):
- if identifier in identifiers:
- raise ValueError('Repeated identifier "(%s, %s)"!' % identifier)
-
- test_method = scenario[0]
- inline_method = inline_method_constructor(test_method, control)
- event_method = event_method_constructor(test_method, control, pool)
- adaptation = adapter(test_method)
-
- methods[identifier] = test_method
- inlines[identifier] = inline_method
- events[identifier] = event_method
- adaptations[identifier] = adaptation
- messages[identifier] = scenario[1]
-
- return _Assembly(methods, inlines, events, adaptations, messages)
+ collections.namedtuple(
+ '_Assembly',
+ ['methods', 'inlines', 'events', 'adaptations', 'messages'])):
+ """An intermediate structure created when creating a TestServiceDigest."""
+
+
+def _assemble(scenarios, identifiers, inline_method_constructor,
+ event_method_constructor, adapter, control, pool):
+ """Creates an _Assembly from the given scenarios."""
+ methods = {}
+ inlines = {}
+ events = {}
+ adaptations = {}
+ messages = {}
+ for identifier, scenario in six.iteritems(scenarios):
+ if identifier in identifiers:
+ raise ValueError('Repeated identifier "(%s, %s)"!' % identifier)
+
+ test_method = scenario[0]
+ inline_method = inline_method_constructor(test_method, control)
+ event_method = event_method_constructor(test_method, control, pool)
+ adaptation = adapter(test_method)
+
+ methods[identifier] = test_method
+ inlines[identifier] = inline_method
+ events[identifier] = event_method
+ adaptations[identifier] = adaptation
+ messages[identifier] = scenario[1]
+
+ return _Assembly(methods, inlines, events, adaptations, messages)
def digest(service, control, pool):
- """Creates a TestServiceDigest from a TestService.
+ """Creates a TestServiceDigest from a TestService.
Args:
service: A _service.TestService.
@@ -396,51 +399,48 @@ def digest(service, control, pool):
Returns:
A TestServiceDigest synthesized from the given service.TestService.
"""
- identifiers = set()
-
- unary_unary = _assemble(
- service.unary_unary_scenarios(), identifiers, _InlineUnaryUnaryMethod,
- _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool)
- identifiers.update(unary_unary.inlines)
-
- unary_stream = _assemble(
- service.unary_stream_scenarios(), identifiers, _InlineUnaryStreamMethod,
- _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool)
- identifiers.update(unary_stream.inlines)
-
- stream_unary = _assemble(
- service.stream_unary_scenarios(), identifiers, _InlineStreamUnaryMethod,
- _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool)
- identifiers.update(stream_unary.inlines)
-
- stream_stream = _assemble(
- service.stream_stream_scenarios(), identifiers, _InlineStreamStreamMethod,
- _EventStreamStreamMethod, _IDENTITY, control, pool)
- identifiers.update(stream_stream.inlines)
-
- methods = dict(unary_unary.methods)
- methods.update(unary_stream.methods)
- methods.update(stream_unary.methods)
- methods.update(stream_stream.methods)
- adaptations = dict(unary_unary.adaptations)
- adaptations.update(unary_stream.adaptations)
- adaptations.update(stream_unary.adaptations)
- adaptations.update(stream_stream.adaptations)
- inlines = dict(unary_unary.inlines)
- inlines.update(unary_stream.inlines)
- inlines.update(stream_unary.inlines)
- inlines.update(stream_stream.inlines)
- events = dict(unary_unary.events)
- events.update(unary_stream.events)
- events.update(stream_unary.events)
- events.update(stream_stream.events)
-
- return TestServiceDigest(
- methods,
- inlines,
- events,
- _MultiMethodImplementation(adaptations, control, pool),
- unary_unary.messages,
- unary_stream.messages,
- stream_unary.messages,
- stream_stream.messages)
+ identifiers = set()
+
+ unary_unary = _assemble(service.unary_unary_scenarios(), identifiers,
+ _InlineUnaryUnaryMethod, _EventUnaryUnaryMethod,
+ _UnaryUnaryAdaptation, control, pool)
+ identifiers.update(unary_unary.inlines)
+
+ unary_stream = _assemble(service.unary_stream_scenarios(), identifiers,
+ _InlineUnaryStreamMethod, _EventUnaryStreamMethod,
+ _UnaryStreamAdaptation, control, pool)
+ identifiers.update(unary_stream.inlines)
+
+ stream_unary = _assemble(service.stream_unary_scenarios(), identifiers,
+ _InlineStreamUnaryMethod, _EventStreamUnaryMethod,
+ _StreamUnaryAdaptation, control, pool)
+ identifiers.update(stream_unary.inlines)
+
+ stream_stream = _assemble(service.stream_stream_scenarios(), identifiers,
+ _InlineStreamStreamMethod,
+ _EventStreamStreamMethod, _IDENTITY, control,
+ pool)
+ identifiers.update(stream_stream.inlines)
+
+ methods = dict(unary_unary.methods)
+ methods.update(unary_stream.methods)
+ methods.update(stream_unary.methods)
+ methods.update(stream_stream.methods)
+ adaptations = dict(unary_unary.adaptations)
+ adaptations.update(unary_stream.adaptations)
+ adaptations.update(stream_unary.adaptations)
+ adaptations.update(stream_stream.adaptations)
+ inlines = dict(unary_unary.inlines)
+ inlines.update(unary_stream.inlines)
+ inlines.update(stream_unary.inlines)
+ inlines.update(stream_stream.inlines)
+ events = dict(unary_unary.events)
+ events.update(unary_stream.events)
+ events.update(stream_unary.events)
+ events.update(stream_stream.events)
+
+ return TestServiceDigest(
+ methods, inlines, events,
+ _MultiMethodImplementation(adaptations, control, pool),
+ unary_unary.messages, unary_stream.messages, stream_unary.messages,
+ stream_stream.messages)
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
index df620b19ba..703eef3a82 100644
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Test code for the Face layer of RPC Framework."""
from __future__ import division
@@ -55,457 +54,470 @@ from tests.unit.framework.interfaces.face import test_interfaces # pylint: disa
class _PauseableIterator(object):
- def __init__(self, upstream):
- self._upstream = upstream
- self._condition = threading.Condition()
- self._paused = False
+ def __init__(self, upstream):
+ self._upstream = upstream
+ self._condition = threading.Condition()
+ self._paused = False
- @contextlib.contextmanager
- def pause(self):
- with self._condition:
- self._paused = True
- yield
- with self._condition:
- self._paused = False
- self._condition.notify_all()
+ @contextlib.contextmanager
+ def pause(self):
+ with self._condition:
+ self._paused = True
+ yield
+ with self._condition:
+ self._paused = False
+ self._condition.notify_all()
- def __iter__(self):
- return self
+ def __iter__(self):
+ return self
- def __next__(self):
- return self.next()
+ def __next__(self):
+ return self.next()
- def next(self):
- with self._condition:
- while self._paused:
- self._condition.wait()
- return next(self._upstream)
+ def next(self):
+ with self._condition:
+ while self._paused:
+ self._condition.wait()
+ return next(self._upstream)
class _Callback(object):
- def __init__(self):
- self._condition = threading.Condition()
- self._called = False
- self._passed_future = None
- self._passed_other_stuff = None
-
- def __call__(self, *args, **kwargs):
- with self._condition:
- self._called = True
- if args:
- self._passed_future = args[0]
- if 1 < len(args) or kwargs:
- self._passed_other_stuff = tuple(args[1:]), dict(kwargs)
- self._condition.notify_all()
-
- def future(self):
- with self._condition:
- while True:
- if self._passed_other_stuff is not None:
- raise ValueError(
- 'Test callback passed unexpected values: %s',
- self._passed_other_stuff)
- elif self._called:
- return self._passed_future
- else:
- self._condition.wait()
-
-
-class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.TestCase)):
- """A test of the Face layer of RPC Framework.
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._called = False
+ self._passed_future = None
+ self._passed_other_stuff = None
+
+ def __call__(self, *args, **kwargs):
+ with self._condition:
+ self._called = True
+ if args:
+ self._passed_future = args[0]
+ if 1 < len(args) or kwargs:
+ self._passed_other_stuff = tuple(args[1:]), dict(kwargs)
+ self._condition.notify_all()
+
+ def future(self):
+ with self._condition:
+ while True:
+ if self._passed_other_stuff is not None:
+ raise ValueError(
+ 'Test callback passed unexpected values: %s',
+ self._passed_other_stuff)
+ elif self._called:
+ return self._passed_future
+ else:
+ self._condition.wait()
+
+
+class TestCase(
+ six.with_metaclass(abc.ABCMeta, test_coverage.Coverage,
+ unittest.TestCase)):
+ """A test of the Face layer of RPC Framework.
Concrete subclasses must have an "implementation" attribute of type
test_interfaces.Implementation and an "invoker_constructor" attribute of type
_invocation.InvokerConstructor.
"""
- NAME = 'FutureInvocationAsynchronousEventServiceTest'
+ NAME = 'FutureInvocationAsynchronousEventServiceTest'
- def setUp(self):
- """See unittest.TestCase.setUp for full specification.
+ def setUp(self):
+ """See unittest.TestCase.setUp for full specification.
Overriding implementations must call this implementation.
"""
- self._control = test_control.PauseFailControl()
- self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE)
- self._digest = _digest.digest(
- _stock_service.STOCK_TEST_SERVICE, self._control, self._digest_pool)
+ self._control = test_control.PauseFailControl()
+ self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE)
+ self._digest = _digest.digest(_stock_service.STOCK_TEST_SERVICE,
+ self._control, self._digest_pool)
- generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
- self._digest.methods, self._digest.event_method_implementations, None)
- self._invoker = self.invoker_constructor.construct_invoker(
- generic_stub, dynamic_stubs, self._digest.methods)
+ generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate(
+ self._digest.methods, self._digest.event_method_implementations,
+ None)
+ self._invoker = self.invoker_constructor.construct_invoker(
+ generic_stub, dynamic_stubs, self._digest.methods)
- def tearDown(self):
- """See unittest.TestCase.tearDown for full specification.
+ def tearDown(self):
+ """See unittest.TestCase.tearDown for full specification.
Overriding implementations must call this implementation.
"""
- self._invoker = None
- self.implementation.destantiate(self._memo)
- self._digest_pool.shutdown(wait=True)
-
- def testSuccessfulUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = _Callback()
-
- response_future = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- response_future.add_done_callback(callback)
- response = response_future.result()
-
- test_messages.verify(request, response, self)
- self.assertIs(callback.future(), response_future)
- self.assertIsNone(response_future.exception())
- self.assertIsNone(response_future.traceback())
-
- def testSuccessfulUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- response_iterator = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(request, responses, self)
-
- def testSuccessfulStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- request_iterator = _PauseableIterator(iter(requests))
- callback = _Callback()
-
- # Use of a paused iterator of requests allows us to test that control is
- # returned to calling code before the iterator yields any requests.
- with request_iterator.pause():
- response_future = self._invoker.future(group, method)(
- request_iterator, test_constants.LONG_TIMEOUT)
- response_future.add_done_callback(callback)
- future_passed_to_callback = callback.future()
- response = future_passed_to_callback.result()
-
- test_messages.verify(requests, response, self)
- self.assertIs(future_passed_to_callback, response_future)
- self.assertIsNone(response_future.exception())
- self.assertIsNone(response_future.traceback())
-
- def testSuccessfulStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- request_iterator = _PauseableIterator(iter(requests))
-
- # Use of a paused iterator of requests allows us to test that control is
- # returned to calling code before the iterator yields any requests.
- with request_iterator.pause():
- response_iterator = self._invoker.future(group, method)(
- request_iterator, test_constants.LONG_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(requests, responses, self)
-
- def testSequentialInvocations(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- first_response_future = self._invoker.future(group, method)(
- first_request, test_constants.LONG_TIMEOUT)
- first_response = first_response_future.result()
-
- test_messages.verify(first_request, first_response, self)
-
- second_response_future = self._invoker.future(group, method)(
- second_request, test_constants.LONG_TIMEOUT)
- second_response = second_response_future.result()
-
- test_messages.verify(second_request, second_response, self)
-
- def testParallelInvocations(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- first_response_future = self._invoker.future(group, method)(
- first_request, test_constants.LONG_TIMEOUT)
- second_response_future = self._invoker.future(group, method)(
- second_request, test_constants.LONG_TIMEOUT)
- first_response = first_response_future.result()
- second_response = second_response_future.result()
-
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = []
- response_futures = []
- for _ in range(test_constants.THREAD_CONCURRENCY):
- request = test_messages.request()
- response_future = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- requests.append(request)
- response_futures.append(response_future)
-
- responses = [
- response_future.result() for response_future in response_futures]
-
- for request, response in zip(requests, responses):
- test_messages.verify(request, response, self)
-
- def testWaitingForSomeButNotAllParallelInvocations(self):
- pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = []
- response_futures_to_indices = {}
- for index in range(test_constants.THREAD_CONCURRENCY):
- request = test_messages.request()
- inner_response_future = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- outer_response_future = pool.submit(inner_response_future.result)
- requests.append(request)
- response_futures_to_indices[outer_response_future] = index
-
- some_completed_response_futures_iterator = itertools.islice(
- futures.as_completed(response_futures_to_indices),
- test_constants.THREAD_CONCURRENCY // 2)
- for response_future in some_completed_response_futures_iterator:
- index = response_futures_to_indices[response_future]
- test_messages.verify(requests[index], response_future.result(), self)
- pool.shutdown(wait=True)
-
- def testCancelledUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = _Callback()
-
- with self._control.pause():
- response_future = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- response_future.add_done_callback(callback)
- cancel_method_return_value = response_future.cancel()
-
- self.assertIs(callback.future(), response_future)
- self.assertFalse(cancel_method_return_value)
- self.assertTrue(response_future.cancelled())
- with self.assertRaises(future.CancelledError):
- response_future.result()
- with self.assertRaises(future.CancelledError):
- response_future.exception()
- with self.assertRaises(future.CancelledError):
- response_future.traceback()
-
- def testCancelledUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.pause():
- response_iterator = self._invoker.future(group, method)(
- request, test_constants.LONG_TIMEOUT)
- response_iterator.cancel()
-
- with self.assertRaises(face.CancellationError):
- next(response_iterator)
-
- def testCancelledStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = _Callback()
-
- with self._control.pause():
- response_future = self._invoker.future(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT)
- response_future.add_done_callback(callback)
- cancel_method_return_value = response_future.cancel()
-
- self.assertIs(callback.future(), response_future)
- self.assertFalse(cancel_method_return_value)
- self.assertTrue(response_future.cancelled())
- with self.assertRaises(future.CancelledError):
- response_future.result()
- with self.assertRaises(future.CancelledError):
- response_future.exception()
- with self.assertRaises(future.CancelledError):
- response_future.traceback()
-
- def testCancelledStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.pause():
- response_iterator = self._invoker.future(group, method)(
- iter(requests), test_constants.LONG_TIMEOUT)
- response_iterator.cancel()
-
- with self.assertRaises(face.CancellationError):
- next(response_iterator)
-
- def testExpiredUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = _Callback()
-
- with self._control.pause():
- response_future = self._invoker.future(
- group, method)(request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- response_future.add_done_callback(callback)
- self.assertIs(callback.future(), response_future)
- self.assertIsInstance(
- response_future.exception(), face.ExpirationError)
- with self.assertRaises(face.ExpirationError):
- response_future.result()
- self.assertIsInstance(
- response_future.exception(), face.AbortionError)
- self.assertIsNotNone(response_future.traceback())
-
- def testExpiredUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self._control.pause():
- response_iterator = self._invoker.future(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- with self.assertRaises(face.ExpirationError):
- list(response_iterator)
-
- def testExpiredStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = _Callback()
-
- with self._control.pause():
- response_future = self._invoker.future(group, method)(
- iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
- response_future.add_done_callback(callback)
- self.assertIs(callback.future(), response_future)
- self.assertIsInstance(
- response_future.exception(), face.ExpirationError)
- with self.assertRaises(face.ExpirationError):
- response_future.result()
- self.assertIsInstance(
- response_future.exception(), face.AbortionError)
- self.assertIsNotNone(response_future.traceback())
-
- def testExpiredStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self._control.pause():
- response_iterator = self._invoker.future(group, method)(
- iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
- with self.assertRaises(face.ExpirationError):
- list(response_iterator)
-
- def testFailedUnaryRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = _Callback()
- abortion_callback = _Callback()
-
- with self._control.fail():
- response_future = self._invoker.future(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- response_future.add_done_callback(callback)
- response_future.add_abortion_callback(abortion_callback)
-
- self.assertIs(callback.future(), response_future)
- # Because the servicer fails outside of the thread from which the
- # servicer-side runtime called into it its failure is
- # indistinguishable from simply not having called its
- # response_callback before the expiration of the RPC.
- self.assertIsInstance(
- response_future.exception(), face.ExpirationError)
- with self.assertRaises(face.ExpirationError):
- response_future.result()
- self.assertIsNotNone(response_future.traceback())
- self.assertIsNotNone(abortion_callback.future())
-
- def testFailedUnaryRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- # Because the servicer fails outside of the thread from which the
- # servicer-side runtime called into it its failure is indistinguishable
- # from simply not having called its response_consumer before the
- # expiration of the RPC.
- with self._control.fail(), self.assertRaises(face.ExpirationError):
- response_iterator = self._invoker.future(group, method)(
- request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
- list(response_iterator)
-
- def testFailedStreamRequestUnaryResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = _Callback()
- abortion_callback = _Callback()
-
- with self._control.fail():
- response_future = self._invoker.future(group, method)(
- iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
- response_future.add_done_callback(callback)
- response_future.add_abortion_callback(abortion_callback)
-
- self.assertIs(callback.future(), response_future)
- # Because the servicer fails outside of the thread from which the
- # servicer-side runtime called into it its failure is
- # indistinguishable from simply not having called its
- # response_callback before the expiration of the RPC.
- self.assertIsInstance(
- response_future.exception(), face.ExpirationError)
- with self.assertRaises(face.ExpirationError):
- response_future.result()
- self.assertIsNotNone(response_future.traceback())
- self.assertIsNotNone(abortion_callback.future())
-
- def testFailedStreamRequestStreamResponse(self):
- for (group, method), test_messages_sequence in (
- six.iteritems(self._digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- # Because the servicer fails outside of the thread from which the
- # servicer-side runtime called into it its failure is indistinguishable
- # from simply not having called its response_consumer before the
- # expiration of the RPC.
- with self._control.fail(), self.assertRaises(face.ExpirationError):
- response_iterator = self._invoker.future(group, method)(
- iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT)
- list(response_iterator)
+ self._invoker = None
+ self.implementation.destantiate(self._memo)
+ self._digest_pool.shutdown(wait=True)
+
+ def testSuccessfulUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = _Callback()
+
+ response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ response_future.add_done_callback(callback)
+ response = response_future.result()
+
+ test_messages.verify(request, response, self)
+ self.assertIs(callback.future(), response_future)
+ self.assertIsNone(response_future.exception())
+ self.assertIsNone(response_future.traceback())
+
+ def testSuccessfulUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ response_iterator = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(request, responses, self)
+
+ def testSuccessfulStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ request_iterator = _PauseableIterator(iter(requests))
+ callback = _Callback()
+
+ # Use of a paused iterator of requests allows us to test that control is
+ # returned to calling code before the iterator yields any requests.
+ with request_iterator.pause():
+ response_future = self._invoker.future(group, method)(
+ request_iterator, test_constants.LONG_TIMEOUT)
+ response_future.add_done_callback(callback)
+ future_passed_to_callback = callback.future()
+ response = future_passed_to_callback.result()
+
+ test_messages.verify(requests, response, self)
+ self.assertIs(future_passed_to_callback, response_future)
+ self.assertIsNone(response_future.exception())
+ self.assertIsNone(response_future.traceback())
+
+ def testSuccessfulStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ request_iterator = _PauseableIterator(iter(requests))
+
+ # Use of a paused iterator of requests allows us to test that control is
+ # returned to calling code before the iterator yields any requests.
+ with request_iterator.pause():
+ response_iterator = self._invoker.future(group, method)(
+ request_iterator, test_constants.LONG_TIMEOUT)
+ responses = list(response_iterator)
+
+ test_messages.verify(requests, responses, self)
+
+ def testSequentialInvocations(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self._invoker.future(group, method)(
+ first_request, test_constants.LONG_TIMEOUT)
+ first_response = first_response_future.result()
+
+ test_messages.verify(first_request, first_response, self)
+
+ second_response_future = self._invoker.future(group, method)(
+ second_request, test_constants.LONG_TIMEOUT)
+ second_response = second_response_future.result()
+
+ test_messages.verify(second_request, second_response, self)
+
+ def testParallelInvocations(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self._invoker.future(group, method)(
+ first_request, test_constants.LONG_TIMEOUT)
+ second_response_future = self._invoker.future(group, method)(
+ second_request, test_constants.LONG_TIMEOUT)
+ first_response = first_response_future.result()
+ second_response = second_response_future.result()
+
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures = []
+ for _ in range(test_constants.THREAD_CONCURRENCY):
+ request = test_messages.request()
+ response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures.append(response_future)
+
+ responses = [
+ response_future.result()
+ for response_future in response_futures
+ ]
+
+ for request, response in zip(requests, responses):
+ test_messages.verify(request, response, self)
+
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures_to_indices = {}
+ for index in range(test_constants.THREAD_CONCURRENCY):
+ request = test_messages.request()
+ inner_response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ outer_response_future = pool.submit(
+ inner_response_future.result)
+ requests.append(request)
+ response_futures_to_indices[outer_response_future] = index
+
+ some_completed_response_futures_iterator = itertools.islice(
+ futures.as_completed(response_futures_to_indices),
+ test_constants.THREAD_CONCURRENCY // 2)
+ for response_future in some_completed_response_futures_iterator:
+ index = response_futures_to_indices[response_future]
+ test_messages.verify(requests[index],
+ response_future.result(), self)
+ pool.shutdown(wait=True)
+
+ def testCancelledUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = _Callback()
+
+ with self._control.pause():
+ response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ response_future.add_done_callback(callback)
+ cancel_method_return_value = response_future.cancel()
+
+ self.assertIs(callback.future(), response_future)
+ self.assertFalse(cancel_method_return_value)
+ self.assertTrue(response_future.cancelled())
+ with self.assertRaises(future.CancelledError):
+ response_future.result()
+ with self.assertRaises(future.CancelledError):
+ response_future.exception()
+ with self.assertRaises(future.CancelledError):
+ response_future.traceback()
+
+ def testCancelledUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause():
+ response_iterator = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ response_iterator.cancel()
+
+ with self.assertRaises(face.CancellationError):
+ next(response_iterator)
+
+ def testCancelledStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = _Callback()
+
+ with self._control.pause():
+ response_future = self._invoker.future(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+ response_future.add_done_callback(callback)
+ cancel_method_return_value = response_future.cancel()
+
+ self.assertIs(callback.future(), response_future)
+ self.assertFalse(cancel_method_return_value)
+ self.assertTrue(response_future.cancelled())
+ with self.assertRaises(future.CancelledError):
+ response_future.result()
+ with self.assertRaises(future.CancelledError):
+ response_future.exception()
+ with self.assertRaises(future.CancelledError):
+ response_future.traceback()
+
+ def testCancelledStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause():
+ response_iterator = self._invoker.future(group, method)(
+ iter(requests), test_constants.LONG_TIMEOUT)
+ response_iterator.cancel()
+
+ with self.assertRaises(face.CancellationError):
+ next(response_iterator)
+
+ def testExpiredUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = _Callback()
+
+ with self._control.pause():
+ response_future = self._invoker.future(group, method)(
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ response_future.add_done_callback(callback)
+ self.assertIs(callback.future(), response_future)
+ self.assertIsInstance(response_future.exception(),
+ face.ExpirationError)
+ with self.assertRaises(face.ExpirationError):
+ response_future.result()
+ self.assertIsInstance(response_future.exception(),
+ face.AbortionError)
+ self.assertIsNotNone(response_future.traceback())
+
+ def testExpiredUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ with self._control.pause():
+ response_iterator = self._invoker.future(group, method)(
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ with self.assertRaises(face.ExpirationError):
+ list(response_iterator)
+
+ def testExpiredStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = _Callback()
+
+ with self._control.pause():
+ response_future = self._invoker.future(group, method)(
+ iter(requests),
+ _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ response_future.add_done_callback(callback)
+ self.assertIs(callback.future(), response_future)
+ self.assertIsInstance(response_future.exception(),
+ face.ExpirationError)
+ with self.assertRaises(face.ExpirationError):
+ response_future.result()
+ self.assertIsInstance(response_future.exception(),
+ face.AbortionError)
+ self.assertIsNotNone(response_future.traceback())
+
+ def testExpiredStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ with self._control.pause():
+ response_iterator = self._invoker.future(group, method)(
+ iter(requests),
+ _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ with self.assertRaises(face.ExpirationError):
+ list(response_iterator)
+
+ def testFailedUnaryRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+ callback = _Callback()
+ abortion_callback = _Callback()
+
+ with self._control.fail():
+ response_future = self._invoker.future(group, method)(
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ response_future.add_done_callback(callback)
+ response_future.add_abortion_callback(abortion_callback)
+
+ self.assertIs(callback.future(), response_future)
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is
+ # indistinguishable from simply not having called its
+ # response_callback before the expiration of the RPC.
+ self.assertIsInstance(response_future.exception(),
+ face.ExpirationError)
+ with self.assertRaises(face.ExpirationError):
+ response_future.result()
+ self.assertIsNotNone(response_future.traceback())
+ self.assertIsNotNone(abortion_callback.future())
+
+ def testFailedUnaryRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.unary_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ request = test_messages.request()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_consumer before the
+ # expiration of the RPC.
+ with self._control.fail(), self.assertRaises(
+ face.ExpirationError):
+ response_iterator = self._invoker.future(group, method)(
+ request, _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ list(response_iterator)
+
+ def testFailedStreamRequestUnaryResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_unary_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+ callback = _Callback()
+ abortion_callback = _Callback()
+
+ with self._control.fail():
+ response_future = self._invoker.future(group, method)(
+ iter(requests),
+ _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ response_future.add_done_callback(callback)
+ response_future.add_abortion_callback(abortion_callback)
+
+ self.assertIs(callback.future(), response_future)
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is
+ # indistinguishable from simply not having called its
+ # response_callback before the expiration of the RPC.
+ self.assertIsInstance(response_future.exception(),
+ face.ExpirationError)
+ with self.assertRaises(face.ExpirationError):
+ response_future.result()
+ self.assertIsNotNone(response_future.traceback())
+ self.assertIsNotNone(abortion_callback.future())
+
+ def testFailedStreamRequestStreamResponse(self):
+ for (group, method), test_messages_sequence in (
+ six.iteritems(self._digest.stream_stream_messages_sequences)):
+ for test_messages in test_messages_sequence:
+ requests = test_messages.requests()
+
+ # Because the servicer fails outside of the thread from which the
+ # servicer-side runtime called into it its failure is indistinguishable
+ # from simply not having called its response_consumer before the
+ # expiration of the RPC.
+ with self._control.fail(), self.assertRaises(
+ face.ExpirationError):
+ response_iterator = self._invoker.future(group, method)(
+ iter(requests),
+ _3069_test_constant.REALLY_SHORT_TIMEOUT)
+ list(response_iterator)
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py
index ac487bed4f..4e144a3635 100644
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_invocation.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Coverage across the Face layer's generic-to-dynamic range for invocation."""
import abc
@@ -65,149 +64,149 @@ _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE = {
class Invoker(six.with_metaclass(abc.ABCMeta)):
- """A type used to invoke test RPCs."""
+ """A type used to invoke test RPCs."""
- @abc.abstractmethod
- def blocking(self, group, name):
- """Invokes an RPC with blocking control flow."""
- raise NotImplementedError()
+ @abc.abstractmethod
+ def blocking(self, group, name):
+ """Invokes an RPC with blocking control flow."""
+ raise NotImplementedError()
- @abc.abstractmethod
- def future(self, group, name):
- """Invokes an RPC with future control flow."""
- raise NotImplementedError()
+ @abc.abstractmethod
+ def future(self, group, name):
+ """Invokes an RPC with future control flow."""
+ raise NotImplementedError()
- @abc.abstractmethod
- def event(self, group, name):
- """Invokes an RPC with event control flow."""
- raise NotImplementedError()
+ @abc.abstractmethod
+ def event(self, group, name):
+ """Invokes an RPC with event control flow."""
+ raise NotImplementedError()
class InvokerConstructor(six.with_metaclass(abc.ABCMeta)):
- """A type used to create Invokers."""
+ """A type used to create Invokers."""
- @abc.abstractmethod
- def name(self):
- """Specifies the name of the Invoker constructed by this object."""
- raise NotImplementedError()
+ @abc.abstractmethod
+ def name(self):
+ """Specifies the name of the Invoker constructed by this object."""
+ raise NotImplementedError()
- @abc.abstractmethod
- def construct_invoker(self, generic_stub, dynamic_stubs, methods):
- """Constructs an Invoker for the given stubs and methods."""
- raise NotImplementedError()
+ @abc.abstractmethod
+ def construct_invoker(self, generic_stub, dynamic_stubs, methods):
+ """Constructs an Invoker for the given stubs and methods."""
+ raise NotImplementedError()
class _GenericInvoker(Invoker):
- def __init__(self, generic_stub, methods):
- self._stub = generic_stub
- self._methods = methods
+ def __init__(self, generic_stub, methods):
+ self._stub = generic_stub
+ self._methods = methods
- def _behavior(self, group, name, cardinality_to_generic_method):
- method_cardinality = self._methods[group, name].cardinality()
- behavior = getattr(
- self._stub, cardinality_to_generic_method[method_cardinality])
- return lambda *args, **kwargs: behavior(group, name, *args, **kwargs)
+ def _behavior(self, group, name, cardinality_to_generic_method):
+ method_cardinality = self._methods[group, name].cardinality()
+ behavior = getattr(self._stub,
+ cardinality_to_generic_method[method_cardinality])
+ return lambda *args, **kwargs: behavior(group, name, *args, **kwargs)
- def blocking(self, group, name):
- return self._behavior(
- group, name, _CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR)
+ def blocking(self, group, name):
+ return self._behavior(group, name,
+ _CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR)
- def future(self, group, name):
- return self._behavior(group, name, _CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR)
+ def future(self, group, name):
+ return self._behavior(group, name,
+ _CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR)
- def event(self, group, name):
- return self._behavior(group, name, _CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR)
+ def event(self, group, name):
+ return self._behavior(group, name,
+ _CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR)
class _GenericInvokerConstructor(InvokerConstructor):
- def name(self):
- return 'GenericInvoker'
+ def name(self):
+ return 'GenericInvoker'
- def construct_invoker(self, generic_stub, dynamic_stub, methods):
- return _GenericInvoker(generic_stub, methods)
+ def construct_invoker(self, generic_stub, dynamic_stub, methods):
+ return _GenericInvoker(generic_stub, methods)
class _MultiCallableInvoker(Invoker):
- def __init__(self, generic_stub, methods):
- self._stub = generic_stub
- self._methods = methods
+ def __init__(self, generic_stub, methods):
+ self._stub = generic_stub
+ self._methods = methods
- def _multi_callable(self, group, name):
- method_cardinality = self._methods[group, name].cardinality()
- behavior = getattr(
- self._stub,
- _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
- return behavior(group, name)
+ def _multi_callable(self, group, name):
+ method_cardinality = self._methods[group, name].cardinality()
+ behavior = getattr(
+ self._stub,
+ _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
+ return behavior(group, name)
- def blocking(self, group, name):
- return self._multi_callable(group, name)
+ def blocking(self, group, name):
+ return self._multi_callable(group, name)
- def future(self, group, name):
- method_cardinality = self._methods[group, name].cardinality()
- behavior = getattr(
- self._stub,
- _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
- if method_cardinality in (
- cardinality.Cardinality.UNARY_UNARY,
- cardinality.Cardinality.STREAM_UNARY):
- return behavior(group, name).future
- else:
- return behavior(group, name)
+ def future(self, group, name):
+ method_cardinality = self._methods[group, name].cardinality()
+ behavior = getattr(
+ self._stub,
+ _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality])
+ if method_cardinality in (cardinality.Cardinality.UNARY_UNARY,
+ cardinality.Cardinality.STREAM_UNARY):
+ return behavior(group, name).future
+ else:
+ return behavior(group, name)
- def event(self, group, name):
- return self._multi_callable(group, name).event
+ def event(self, group, name):
+ return self._multi_callable(group, name).event
class _MultiCallableInvokerConstructor(InvokerConstructor):
- def name(self):
- return 'MultiCallableInvoker'
+ def name(self):
+ return 'MultiCallableInvoker'
- def construct_invoker(self, generic_stub, dynamic_stub, methods):
- return _MultiCallableInvoker(generic_stub, methods)
+ def construct_invoker(self, generic_stub, dynamic_stub, methods):
+ return _MultiCallableInvoker(generic_stub, methods)
class _DynamicInvoker(Invoker):
- def __init__(self, dynamic_stubs, methods):
- self._stubs = dynamic_stubs
- self._methods = methods
+ def __init__(self, dynamic_stubs, methods):
+ self._stubs = dynamic_stubs
+ self._methods = methods
- def blocking(self, group, name):
- return getattr(self._stubs[group], name)
+ def blocking(self, group, name):
+ return getattr(self._stubs[group], name)
- def future(self, group, name):
- if self._methods[group, name].cardinality() in (
- cardinality.Cardinality.UNARY_UNARY,
- cardinality.Cardinality.STREAM_UNARY):
- return getattr(self._stubs[group], name).future
- else:
- return getattr(self._stubs[group], name)
+ def future(self, group, name):
+ if self._methods[group, name].cardinality() in (
+ cardinality.Cardinality.UNARY_UNARY,
+ cardinality.Cardinality.STREAM_UNARY):
+ return getattr(self._stubs[group], name).future
+ else:
+ return getattr(self._stubs[group], name)
- def event(self, group, name):
- return getattr(self._stubs[group], name).event
+ def event(self, group, name):
+ return getattr(self._stubs[group], name).event
class _DynamicInvokerConstructor(InvokerConstructor):
- def name(self):
- return 'DynamicInvoker'
+ def name(self):
+ return 'DynamicInvoker'
- def construct_invoker(self, generic_stub, dynamic_stubs, methods):
- return _DynamicInvoker(dynamic_stubs, methods)
+ def construct_invoker(self, generic_stub, dynamic_stubs, methods):
+ return _DynamicInvoker(dynamic_stubs, methods)
def invoker_constructors():
- """Creates a sequence of InvokerConstructors to use in tests of RPCs.
+ """Creates a sequence of InvokerConstructors to use in tests of RPCs.
Returns:
A sequence of InvokerConstructors.
"""
- return (
- _GenericInvokerConstructor(),
- _MultiCallableInvokerConstructor(),
- _DynamicInvokerConstructor(),
- )
+ return (
+ _GenericInvokerConstructor(),
+ _MultiCallableInvokerConstructor(),
+ _DynamicInvokerConstructor(),)
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py
index f13dff0558..f14ac6a987 100644
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_service.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Private interfaces implemented by data sets used in Face-layer tests."""
import abc
@@ -38,12 +37,13 @@ from grpc.framework.interfaces.face import face # pylint: disable=unused-import
from tests.unit.framework.interfaces.face import test_interfaces
-class UnaryUnaryTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
- """A controllable implementation of a unary-unary method."""
+class UnaryUnaryTestMethodImplementation(
+ six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
+ """A controllable implementation of a unary-unary method."""
- @abc.abstractmethod
- def service(self, request, response_callback, context, control):
- """Services an RPC that accepts one message and produces one message.
+ @abc.abstractmethod
+ def service(self, request, response_callback, context, control):
+ """Services an RPC that accepts one message and produces one message.
Args:
request: The single request message for the RPC.
@@ -56,15 +56,15 @@ class UnaryUnaryTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_in
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
class UnaryUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
- """A type for unary-request-unary-response message pairings."""
+ """A type for unary-request-unary-response message pairings."""
- @abc.abstractmethod
- def request(self):
- """Affords a request message.
+ @abc.abstractmethod
+ def request(self):
+ """Affords a request message.
Implementations of this method should return a different message with each
call so that multiple test executions of the test method may be made with
@@ -73,11 +73,11 @@ class UnaryUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
Returns:
A request message.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def verify(self, request, response, test_case):
- """Verifies that the computed response matches the given request.
+ @abc.abstractmethod
+ def verify(self, request, response, test_case):
+ """Verifies that the computed response matches the given request.
Args:
request: A request message.
@@ -88,15 +88,16 @@ class UnaryUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
AssertionError: If the request and response do not match, indicating that
there was some problem executing the RPC under test.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
-class UnaryStreamTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
- """A controllable implementation of a unary-stream method."""
+class UnaryStreamTestMethodImplementation(
+ six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
+ """A controllable implementation of a unary-stream method."""
- @abc.abstractmethod
- def service(self, request, response_consumer, context, control):
- """Services an RPC that takes one message and produces a stream of messages.
+ @abc.abstractmethod
+ def service(self, request, response_consumer, context, control):
+ """Services an RPC that takes one message and produces a stream of messages.
Args:
request: The single request message for the RPC.
@@ -109,15 +110,15 @@ class UnaryStreamTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_i
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
class UnaryStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
- """A type for unary-request-stream-response message pairings."""
+ """A type for unary-request-stream-response message pairings."""
- @abc.abstractmethod
- def request(self):
- """Affords a request message.
+ @abc.abstractmethod
+ def request(self):
+ """Affords a request message.
Implementations of this method should return a different message with each
call so that multiple test executions of the test method may be made with
@@ -126,11 +127,11 @@ class UnaryStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
Returns:
A request message.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def verify(self, request, responses, test_case):
- """Verifies that the computed responses match the given request.
+ @abc.abstractmethod
+ def verify(self, request, responses, test_case):
+ """Verifies that the computed responses match the given request.
Args:
request: A request message.
@@ -141,15 +142,16 @@ class UnaryStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
AssertionError: If the request and responses do not match, indicating that
there was some problem executing the RPC under test.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
-class StreamUnaryTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
- """A controllable implementation of a stream-unary method."""
+class StreamUnaryTestMethodImplementation(
+ six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
+ """A controllable implementation of a stream-unary method."""
- @abc.abstractmethod
- def service(self, response_callback, context, control):
- """Services an RPC that takes a stream of messages and produces one message.
+ @abc.abstractmethod
+ def service(self, response_callback, context, control):
+ """Services an RPC that takes a stream of messages and produces one message.
Args:
response_callback: A callback to be called to accept the response message
@@ -169,15 +171,15 @@ class StreamUnaryTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_i
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
class StreamUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
- """A type for stream-request-unary-response message pairings."""
+ """A type for stream-request-unary-response message pairings."""
- @abc.abstractmethod
- def requests(self):
- """Affords a sequence of request messages.
+ @abc.abstractmethod
+ def requests(self):
+ """Affords a sequence of request messages.
Implementations of this method should return a different sequences with each
call so that multiple test executions of the test method may be made with
@@ -186,11 +188,11 @@ class StreamUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
Returns:
A sequence of request messages.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def verify(self, requests, response, test_case):
- """Verifies that the computed response matches the given requests.
+ @abc.abstractmethod
+ def verify(self, requests, response, test_case):
+ """Verifies that the computed response matches the given requests.
Args:
requests: A sequence of request messages.
@@ -201,15 +203,16 @@ class StreamUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
AssertionError: If the requests and response do not match, indicating that
there was some problem executing the RPC under test.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
-class StreamStreamTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
- """A controllable implementation of a stream-stream method."""
+class StreamStreamTestMethodImplementation(
+ six.with_metaclass(abc.ABCMeta, test_interfaces.Method)):
+ """A controllable implementation of a stream-stream method."""
- @abc.abstractmethod
- def service(self, response_consumer, context, control):
- """Services an RPC that accepts and produces streams of messages.
+ @abc.abstractmethod
+ def service(self, response_consumer, context, control):
+ """Services an RPC that accepts and produces streams of messages.
Args:
response_consumer: A stream.Consumer to be called to accept the response
@@ -229,15 +232,15 @@ class StreamStreamTestMethodImplementation(six.with_metaclass(abc.ABCMeta, test_
abandonment.Abandoned: May or may not be raised when the RPC has been
aborted.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
class StreamStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
- """A type for stream-request-stream-response message pairings."""
+ """A type for stream-request-stream-response message pairings."""
- @abc.abstractmethod
- def requests(self):
- """Affords a sequence of request messages.
+ @abc.abstractmethod
+ def requests(self):
+ """Affords a sequence of request messages.
Implementations of this method should return a different sequences with each
call so that multiple test executions of the test method may be made with
@@ -246,11 +249,11 @@ class StreamStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
Returns:
A sequence of request messages.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def verify(self, requests, responses, test_case):
- """Verifies that the computed response matches the given requests.
+ @abc.abstractmethod
+ def verify(self, requests, responses, test_case):
+ """Verifies that the computed response matches the given requests.
Args:
requests: A sequence of request messages.
@@ -261,15 +264,15 @@ class StreamStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
AssertionError: If the requests and responses do not match, indicating
that there was some problem executing the RPC under test.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
class TestService(six.with_metaclass(abc.ABCMeta)):
- """A specification of implemented methods to use in tests."""
+ """A specification of implemented methods to use in tests."""
- @abc.abstractmethod
- def unary_unary_scenarios(self):
- """Affords unary-request-unary-response test methods and their messages.
+ @abc.abstractmethod
+ def unary_unary_scenarios(self):
+ """Affords unary-request-unary-response test methods and their messages.
Returns:
A dict from method group-name pair to implementation/messages pair. The
@@ -277,11 +280,11 @@ class TestService(six.with_metaclass(abc.ABCMeta)):
and the second element is a sequence of UnaryUnaryTestMethodMessages
objects.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def unary_stream_scenarios(self):
- """Affords unary-request-stream-response test methods and their messages.
+ @abc.abstractmethod
+ def unary_stream_scenarios(self):
+ """Affords unary-request-stream-response test methods and their messages.
Returns:
A dict from method group-name pair to implementation/messages pair. The
@@ -289,11 +292,11 @@ class TestService(six.with_metaclass(abc.ABCMeta)):
object and the second element is a sequence of
UnaryStreamTestMethodMessages objects.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def stream_unary_scenarios(self):
- """Affords stream-request-unary-response test methods and their messages.
+ @abc.abstractmethod
+ def stream_unary_scenarios(self):
+ """Affords stream-request-unary-response test methods and their messages.
Returns:
A dict from method group-name pair to implementation/messages pair. The
@@ -301,11 +304,11 @@ class TestService(six.with_metaclass(abc.ABCMeta)):
object and the second element is a sequence of
StreamUnaryTestMethodMessages objects.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def stream_stream_scenarios(self):
- """Affords stream-request-stream-response test methods and their messages.
+ @abc.abstractmethod
+ def stream_stream_scenarios(self):
+ """Affords stream-request-stream-response test methods and their messages.
Returns:
A dict from method group-name pair to implementation/messages pair. The
@@ -313,4 +316,4 @@ class TestService(six.with_metaclass(abc.ABCMeta)):
object and the second element is a sequence of
StreamStreamTestMethodMessages objects.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py
index 5299655bb3..41a55c13f4 100644
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_stock_service.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Examples of Python implementations of the stock.proto Stock service."""
from grpc.framework.common import cardinality
@@ -44,353 +43,363 @@ _price = lambda symbol_name: float(hash(symbol_name) % 4096)
def _get_last_trade_price(stock_request, stock_reply_callback, control, active):
- """A unary-request, unary-response test method."""
- control.control()
- if active():
- stock_reply_callback(
- stock_pb2.StockReply(
- symbol=stock_request.symbol, price=_price(stock_request.symbol)))
- else:
- raise abandonment.Abandoned()
-
-
-def _get_last_trade_price_multiple(stock_reply_consumer, control, active):
- """A stream-request, stream-response test method."""
- def stock_reply_for_stock_request(stock_request):
+ """A unary-request, unary-response test method."""
control.control()
if active():
- return stock_pb2.StockReply(
- symbol=stock_request.symbol, price=_price(stock_request.symbol))
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=_price(
+ stock_request.symbol)))
else:
- raise abandonment.Abandoned()
-
- class StockRequestConsumer(stream.Consumer):
+ raise abandonment.Abandoned()
- def consume(self, stock_request):
- stock_reply_consumer.consume(stock_reply_for_stock_request(stock_request))
- def terminate(self):
- control.control()
- stock_reply_consumer.terminate()
+def _get_last_trade_price_multiple(stock_reply_consumer, control, active):
+ """A stream-request, stream-response test method."""
- def consume_and_terminate(self, stock_request):
- stock_reply_consumer.consume_and_terminate(
- stock_reply_for_stock_request(stock_request))
+ def stock_reply_for_stock_request(stock_request):
+ control.control()
+ if active():
+ return stock_pb2.StockReply(
+ symbol=stock_request.symbol, price=_price(stock_request.symbol))
+ else:
+ raise abandonment.Abandoned()
- return StockRequestConsumer()
+ class StockRequestConsumer(stream.Consumer):
+ def consume(self, stock_request):
+ stock_reply_consumer.consume(
+ stock_reply_for_stock_request(stock_request))
-def _watch_future_trades(stock_request, stock_reply_consumer, control, active):
- """A unary-request, stream-response test method."""
- base_price = _price(stock_request.symbol)
- for index in range(stock_request.num_trades_to_watch):
- control.control()
- if active():
- stock_reply_consumer.consume(
- stock_pb2.StockReply(
- symbol=stock_request.symbol, price=base_price + index))
- else:
- raise abandonment.Abandoned()
- stock_reply_consumer.terminate()
+ def terminate(self):
+ control.control()
+ stock_reply_consumer.terminate()
+ def consume_and_terminate(self, stock_request):
+ stock_reply_consumer.consume_and_terminate(
+ stock_reply_for_stock_request(stock_request))
-def _get_highest_trade_price(stock_reply_callback, control, active):
- """A stream-request, unary-response test method."""
+ return StockRequestConsumer()
- class StockRequestConsumer(stream.Consumer):
- """Keeps an ongoing record of the most valuable symbol yet consumed."""
- def __init__(self):
- self._symbol = None
- self._price = None
-
- def consume(self, stock_request):
- control.control()
- if active():
- if self._price is None:
- self._symbol = stock_request.symbol
- self._price = _price(stock_request.symbol)
- else:
- candidate_price = _price(stock_request.symbol)
- if self._price < candidate_price:
- self._symbol = stock_request.symbol
- self._price = candidate_price
-
- def terminate(self):
- control.control()
- if active():
- if self._symbol is None:
- raise ValueError()
- else:
- stock_reply_callback(
- stock_pb2.StockReply(symbol=self._symbol, price=self._price))
- self._symbol = None
- self._price = None
-
- def consume_and_terminate(self, stock_request):
- control.control()
- if active():
- if self._price is None:
- stock_reply_callback(
- stock_pb2.StockReply(
- symbol=stock_request.symbol,
- price=_price(stock_request.symbol)))
- else:
- candidate_price = _price(stock_request.symbol)
- if self._price < candidate_price:
- stock_reply_callback(
- stock_pb2.StockReply(
- symbol=stock_request.symbol, price=candidate_price))
- else:
- stock_reply_callback(
+def _watch_future_trades(stock_request, stock_reply_consumer, control, active):
+ """A unary-request, stream-response test method."""
+ base_price = _price(stock_request.symbol)
+ for index in range(stock_request.num_trades_to_watch):
+ control.control()
+ if active():
+ stock_reply_consumer.consume(
stock_pb2.StockReply(
- symbol=self._symbol, price=self._price))
+ symbol=stock_request.symbol, price=base_price + index))
+ else:
+ raise abandonment.Abandoned()
+ stock_reply_consumer.terminate()
- self._symbol = None
- self._price = None
- return StockRequestConsumer()
+def _get_highest_trade_price(stock_reply_callback, control, active):
+ """A stream-request, unary-response test method."""
+
+ class StockRequestConsumer(stream.Consumer):
+ """Keeps an ongoing record of the most valuable symbol yet consumed."""
+
+ def __init__(self):
+ self._symbol = None
+ self._price = None
+
+ def consume(self, stock_request):
+ control.control()
+ if active():
+ if self._price is None:
+ self._symbol = stock_request.symbol
+ self._price = _price(stock_request.symbol)
+ else:
+ candidate_price = _price(stock_request.symbol)
+ if self._price < candidate_price:
+ self._symbol = stock_request.symbol
+ self._price = candidate_price
+
+ def terminate(self):
+ control.control()
+ if active():
+ if self._symbol is None:
+ raise ValueError()
+ else:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=self._symbol, price=self._price))
+ self._symbol = None
+ self._price = None
+
+ def consume_and_terminate(self, stock_request):
+ control.control()
+ if active():
+ if self._price is None:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol,
+ price=_price(stock_request.symbol)))
+ else:
+ candidate_price = _price(stock_request.symbol)
+ if self._price < candidate_price:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=stock_request.symbol,
+ price=candidate_price))
+ else:
+ stock_reply_callback(
+ stock_pb2.StockReply(
+ symbol=self._symbol, price=self._price))
+
+ self._symbol = None
+ self._price = None
+
+ return StockRequestConsumer()
class GetLastTradePrice(_service.UnaryUnaryTestMethodImplementation):
- """GetLastTradePrice for use in tests."""
+ """GetLastTradePrice for use in tests."""
- def group(self):
- return _STOCK_GROUP_NAME
+ def group(self):
+ return _STOCK_GROUP_NAME
- def name(self):
- return 'GetLastTradePrice'
+ def name(self):
+ return 'GetLastTradePrice'
- def cardinality(self):
- return cardinality.Cardinality.UNARY_UNARY
+ def cardinality(self):
+ return cardinality.Cardinality.UNARY_UNARY
- def request_class(self):
- return stock_pb2.StockRequest
+ def request_class(self):
+ return stock_pb2.StockRequest
- def response_class(self):
- return stock_pb2.StockReply
+ def response_class(self):
+ return stock_pb2.StockReply
- def serialize_request(self, request):
- return request.SerializeToString()
+ def serialize_request(self, request):
+ return request.SerializeToString()
- def deserialize_request(self, serialized_request):
- return stock_pb2.StockRequest.FromString(serialized_request)
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
- def serialize_response(self, response):
- return response.SerializeToString()
+ def serialize_response(self, response):
+ return response.SerializeToString()
- def deserialize_response(self, serialized_response):
- return stock_pb2.StockReply.FromString(serialized_response)
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
- def service(self, request, response_callback, context, control):
- _get_last_trade_price(
- request, response_callback, control, context.is_active)
+ def service(self, request, response_callback, context, control):
+ _get_last_trade_price(request, response_callback, control,
+ context.is_active)
class GetLastTradePriceMessages(_service.UnaryUnaryTestMessages):
- def __init__(self):
- self._index = 0
+ def __init__(self):
+ self._index = 0
- def request(self):
- symbol = _SYMBOL_FORMAT % self._index
- self._index += 1
- return stock_pb2.StockRequest(symbol=symbol)
+ def request(self):
+ symbol = _SYMBOL_FORMAT % self._index
+ self._index += 1
+ return stock_pb2.StockRequest(symbol=symbol)
- def verify(self, request, response, test_case):
- test_case.assertEqual(request.symbol, response.symbol)
- test_case.assertEqual(_price(request.symbol), response.price)
+ def verify(self, request, response, test_case):
+ test_case.assertEqual(request.symbol, response.symbol)
+ test_case.assertEqual(_price(request.symbol), response.price)
class GetLastTradePriceMultiple(_service.StreamStreamTestMethodImplementation):
- """GetLastTradePriceMultiple for use in tests."""
+ """GetLastTradePriceMultiple for use in tests."""
- def group(self):
- return _STOCK_GROUP_NAME
+ def group(self):
+ return _STOCK_GROUP_NAME
- def name(self):
- return 'GetLastTradePriceMultiple'
+ def name(self):
+ return 'GetLastTradePriceMultiple'
- def cardinality(self):
- return cardinality.Cardinality.STREAM_STREAM
+ def cardinality(self):
+ return cardinality.Cardinality.STREAM_STREAM
- def request_class(self):
- return stock_pb2.StockRequest
+ def request_class(self):
+ return stock_pb2.StockRequest
- def response_class(self):
- return stock_pb2.StockReply
+ def response_class(self):
+ return stock_pb2.StockReply
- def serialize_request(self, request):
- return request.SerializeToString()
+ def serialize_request(self, request):
+ return request.SerializeToString()
- def deserialize_request(self, serialized_request):
- return stock_pb2.StockRequest.FromString(serialized_request)
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
- def serialize_response(self, response):
- return response.SerializeToString()
+ def serialize_response(self, response):
+ return response.SerializeToString()
- def deserialize_response(self, serialized_response):
- return stock_pb2.StockReply.FromString(serialized_response)
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
- def service(self, response_consumer, context, control):
- return _get_last_trade_price_multiple(
- response_consumer, control, context.is_active)
+ def service(self, response_consumer, context, control):
+ return _get_last_trade_price_multiple(response_consumer, control,
+ context.is_active)
class GetLastTradePriceMultipleMessages(_service.StreamStreamTestMessages):
- """Pairs of message streams for use with GetLastTradePriceMultiple."""
+ """Pairs of message streams for use with GetLastTradePriceMultiple."""
- def __init__(self):
- self._index = 0
+ def __init__(self):
+ self._index = 0
- def requests(self):
- base_index = self._index
- self._index += 1
- return [
- stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index))
- for index in range(test_constants.STREAM_LENGTH)]
+ def requests(self):
+ base_index = self._index
+ self._index += 1
+ return [
+ stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index))
+ for index in range(test_constants.STREAM_LENGTH)
+ ]
- def verify(self, requests, responses, test_case):
- test_case.assertEqual(len(requests), len(responses))
- for stock_request, stock_reply in zip(requests, responses):
- test_case.assertEqual(stock_request.symbol, stock_reply.symbol)
- test_case.assertEqual(_price(stock_request.symbol), stock_reply.price)
+ def verify(self, requests, responses, test_case):
+ test_case.assertEqual(len(requests), len(responses))
+ for stock_request, stock_reply in zip(requests, responses):
+ test_case.assertEqual(stock_request.symbol, stock_reply.symbol)
+ test_case.assertEqual(
+ _price(stock_request.symbol), stock_reply.price)
class WatchFutureTrades(_service.UnaryStreamTestMethodImplementation):
- """WatchFutureTrades for use in tests."""
+ """WatchFutureTrades for use in tests."""
- def group(self):
- return _STOCK_GROUP_NAME
+ def group(self):
+ return _STOCK_GROUP_NAME
- def name(self):
- return 'WatchFutureTrades'
+ def name(self):
+ return 'WatchFutureTrades'
- def cardinality(self):
- return cardinality.Cardinality.UNARY_STREAM
+ def cardinality(self):
+ return cardinality.Cardinality.UNARY_STREAM
- def request_class(self):
- return stock_pb2.StockRequest
+ def request_class(self):
+ return stock_pb2.StockRequest
- def response_class(self):
- return stock_pb2.StockReply
+ def response_class(self):
+ return stock_pb2.StockReply
- def serialize_request(self, request):
- return request.SerializeToString()
+ def serialize_request(self, request):
+ return request.SerializeToString()
- def deserialize_request(self, serialized_request):
- return stock_pb2.StockRequest.FromString(serialized_request)
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
- def serialize_response(self, response):
- return response.SerializeToString()
+ def serialize_response(self, response):
+ return response.SerializeToString()
- def deserialize_response(self, serialized_response):
- return stock_pb2.StockReply.FromString(serialized_response)
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
- def service(self, request, response_consumer, context, control):
- _watch_future_trades(request, response_consumer, control, context.is_active)
+ def service(self, request, response_consumer, context, control):
+ _watch_future_trades(request, response_consumer, control,
+ context.is_active)
class WatchFutureTradesMessages(_service.UnaryStreamTestMessages):
- """Pairs of a single request message and a sequence of response messages."""
+ """Pairs of a single request message and a sequence of response messages."""
- def __init__(self):
- self._index = 0
+ def __init__(self):
+ self._index = 0
- def request(self):
- symbol = _SYMBOL_FORMAT % self._index
- self._index += 1
- return stock_pb2.StockRequest(
- symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH)
+ def request(self):
+ symbol = _SYMBOL_FORMAT % self._index
+ self._index += 1
+ return stock_pb2.StockRequest(
+ symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH)
- def verify(self, request, responses, test_case):
- test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses))
- base_price = _price(request.symbol)
- for index, response in enumerate(responses):
- test_case.assertEqual(base_price + index, response.price)
+ def verify(self, request, responses, test_case):
+ test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses))
+ base_price = _price(request.symbol)
+ for index, response in enumerate(responses):
+ test_case.assertEqual(base_price + index, response.price)
class GetHighestTradePrice(_service.StreamUnaryTestMethodImplementation):
- """GetHighestTradePrice for use in tests."""
+ """GetHighestTradePrice for use in tests."""
- def group(self):
- return _STOCK_GROUP_NAME
+ def group(self):
+ return _STOCK_GROUP_NAME
- def name(self):
- return 'GetHighestTradePrice'
+ def name(self):
+ return 'GetHighestTradePrice'
- def cardinality(self):
- return cardinality.Cardinality.STREAM_UNARY
+ def cardinality(self):
+ return cardinality.Cardinality.STREAM_UNARY
- def request_class(self):
- return stock_pb2.StockRequest
+ def request_class(self):
+ return stock_pb2.StockRequest
- def response_class(self):
- return stock_pb2.StockReply
+ def response_class(self):
+ return stock_pb2.StockReply
- def serialize_request(self, request):
- return request.SerializeToString()
+ def serialize_request(self, request):
+ return request.SerializeToString()
- def deserialize_request(self, serialized_request):
- return stock_pb2.StockRequest.FromString(serialized_request)
+ def deserialize_request(self, serialized_request):
+ return stock_pb2.StockRequest.FromString(serialized_request)
- def serialize_response(self, response):
- return response.SerializeToString()
+ def serialize_response(self, response):
+ return response.SerializeToString()
- def deserialize_response(self, serialized_response):
- return stock_pb2.StockReply.FromString(serialized_response)
+ def deserialize_response(self, serialized_response):
+ return stock_pb2.StockReply.FromString(serialized_response)
- def service(self, response_callback, context, control):
- return _get_highest_trade_price(
- response_callback, control, context.is_active)
+ def service(self, response_callback, context, control):
+ return _get_highest_trade_price(response_callback, control,
+ context.is_active)
class GetHighestTradePriceMessages(_service.StreamUnaryTestMessages):
- def requests(self):
- return [
- stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index)
- for index in range(test_constants.STREAM_LENGTH)]
-
- def verify(self, requests, response, test_case):
- price = None
- symbol = None
- for stock_request in requests:
- current_symbol = stock_request.symbol
- current_price = _price(current_symbol)
- if price is None or price < current_price:
- price = current_price
- symbol = current_symbol
- test_case.assertEqual(price, response.price)
- test_case.assertEqual(symbol, response.symbol)
+ def requests(self):
+ return [
+ stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index)
+ for index in range(test_constants.STREAM_LENGTH)
+ ]
+
+ def verify(self, requests, response, test_case):
+ price = None
+ symbol = None
+ for stock_request in requests:
+ current_symbol = stock_request.symbol
+ current_price = _price(current_symbol)
+ if price is None or price < current_price:
+ price = current_price
+ symbol = current_symbol
+ test_case.assertEqual(price, response.price)
+ test_case.assertEqual(symbol, response.symbol)
class StockTestService(_service.TestService):
- """A corpus of test data with one method of each RPC cardinality."""
-
- def unary_unary_scenarios(self):
- return {
- (_STOCK_GROUP_NAME, 'GetLastTradePrice'): (
- GetLastTradePrice(), [GetLastTradePriceMessages()]),
- }
-
- def unary_stream_scenarios(self):
- return {
- (_STOCK_GROUP_NAME, 'WatchFutureTrades'): (
- WatchFutureTrades(), [WatchFutureTradesMessages()]),
- }
-
- def stream_unary_scenarios(self):
- return {
- (_STOCK_GROUP_NAME, 'GetHighestTradePrice'): (
- GetHighestTradePrice(), [GetHighestTradePriceMessages()])
- }
-
- def stream_stream_scenarios(self):
- return {
- (_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'): (
- GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]),
- }
+ """A corpus of test data with one method of each RPC cardinality."""
+
+ def unary_unary_scenarios(self):
+ return {
+ (_STOCK_GROUP_NAME, 'GetLastTradePrice'):
+ (GetLastTradePrice(), [GetLastTradePriceMessages()]),
+ }
+
+ def unary_stream_scenarios(self):
+ return {
+ (_STOCK_GROUP_NAME, 'WatchFutureTrades'):
+ (WatchFutureTrades(), [WatchFutureTradesMessages()]),
+ }
+
+ def stream_unary_scenarios(self):
+ return {
+ (_STOCK_GROUP_NAME, 'GetHighestTradePrice'):
+ (GetHighestTradePrice(), [GetHighestTradePriceMessages()])
+ }
+
+ def stream_stream_scenarios(self):
+ return {
+ (_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'):
+ (GetLastTradePriceMultiple(),
+ [GetLastTradePriceMultipleMessages()]),
+ }
STOCK_TEST_SERVICE = StockTestService()
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py
index 71de9d835e..d84e1fc136 100644
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_cases.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Tools for creating tests of implementations of the Face layer."""
# unittest is referenced from specification in this module.
@@ -40,12 +39,11 @@ from tests.unit.framework.interfaces.face import test_interfaces # pylint: disa
_TEST_CASE_SUPERCLASSES = (
_blocking_invocation_inline_service.TestCase,
- _future_invocation_asynchronous_event_service.TestCase,
-)
+ _future_invocation_asynchronous_event_service.TestCase,)
def test_cases(implementation):
- """Creates unittest.TestCase classes for a given Face layer implementation.
+ """Creates unittest.TestCase classes for a given Face layer implementation.
Args:
implementation: A test_interfaces.Implementation specifying creation and
@@ -55,13 +53,14 @@ def test_cases(implementation):
A sequence of subclasses of unittest.TestCase defining tests of the
specified Face layer implementation.
"""
- test_case_classes = []
- for invoker_constructor in _invocation.invoker_constructors():
- for super_class in _TEST_CASE_SUPERCLASSES:
- test_case_classes.append(
- type(invoker_constructor.name() + super_class.NAME, (super_class,),
- {'implementation': implementation,
- 'invoker_constructor': invoker_constructor,
- '__module__': implementation.__module__,
- }))
- return test_case_classes
+ test_case_classes = []
+ for invoker_constructor in _invocation.invoker_constructors():
+ for super_class in _TEST_CASE_SUPERCLASSES:
+ test_case_classes.append(
+ type(invoker_constructor.name() + super_class.NAME, (
+ super_class,), {
+ 'implementation': implementation,
+ 'invoker_constructor': invoker_constructor,
+ '__module__': implementation.__module__,
+ }))
+ return test_case_classes
diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py
index 40f38e68ba..a789d435b4 100644
--- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py
+++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/test_interfaces.py
@@ -26,7 +26,6 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
"""Interfaces used in tests of implementations of the Face layer."""
import abc
@@ -38,103 +37,102 @@ from grpc.framework.interfaces.face import face # pylint: disable=unused-import
class Method(six.with_metaclass(abc.ABCMeta)):
- """Specifies a method to be used in tests."""
+ """Specifies a method to be used in tests."""
- @abc.abstractmethod
- def group(self):
- """Identify the group of the method.
+ @abc.abstractmethod
+ def group(self):
+ """Identify the group of the method.
Returns:
The group of the method.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def name(self):
- """Identify the name of the method.
+ @abc.abstractmethod
+ def name(self):
+ """Identify the name of the method.
Returns:
The name of the method.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def cardinality(self):
- """Identify the cardinality of the method.
+ @abc.abstractmethod
+ def cardinality(self):
+ """Identify the cardinality of the method.
Returns:
A cardinality.Cardinality value describing the streaming semantics of the
method.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def request_class(self):
- """Identify the class used for the method's request objects.
+ @abc.abstractmethod
+ def request_class(self):
+ """Identify the class used for the method's request objects.
Returns:
The class object of the class to which the method's request objects
belong.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def response_class(self):
- """Identify the class used for the method's response objects.
+ @abc.abstractmethod
+ def response_class(self):
+ """Identify the class used for the method's response objects.
Returns:
The class object of the class to which the method's response objects
belong.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def serialize_request(self, request):
- """Serialize the given request object.
+ @abc.abstractmethod
+ def serialize_request(self, request):
+ """Serialize the given request object.
Args:
request: A request object appropriate for this method.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def deserialize_request(self, serialized_request):
- """Synthesize a request object from a given bytestring.
+ @abc.abstractmethod
+ def deserialize_request(self, serialized_request):
+ """Synthesize a request object from a given bytestring.
Args:
serialized_request: A bytestring deserializable into a request object
appropriate for this method.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def serialize_response(self, response):
- """Serialize the given response object.
+ @abc.abstractmethod
+ def serialize_response(self, response):
+ """Serialize the given response object.
Args:
response: A response object appropriate for this method.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def deserialize_response(self, serialized_response):
- """Synthesize a response object from a given bytestring.
+ @abc.abstractmethod
+ def deserialize_response(self, serialized_response):
+ """Synthesize a response object from a given bytestring.
Args:
serialized_response: A bytestring deserializable into a response object
appropriate for this method.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
class Implementation(six.with_metaclass(abc.ABCMeta)):
- """Specifies an implementation of the Face layer."""
+ """Specifies an implementation of the Face layer."""
- @abc.abstractmethod
- def instantiate(
- self, methods, method_implementations,
- multi_method_implementation):
- """Instantiates the Face layer implementation to be used in a test.
+ @abc.abstractmethod
+ def instantiate(self, methods, method_implementations,
+ multi_method_implementation):
+ """Instantiates the Face layer implementation to be used in a test.
Args:
methods: A sequence of Method objects describing the methods available to
@@ -151,69 +149,69 @@ class Implementation(six.with_metaclass(abc.ABCMeta)):
passed to destantiate at the conclusion of the test. The returned stubs
must be backed by the provided implementations.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def destantiate(self, memo):
- """Destroys the Face layer implementation under test.
+ @abc.abstractmethod
+ def destantiate(self, memo):
+ """Destroys the Face layer implementation under test.
Args:
memo: The object from the third position of the return value of a call to
instantiate.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def invocation_metadata(self):
- """Provides the metadata to be used when invoking a test RPC.
+ @abc.abstractmethod
+ def invocation_metadata(self):
+ """Provides the metadata to be used when invoking a test RPC.
Returns:
An object to use as the supplied-at-invocation-time metadata in a test
RPC.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def initial_metadata(self):
- """Provides the metadata for use as a test RPC's first servicer metadata.
+ @abc.abstractmethod
+ def initial_metadata(self):
+ """Provides the metadata for use as a test RPC's first servicer metadata.
Returns:
An object to use as the from-the-servicer-before-responses metadata in a
test RPC.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def terminal_metadata(self):
- """Provides the metadata for use as a test RPC's second servicer metadata.
+ @abc.abstractmethod
+ def terminal_metadata(self):
+ """Provides the metadata for use as a test RPC's second servicer metadata.
Returns:
An object to use as the from-the-servicer-after-all-responses metadata in
a test RPC.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def code(self):
- """Provides the value for use as a test RPC's code.
+ @abc.abstractmethod
+ def code(self):
+ """Provides the value for use as a test RPC's code.
Returns:
An object to use as the from-the-servicer code in a test RPC.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def details(self):
- """Provides the value for use as a test RPC's details.
+ @abc.abstractmethod
+ def details(self):
+ """Provides the value for use as a test RPC's details.
Returns:
An object to use as the from-the-servicer details in a test RPC.
"""
- raise NotImplementedError()
+ raise NotImplementedError()
- @abc.abstractmethod
- def metadata_transmitted(self, original_metadata, transmitted_metadata):
- """Identifies whether or not metadata was properly transmitted.
+ @abc.abstractmethod
+ def metadata_transmitted(self, original_metadata, transmitted_metadata):
+ """Identifies whether or not metadata was properly transmitted.
Args:
original_metadata: A metadata value passed to the Face interface
@@ -226,4 +224,4 @@ class Implementation(six.with_metaclass(abc.ABCMeta)):
Whether or not the metadata was properly transmitted by the Face interface
implementation under test.
"""
- raise NotImplementedError()
+ raise NotImplementedError()