aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/python/grpcio/tests/unit
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/grpcio/tests/unit')
-rw-r--r--src/python/grpcio/tests/unit/framework/face/__init__.py30
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/__init__.py30
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/base_util.py102
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/blocking_invocation_inline_service_test_case.py224
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/callback.py94
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/control.py87
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/coverage.py121
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/digest.py452
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/event_invocation_synchronous_event_service_test_case.py378
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py384
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/interfaces.py118
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/service.py321
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/stock_service.py374
-rw-r--r--src/python/grpcio/tests/unit/framework/face/testing/test_case.py81
14 files changed, 0 insertions, 2796 deletions
diff --git a/src/python/grpcio/tests/unit/framework/face/__init__.py b/src/python/grpcio/tests/unit/framework/face/__init__.py
deleted file mode 100644
index 7086519106..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/__init__.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/__init__.py b/src/python/grpcio/tests/unit/framework/face/testing/__init__.py
deleted file mode 100644
index 7086519106..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/__init__.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/base_util.py b/src/python/grpcio/tests/unit/framework/face/testing/base_util.py
deleted file mode 100644
index 59652b3e90..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/base_util.py
+++ /dev/null
@@ -1,102 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Utilities for creating Base-layer objects for use in Face-layer tests."""
-
-import abc
-
-import six
-
-# interfaces is referenced from specification in this module.
-from grpc.framework.base import util as _base_util
-from grpc.framework.base import implementations
-from grpc.framework.base import in_memory
-from grpc.framework.base import interfaces # pylint: disable=unused-import
-from grpc.framework.foundation import logging_pool
-
-_POOL_SIZE_LIMIT = 5
-
-_MAXIMUM_TIMEOUT = 90
-
-
-class LinkedPair(six.with_metaclass(abc.ABCMeta)):
- """A Front and Back that are linked to one another.
-
- Attributes:
- front: An interfaces.Front.
- back: An interfaces.Back.
- """
-
- @abc.abstractmethod
- def shut_down(self):
- """Shuts down this object and releases its resources."""
- raise NotImplementedError()
-
-
-class _LinkedPair(LinkedPair):
-
- def __init__(self, front, back, pools):
- self.front = front
- self.back = back
- self._pools = pools
-
- def shut_down(self):
- _base_util.wait_for_idle(self.front)
- _base_util.wait_for_idle(self.back)
-
- for pool in self._pools:
- pool.shutdown(wait=True)
-
-
-def linked_pair(servicer, default_timeout):
- """Creates a Server and Stub linked together for use."""
- link_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
- front_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
- front_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
- front_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
- back_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
- back_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
- back_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT)
- pools = (
- link_pool,
- front_work_pool, front_transmission_pool, front_utility_pool,
- back_work_pool, back_transmission_pool, back_utility_pool)
-
- link = in_memory.Link(link_pool)
- front = implementations.front_link(
- front_work_pool, front_transmission_pool, front_utility_pool)
- back = implementations.back_link(
- servicer, back_work_pool, back_transmission_pool, back_utility_pool,
- default_timeout, _MAXIMUM_TIMEOUT)
- front.join_rear_link(link)
- link.join_fore_link(front)
- back.join_fore_link(link)
- link.join_rear_link(back)
-
- return _LinkedPair(front, back, pools)
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/grpcio/tests/unit/framework/face/testing/blocking_invocation_inline_service_test_case.py
deleted file mode 100644
index 2ebe1a32a4..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/blocking_invocation_inline_service_test_case.py
+++ /dev/null
@@ -1,224 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""A test to verify an implementation of the Face layer of RPC Framework."""
-
-# unittest is referenced from specification in this module.
-import abc
-import unittest # pylint: disable=unused-import
-
-import six
-
-from grpc.framework.face import exceptions
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.face.testing import control
-from tests.unit.framework.face.testing import coverage
-from tests.unit.framework.face.testing import digest
-from tests.unit.framework.face.testing import stock_service
-from tests.unit.framework.face.testing import test_case
-
-
-class BlockingInvocationInlineServiceTestCase(
- six.with_metaclass(abc.ABCMeta,
- test_case.FaceTestCase, coverage.BlockingCoverage)):
- """A test of the Face layer of RPC Framework.
-
- Concrete subclasses must also extend unittest.TestCase.
- """
-
- def setUp(self):
- """See unittest.TestCase.setUp for full specification.
-
- Overriding implementations must call this implementation.
- """
- self.control = control.PauseFailControl()
- self.digest = digest.digest(
- stock_service.STOCK_TEST_SERVICE, self.control, None)
-
- self.stub, self.memo = self.set_up_implementation(
- self.digest.name, self.digest.methods,
- self.digest.inline_method_implementations, None)
-
- def tearDown(self):
- """See unittest.TestCase.tearDown for full specification.
-
- Overriding implementations must call this implementation.
- """
- self.tear_down_implementation(self.memo)
-
- def testSuccessfulUnaryRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- response = self.stub.blocking_value_in_value_out(
- name, request, test_constants.LONG_TIMEOUT)
-
- test_messages.verify(request, response, self)
-
- def testSuccessfulUnaryRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- response_iterator = self.stub.inline_value_in_stream_out(
- name, request, test_constants.LONG_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(request, responses, self)
-
- def testSuccessfulStreamRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- response = self.stub.blocking_stream_in_value_out(
- name, iter(requests), test_constants.LONG_TIMEOUT)
-
- test_messages.verify(requests, response, self)
-
- def testSuccessfulStreamRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), test_constants.LONG_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(requests, responses, self)
-
- def testSequentialInvocations(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- first_response = self.stub.blocking_value_in_value_out(
- name, first_request, test_constants.SHORT_TIMEOUT)
-
- test_messages.verify(first_request, first_response, self)
-
- second_response = self.stub.blocking_value_in_value_out(
- name, second_request, test_constants.SHORT_TIMEOUT)
-
- test_messages.verify(second_request, second_response, self)
-
- def testExpiredUnaryRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self.control.pause(), self.assertRaises(
- exceptions.ExpirationError):
- multi_callable = self.stub.unary_unary_multi_callable(name)
- multi_callable(request, test_constants.SHORT_TIMEOUT)
-
- def testExpiredUnaryRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self.control.pause(), self.assertRaises(
- exceptions.ExpirationError):
- response_iterator = self.stub.inline_value_in_stream_out(
- name, request, test_constants.SHORT_TIMEOUT)
- list(response_iterator)
-
- def testExpiredStreamRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self.control.pause(), self.assertRaises(
- exceptions.ExpirationError):
- multi_callable = self.stub.stream_unary_multi_callable(name)
- multi_callable(iter(requests), test_constants.SHORT_TIMEOUT)
-
- def testExpiredStreamRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self.control.pause(), self.assertRaises(
- exceptions.ExpirationError):
- response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), test_constants.SHORT_TIMEOUT)
- list(response_iterator)
-
- def testFailedUnaryRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self.control.fail(), self.assertRaises(exceptions.ServicerError):
- self.stub.blocking_value_in_value_out(name, request,
- test_constants.SHORT_TIMEOUT)
-
- def testFailedUnaryRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self.control.fail(), self.assertRaises(exceptions.ServicerError):
- response_iterator = self.stub.inline_value_in_stream_out(
- name, request, test_constants.SHORT_TIMEOUT)
- list(response_iterator)
-
- def testFailedStreamRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self.control.fail(), self.assertRaises(exceptions.ServicerError):
- self.stub.blocking_stream_in_value_out(name, iter(requests),
- test_constants.SHORT_TIMEOUT)
-
- def testFailedStreamRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self.control.fail(), self.assertRaises(exceptions.ServicerError):
- response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), test_constants.SHORT_TIMEOUT)
- list(response_iterator)
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/callback.py b/src/python/grpcio/tests/unit/framework/face/testing/callback.py
deleted file mode 100644
index d0e63c8c56..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/callback.py
+++ /dev/null
@@ -1,94 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""A utility useful in tests of asynchronous, event-driven interfaces."""
-
-import threading
-
-from grpc.framework.foundation import stream
-
-
-class Callback(stream.Consumer):
- """A utility object useful in tests of asynchronous code."""
-
- def __init__(self):
- self._condition = threading.Condition()
- self._unary_response = None
- self._streamed_responses = []
- self._completed = False
- self._abortion = None
-
- def abort(self, abortion):
- with self._condition:
- self._abortion = abortion
- self._condition.notify_all()
-
- def complete(self, unary_response):
- with self._condition:
- self._unary_response = unary_response
- self._completed = True
- self._condition.notify_all()
-
- def consume(self, streamed_response):
- with self._condition:
- self._streamed_responses.append(streamed_response)
-
- def terminate(self):
- with self._condition:
- self._completed = True
- self._condition.notify_all()
-
- def consume_and_terminate(self, streamed_response):
- with self._condition:
- self._streamed_responses.append(streamed_response)
- self._completed = True
- self._condition.notify_all()
-
- def block_until_terminated(self):
- with self._condition:
- while self._abortion is None and not self._completed:
- self._condition.wait()
-
- def response(self):
- with self._condition:
- if self._abortion is None:
- return self._unary_response
- else:
- raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
-
- def responses(self):
- with self._condition:
- if self._abortion is None:
- return list(self._streamed_responses)
- else:
- raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
-
- def abortion(self):
- with self._condition:
- return self._abortion
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/control.py b/src/python/grpcio/tests/unit/framework/face/testing/control.py
deleted file mode 100644
index 8425affcc9..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/control.py
+++ /dev/null
@@ -1,87 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Code for instructing systems under test to block or fail."""
-
-import abc
-import contextlib
-import threading
-
-import six
-
-
-class Control(six.with_metaclass(abc.ABCMeta)):
- """An object that accepts program control from a system under test.
-
- Systems under test passed a Control should call its control() method
- frequently during execution. The control() method may block, raise an
- exception, or do nothing, all according to the enclosing test's desire for
- the system under test to simulate hanging, failing, or functioning.
- """
-
- @abc.abstractmethod
- def control(self):
- """Potentially does anything."""
- raise NotImplementedError()
-
-
-class PauseFailControl(Control):
- """A Control that can be used to pause or fail code under control."""
-
- def __init__(self):
- self._condition = threading.Condition()
- self._paused = False
- self._fail = False
-
- def control(self):
- with self._condition:
- if self._fail:
- raise ValueError()
-
- while self._paused:
- self._condition.wait()
-
- @contextlib.contextmanager
- def pause(self):
- """Pauses code under control while controlling code is in context."""
- with self._condition:
- self._paused = True
- yield
- with self._condition:
- self._paused = False
- self._condition.notify_all()
-
- @contextlib.contextmanager
- def fail(self):
- """Fails code under control while controlling code is in context."""
- with self._condition:
- self._fail = True
- yield
- with self._condition:
- self._fail = False
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/coverage.py b/src/python/grpcio/tests/unit/framework/face/testing/coverage.py
deleted file mode 100644
index 3c88b7841a..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/coverage.py
+++ /dev/null
@@ -1,121 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Governs coverage for the tests of the Face layer of RPC Framework."""
-
-import abc
-
-import six
-
-# These classes are only valid when inherited by unittest.TestCases.
-# pylint: disable=invalid-name
-
-
-class BlockingCoverage(six.with_metaclass(abc.ABCMeta)):
- """Specification of test coverage for blocking behaviors."""
-
- @abc.abstractmethod
- def testSuccessfulUnaryRequestUnaryResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testSuccessfulUnaryRequestStreamResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testSuccessfulStreamRequestUnaryResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testSuccessfulStreamRequestStreamResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testSequentialInvocations(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testExpiredUnaryRequestUnaryResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testExpiredUnaryRequestStreamResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testExpiredStreamRequestUnaryResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testExpiredStreamRequestStreamResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testFailedUnaryRequestUnaryResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testFailedUnaryRequestStreamResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testFailedStreamRequestUnaryResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testFailedStreamRequestStreamResponse(self):
- raise NotImplementedError()
-
-
-class FullCoverage(six.with_metaclass(abc.ABCMeta, BlockingCoverage)):
- """Specification of test coverage for non-blocking behaviors."""
-
- @abc.abstractmethod
- def testParallelInvocations(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testWaitingForSomeButNotAllParallelInvocations(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testCancelledUnaryRequestUnaryResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testCancelledUnaryRequestStreamResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testCancelledStreamRequestUnaryResponse(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def testCancelledStreamRequestStreamResponse(self):
- raise NotImplementedError()
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/digest.py b/src/python/grpcio/tests/unit/framework/face/testing/digest.py
deleted file mode 100644
index 2b45aded20..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/digest.py
+++ /dev/null
@@ -1,452 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Code for making a service.TestService more amenable to use in tests."""
-
-import collections
-import threading
-
-import six
-
-# testing_control, interfaces, and testing_service are referenced from
-# specification in this module.
-from grpc.framework.common import cardinality
-from grpc.framework.common import style
-from grpc.framework.face import exceptions
-from grpc.framework.face import interfaces as face_interfaces
-from grpc.framework.foundation import stream
-from grpc.framework.foundation import stream_util
-from tests.unit.framework.face.testing import control as testing_control # pylint: disable=unused-import
-from tests.unit.framework.face.testing import interfaces # pylint: disable=unused-import
-from tests.unit.framework.face.testing import service as testing_service # pylint: disable=unused-import
-
-_IDENTITY = lambda x: x
-
-
-class TestServiceDigest(
- collections.namedtuple(
- 'TestServiceDigest',
- ['name',
- 'methods',
- 'inline_method_implementations',
- 'event_method_implementations',
- 'multi_method_implementation',
- 'unary_unary_messages_sequences',
- 'unary_stream_messages_sequences',
- 'stream_unary_messages_sequences',
- 'stream_stream_messages_sequences'])):
- """A transformation of a service.TestService.
-
- Attributes:
- name: The RPC service name to be used in the test.
- methods: A sequence of interfaces.Method objects describing the RPC
- methods that will be called during the test.
- inline_method_implementations: A dict from RPC method name to
- face_interfaces.MethodImplementation object to be used in tests of
- in-line calls to behaviors under test.
- event_method_implementations: A dict from RPC method name to
- face_interfaces.MethodImplementation object to be used in tests of
- event-driven calls to behaviors under test.
- multi_method_implementation: A face_interfaces.MultiMethodImplementation to
- be used in tests of generic calls to behaviors under test.
- unary_unary_messages_sequences: A dict from method name to sequence of
- service.UnaryUnaryTestMessages objects to be used to test the method
- with the given name.
- unary_stream_messages_sequences: A dict from method name to sequence of
- service.UnaryStreamTestMessages objects to be used to test the method
- with the given name.
- stream_unary_messages_sequences: A dict from method name to sequence of
- service.StreamUnaryTestMessages objects to be used to test the method
- with the given name.
- stream_stream_messages_sequences: A dict from method name to sequence of
- service.StreamStreamTestMessages objects to be used to test the
- method with the given name.
- serialization: A serial.Serialization object describing serialization
- behaviors for all the RPC methods.
- """
-
-
-class _BufferingConsumer(stream.Consumer):
- """A trivial Consumer that dumps what it consumes in a user-mutable buffer."""
-
- def __init__(self):
- self.consumed = []
- self.terminated = False
-
- def consume(self, value):
- self.consumed.append(value)
-
- def terminate(self):
- self.terminated = True
-
- def consume_and_terminate(self, value):
- self.consumed.append(value)
- self.terminated = True
-
-
-class _InlineUnaryUnaryMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, unary_unary_test_method, control):
- self._test_method = unary_unary_test_method
- self._control = control
-
- self.cardinality = cardinality.Cardinality.UNARY_UNARY
- self.style = style.Service.INLINE
-
- def unary_unary_inline(self, request, context):
- response_list = []
- self._test_method.service(
- request, response_list.append, context, self._control)
- return response_list.pop(0)
-
-
-class _EventUnaryUnaryMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, unary_unary_test_method, control, pool):
- self._test_method = unary_unary_test_method
- self._control = control
- self._pool = pool
-
- self.cardinality = cardinality.Cardinality.UNARY_UNARY
- self.style = style.Service.EVENT
-
- def unary_unary_event(self, request, response_callback, context):
- if self._pool is None:
- self._test_method.service(
- request, response_callback, context, self._control)
- else:
- self._pool.submit(
- self._test_method.service, request, response_callback, context,
- self._control)
-
-
-class _InlineUnaryStreamMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, unary_stream_test_method, control):
- self._test_method = unary_stream_test_method
- self._control = control
-
- self.cardinality = cardinality.Cardinality.UNARY_STREAM
- self.style = style.Service.INLINE
-
- def unary_stream_inline(self, request, context):
- response_consumer = _BufferingConsumer()
- self._test_method.service(
- request, response_consumer, context, self._control)
- for response in response_consumer.consumed:
- yield response
-
-
-class _EventUnaryStreamMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, unary_stream_test_method, control, pool):
- self._test_method = unary_stream_test_method
- self._control = control
- self._pool = pool
-
- self.cardinality = cardinality.Cardinality.UNARY_STREAM
- self.style = style.Service.EVENT
-
- def unary_stream_event(self, request, response_consumer, context):
- if self._pool is None:
- self._test_method.service(
- request, response_consumer, context, self._control)
- else:
- self._pool.submit(
- self._test_method.service, request, response_consumer, context,
- self._control)
-
-
-class _InlineStreamUnaryMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, stream_unary_test_method, control):
- self._test_method = stream_unary_test_method
- self._control = control
-
- self.cardinality = cardinality.Cardinality.STREAM_UNARY
- self.style = style.Service.INLINE
-
- def stream_unary_inline(self, request_iterator, context):
- response_list = []
- request_consumer = self._test_method.service(
- response_list.append, context, self._control)
- for request in request_iterator:
- request_consumer.consume(request)
- request_consumer.terminate()
- return response_list.pop(0)
-
-
-class _EventStreamUnaryMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, stream_unary_test_method, control, pool):
- self._test_method = stream_unary_test_method
- self._control = control
- self._pool = pool
-
- self.cardinality = cardinality.Cardinality.STREAM_UNARY
- self.style = style.Service.EVENT
-
- def stream_unary_event(self, response_callback, context):
- request_consumer = self._test_method.service(
- response_callback, context, self._control)
- if self._pool is None:
- return request_consumer
- else:
- return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
-
-
-class _InlineStreamStreamMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, stream_stream_test_method, control):
- self._test_method = stream_stream_test_method
- self._control = control
-
- self.cardinality = cardinality.Cardinality.STREAM_STREAM
- self.style = style.Service.INLINE
-
- def stream_stream_inline(self, request_iterator, context):
- response_consumer = _BufferingConsumer()
- request_consumer = self._test_method.service(
- response_consumer, context, self._control)
-
- for request in request_iterator:
- request_consumer.consume(request)
- while response_consumer.consumed:
- yield response_consumer.consumed.pop(0)
- response_consumer.terminate()
-
-
-class _EventStreamStreamMethod(face_interfaces.MethodImplementation):
-
- def __init__(self, stream_stream_test_method, control, pool):
- self._test_method = stream_stream_test_method
- self._control = control
- self._pool = pool
-
- self.cardinality = cardinality.Cardinality.STREAM_STREAM
- self.style = style.Service.EVENT
-
- def stream_stream_event(self, response_consumer, context):
- request_consumer = self._test_method.service(
- response_consumer, context, self._control)
- if self._pool is None:
- return request_consumer
- else:
- return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
-
-
-class _UnaryConsumer(stream.Consumer):
- """A Consumer that only allows consumption of exactly one value."""
-
- def __init__(self, action):
- self._lock = threading.Lock()
- self._action = action
- self._consumed = False
- self._terminated = False
-
- def consume(self, value):
- with self._lock:
- if self._consumed:
- raise ValueError('Unary consumer already consumed!')
- elif self._terminated:
- raise ValueError('Unary consumer already terminated!')
- else:
- self._consumed = True
-
- self._action(value)
-
- def terminate(self):
- with self._lock:
- if not self._consumed:
- raise ValueError('Unary consumer hasn\'t yet consumed!')
- elif self._terminated:
- raise ValueError('Unary consumer already terminated!')
- else:
- self._terminated = True
-
- def consume_and_terminate(self, value):
- with self._lock:
- if self._consumed:
- raise ValueError('Unary consumer already consumed!')
- elif self._terminated:
- raise ValueError('Unary consumer already terminated!')
- else:
- self._consumed = True
- self._terminated = True
-
- self._action(value)
-
-
-class _UnaryUnaryAdaptation(object):
-
- def __init__(self, unary_unary_test_method):
- self._method = unary_unary_test_method
-
- def service(self, response_consumer, context, control):
- def action(request):
- self._method.service(
- request, response_consumer.consume_and_terminate, context, control)
- return _UnaryConsumer(action)
-
-
-class _UnaryStreamAdaptation(object):
-
- def __init__(self, unary_stream_test_method):
- self._method = unary_stream_test_method
-
- def service(self, response_consumer, context, control):
- def action(request):
- self._method.service(request, response_consumer, context, control)
- return _UnaryConsumer(action)
-
-
-class _StreamUnaryAdaptation(object):
-
- def __init__(self, stream_unary_test_method):
- self._method = stream_unary_test_method
-
- def service(self, response_consumer, context, control):
- return self._method.service(
- response_consumer.consume_and_terminate, context, control)
-
-
-class _MultiMethodImplementation(face_interfaces.MultiMethodImplementation):
-
- def __init__(self, methods, control, pool):
- self._methods = methods
- self._control = control
- self._pool = pool
-
- def service(self, name, response_consumer, context):
- method = self._methods.get(name, None)
- if method is None:
- raise exceptions.NoSuchMethodError(name)
- elif self._pool is None:
- return method(response_consumer, context, self._control)
- else:
- request_consumer = method(response_consumer, context, self._control)
- return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool)
-
-
-class _Assembly(
- collections.namedtuple(
- '_Assembly',
- ['methods', 'inlines', 'events', 'adaptations', 'messages'])):
- """An intermediate structure created when creating a TestServiceDigest."""
-
-
-def _assemble(
- scenarios, names, inline_method_constructor, event_method_constructor,
- adapter, control, pool):
- """Creates an _Assembly from the given scenarios."""
- methods = []
- inlines = {}
- events = {}
- adaptations = {}
- messages = {}
- for name, scenario in six.iteritems(scenarios):
- if name in names:
- raise ValueError('Repeated name "%s"!' % name)
-
- test_method = scenario[0]
- inline_method = inline_method_constructor(test_method, control)
- event_method = event_method_constructor(test_method, control, pool)
- adaptation = adapter(test_method)
-
- methods.append(test_method)
- inlines[name] = inline_method
- events[name] = event_method
- adaptations[name] = adaptation
- messages[name] = scenario[1]
-
- return _Assembly(methods, inlines, events, adaptations, messages)
-
-
-def digest(service, control, pool):
- """Creates a TestServiceDigest from a TestService.
-
- Args:
- service: A testing_service.TestService.
- control: A testing_control.Control.
- pool: If RPC methods should be serviced in a separate thread, a thread pool.
- None if RPC methods should be serviced in the thread belonging to the
- run-time that calls for their service.
-
- Returns:
- A TestServiceDigest synthesized from the given service.TestService.
- """
- names = set()
-
- unary_unary = _assemble(
- service.unary_unary_scenarios(), names, _InlineUnaryUnaryMethod,
- _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool)
- names.update(set(unary_unary.inlines))
-
- unary_stream = _assemble(
- service.unary_stream_scenarios(), names, _InlineUnaryStreamMethod,
- _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool)
- names.update(set(unary_stream.inlines))
-
- stream_unary = _assemble(
- service.stream_unary_scenarios(), names, _InlineStreamUnaryMethod,
- _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool)
- names.update(set(stream_unary.inlines))
-
- stream_stream = _assemble(
- service.stream_stream_scenarios(), names, _InlineStreamStreamMethod,
- _EventStreamStreamMethod, _IDENTITY, control, pool)
- names.update(set(stream_stream.inlines))
-
- methods = list(unary_unary.methods)
- methods.extend(unary_stream.methods)
- methods.extend(stream_unary.methods)
- methods.extend(stream_stream.methods)
- adaptations = dict(unary_unary.adaptations)
- adaptations.update(unary_stream.adaptations)
- adaptations.update(stream_unary.adaptations)
- adaptations.update(stream_stream.adaptations)
- inlines = dict(unary_unary.inlines)
- inlines.update(unary_stream.inlines)
- inlines.update(stream_unary.inlines)
- inlines.update(stream_stream.inlines)
- events = dict(unary_unary.events)
- events.update(unary_stream.events)
- events.update(stream_unary.events)
- events.update(stream_stream.events)
-
- return TestServiceDigest(
- service.name(),
- methods,
- inlines,
- events,
- _MultiMethodImplementation(adaptations, control, pool),
- unary_unary.messages,
- unary_stream.messages,
- stream_unary.messages,
- stream_stream.messages)
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/grpcio/tests/unit/framework/face/testing/event_invocation_synchronous_event_service_test_case.py
deleted file mode 100644
index 98b61e492c..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/event_invocation_synchronous_event_service_test_case.py
+++ /dev/null
@@ -1,378 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""A test to verify an implementation of the Face layer of RPC Framework."""
-
-import abc
-import unittest
-
-import six
-
-from grpc.framework.face import interfaces
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.face.testing import callback as testing_callback
-from tests.unit.framework.face.testing import control
-from tests.unit.framework.face.testing import coverage
-from tests.unit.framework.face.testing import digest
-from tests.unit.framework.face.testing import stock_service
-from tests.unit.framework.face.testing import test_case
-
-
-class EventInvocationSynchronousEventServiceTestCase(
- six.with_metaclass(abc.ABCMeta,
- test_case.FaceTestCase, coverage.FullCoverage)):
- """A test of the Face layer of RPC Framework.
-
- Concrete subclasses must also extend unittest.TestCase.
- """
-
- def setUp(self):
- """See unittest.TestCase.setUp for full specification.
-
- Overriding implementations must call this implementation.
- """
- self.control = control.PauseFailControl()
- self.digest = digest.digest(
- stock_service.STOCK_TEST_SERVICE, self.control, None)
-
- self.stub, self.memo = self.set_up_implementation(
- self.digest.name, self.digest.methods,
- self.digest.event_method_implementations, None)
-
- def tearDown(self):
- """See unittest.TestCase.tearDown for full specification.
-
- Overriding implementations must call this implementation.
- """
- self.tear_down_implementation(self.memo)
-
- def testSuccessfulUnaryRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = testing_callback.Callback()
-
- self.stub.event_value_in_value_out(
- name, request, callback.complete, callback.abort,
- test_constants.SHORT_TIMEOUT)
- callback.block_until_terminated()
- response = callback.response()
-
- test_messages.verify(request, response, self)
-
- def testSuccessfulUnaryRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = testing_callback.Callback()
-
- self.stub.event_value_in_stream_out(
- name, request, callback, callback.abort,
- test_constants.SHORT_TIMEOUT)
- callback.block_until_terminated()
- responses = callback.responses()
-
- test_messages.verify(request, responses, self)
-
- def testSuccessfulStreamRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = testing_callback.Callback()
-
- unused_call, request_consumer = self.stub.event_stream_in_value_out(
- name, callback.complete, callback.abort,
- test_constants.SHORT_TIMEOUT)
- for request in requests:
- request_consumer.consume(request)
- request_consumer.terminate()
- callback.block_until_terminated()
- response = callback.response()
-
- test_messages.verify(requests, response, self)
-
- def testSuccessfulStreamRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = testing_callback.Callback()
-
- unused_call, request_consumer = self.stub.event_stream_in_stream_out(
- name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
- for request in requests:
- request_consumer.consume(request)
- request_consumer.terminate()
- callback.block_until_terminated()
- responses = callback.responses()
-
- test_messages.verify(requests, responses, self)
-
- def testSequentialInvocations(self):
- # pylint: disable=cell-var-from-loop
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
- first_callback = testing_callback.Callback()
- second_callback = testing_callback.Callback()
-
- def make_second_invocation(first_response):
- first_callback.complete(first_response)
- self.stub.event_value_in_value_out(
- name, second_request, second_callback.complete,
- second_callback.abort, test_constants.SHORT_TIMEOUT)
-
- self.stub.event_value_in_value_out(
- name, first_request, make_second_invocation, first_callback.abort,
- test_constants.SHORT_TIMEOUT)
- second_callback.block_until_terminated()
-
- first_response = first_callback.response()
- second_response = second_callback.response()
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- def testExpiredUnaryRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = testing_callback.Callback()
-
- with self.control.pause():
- self.stub.event_value_in_value_out(
- name, request, callback.complete, callback.abort,
- test_constants.SHORT_TIMEOUT)
- callback.block_until_terminated()
-
- self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
-
- def testExpiredUnaryRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = testing_callback.Callback()
-
- with self.control.pause():
- self.stub.event_value_in_stream_out(
- name, request, callback, callback.abort,
- test_constants.SHORT_TIMEOUT)
- callback.block_until_terminated()
-
- self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
-
- def testExpiredStreamRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_unary_messages_sequences)):
- for unused_test_messages in test_messages_sequence:
- callback = testing_callback.Callback()
-
- self.stub.event_stream_in_value_out(
- name, callback.complete, callback.abort,
- test_constants.SHORT_TIMEOUT)
- callback.block_until_terminated()
-
- self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
-
- def testExpiredStreamRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = testing_callback.Callback()
-
- unused_call, request_consumer = self.stub.event_stream_in_stream_out(
- name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
- for request in requests:
- request_consumer.consume(request)
- callback.block_until_terminated()
-
- self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
-
- def testFailedUnaryRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = testing_callback.Callback()
-
- with self.control.fail():
- self.stub.event_value_in_value_out(
- name, request, callback.complete, callback.abort,
- test_constants.SHORT_TIMEOUT)
- callback.block_until_terminated()
-
- self.assertEqual(interfaces.Abortion.SERVICER_FAILURE,
- callback.abortion())
-
- def testFailedUnaryRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = testing_callback.Callback()
-
- with self.control.fail():
- self.stub.event_value_in_stream_out(
- name, request, callback, callback.abort,
- test_constants.SHORT_TIMEOUT)
- callback.block_until_terminated()
-
- self.assertEqual(interfaces.Abortion.SERVICER_FAILURE,
- callback.abortion())
-
- def testFailedStreamRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = testing_callback.Callback()
-
- with self.control.fail():
- unused_call, request_consumer = self.stub.event_stream_in_value_out(
- name, callback.complete, callback.abort,
- test_constants.SHORT_TIMEOUT)
- for request in requests:
- request_consumer.consume(request)
- request_consumer.terminate()
- callback.block_until_terminated()
-
- self.assertEqual(interfaces.Abortion.SERVICER_FAILURE,
- callback.abortion())
-
- def testFailedStreamRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = testing_callback.Callback()
-
- with self.control.fail():
- unused_call, request_consumer = self.stub.event_stream_in_stream_out(
- name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
- for request in requests:
- request_consumer.consume(request)
- request_consumer.terminate()
- callback.block_until_terminated()
-
- self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
-
- def testParallelInvocations(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- first_callback = testing_callback.Callback()
- second_request = test_messages.request()
- second_callback = testing_callback.Callback()
-
- self.stub.event_value_in_value_out(
- name, first_request, first_callback.complete, first_callback.abort,
- test_constants.SHORT_TIMEOUT)
- self.stub.event_value_in_value_out(
- name, second_request, second_callback.complete,
- second_callback.abort, test_constants.SHORT_TIMEOUT)
- first_callback.block_until_terminated()
- second_callback.block_until_terminated()
-
- first_response = first_callback.response()
- second_response = second_callback.response()
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- @unittest.skip('TODO(nathaniel): implement.')
- def testWaitingForSomeButNotAllParallelInvocations(self):
- raise NotImplementedError()
-
- def testCancelledUnaryRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = testing_callback.Callback()
-
- with self.control.pause():
- call = self.stub.event_value_in_value_out(
- name, request, callback.complete, callback.abort,
- test_constants.SHORT_TIMEOUT)
- call.cancel()
- callback.block_until_terminated()
-
- self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
-
- def testCancelledUnaryRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
- callback = testing_callback.Callback()
-
- call = self.stub.event_value_in_stream_out(
- name, request, callback, callback.abort,
- test_constants.SHORT_TIMEOUT)
- call.cancel()
- callback.block_until_terminated()
-
- self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
-
- def testCancelledStreamRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- callback = testing_callback.Callback()
-
- call, request_consumer = self.stub.event_stream_in_value_out(
- name, callback.complete, callback.abort,
- test_constants.SHORT_TIMEOUT)
- for request in requests:
- request_consumer.consume(request)
- call.cancel()
- callback.block_until_terminated()
-
- self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
-
- def testCancelledStreamRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_stream_messages_sequences)):
- for unused_test_messages in test_messages_sequence:
- callback = testing_callback.Callback()
-
- call, unused_request_consumer = self.stub.event_stream_in_stream_out(
- name, callback, callback.abort, test_constants.SHORT_TIMEOUT)
- call.cancel()
- callback.block_until_terminated()
-
- self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/grpcio/tests/unit/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
deleted file mode 100644
index cae791af97..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
+++ /dev/null
@@ -1,384 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""A test to verify an implementation of the Face layer of RPC Framework."""
-
-import abc
-import contextlib
-import threading
-import unittest
-
-import six
-
-from grpc.framework.face import exceptions
-from grpc.framework.foundation import future
-from grpc.framework.foundation import logging_pool
-from tests.unit.framework.common import test_constants
-from tests.unit.framework.face.testing import control
-from tests.unit.framework.face.testing import coverage
-from tests.unit.framework.face.testing import digest
-from tests.unit.framework.face.testing import stock_service
-from tests.unit.framework.face.testing import test_case
-
-_MAXIMUM_POOL_SIZE = 10
-
-
-class _PauseableIterator(object):
-
- def __init__(self, upstream):
- self._upstream = upstream
- self._condition = threading.Condition()
- self._paused = False
-
- @contextlib.contextmanager
- def pause(self):
- with self._condition:
- self._paused = True
- yield
- with self._condition:
- self._paused = False
- self._condition.notify_all()
-
- def __iter__(self):
- return self
-
- def __next__(self):
- return self.next()
-
- def next(self):
- with self._condition:
- while self._paused:
- self._condition.wait()
- return next(self._upstream)
-
-
-class FutureInvocationAsynchronousEventServiceTestCase(
- six.with_metaclass(abc.ABCMeta,
- test_case.FaceTestCase, coverage.FullCoverage)):
- """A test of the Face layer of RPC Framework.
-
- Concrete subclasses must also extend unittest.TestCase.
- """
-
- def setUp(self):
- """See unittest.TestCase.setUp for full specification.
-
- Overriding implementations must call this implementation.
- """
- self.control = control.PauseFailControl()
- self.digest_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE)
- self.digest = digest.digest(
- stock_service.STOCK_TEST_SERVICE, self.control, self.digest_pool)
-
- self.stub, self.memo = self.set_up_implementation(
- self.digest.name, self.digest.methods,
- self.digest.event_method_implementations, None)
-
- def tearDown(self):
- """See unittest.TestCase.tearDown for full specification.
-
- Overriding implementations must call this implementation.
- """
- self.tear_down_implementation(self.memo)
- self.digest_pool.shutdown(wait=True)
-
- def testSuccessfulUnaryRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- response_future = self.stub.future_value_in_value_out(
- name, request, test_constants.SHORT_TIMEOUT)
- response = response_future.result()
-
- test_messages.verify(request, response, self)
-
- def testSuccessfulUnaryRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- response_iterator = self.stub.inline_value_in_stream_out(
- name, request, test_constants.SHORT_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(request, responses, self)
-
- def testSuccessfulStreamRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- request_iterator = _PauseableIterator(iter(requests))
-
- # Use of a paused iterator of requests allows us to test that control is
- # returned to calling code before the iterator yields any requests.
- with request_iterator.pause():
- response_future = self.stub.future_stream_in_value_out(
- name, request_iterator, test_constants.SHORT_TIMEOUT)
- response = response_future.result()
-
- test_messages.verify(requests, response, self)
-
- def testSuccessfulStreamRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
- request_iterator = _PauseableIterator(iter(requests))
-
- # Use of a paused iterator of requests allows us to test that control is
- # returned to calling code before the iterator yields any requests.
- with request_iterator.pause():
- response_iterator = self.stub.inline_stream_in_stream_out(
- name, request_iterator, test_constants.SHORT_TIMEOUT)
- responses = list(response_iterator)
-
- test_messages.verify(requests, responses, self)
-
- def testSequentialInvocations(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- first_response_future = self.stub.future_value_in_value_out(
- name, first_request, test_constants.SHORT_TIMEOUT)
- first_response = first_response_future.result()
-
- test_messages.verify(first_request, first_response, self)
-
- second_response_future = self.stub.future_value_in_value_out(
- name, second_request, test_constants.SHORT_TIMEOUT)
- second_response = second_response_future.result()
-
- test_messages.verify(second_request, second_response, self)
-
- def testExpiredUnaryRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self.control.pause():
- multi_callable = self.stub.unary_unary_multi_callable(name)
- response_future = multi_callable.future(request,
- test_constants.SHORT_TIMEOUT)
- self.assertIsInstance(
- response_future.exception(), exceptions.ExpirationError)
- with self.assertRaises(exceptions.ExpirationError):
- response_future.result()
-
- def testExpiredUnaryRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self.control.pause():
- response_iterator = self.stub.inline_value_in_stream_out(
- name, request, test_constants.SHORT_TIMEOUT)
- with self.assertRaises(exceptions.ExpirationError):
- list(response_iterator)
-
- def testExpiredStreamRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self.control.pause():
- multi_callable = self.stub.stream_unary_multi_callable(name)
- response_future = multi_callable.future(iter(requests),
- test_constants.SHORT_TIMEOUT)
- self.assertIsInstance(
- response_future.exception(), exceptions.ExpirationError)
- with self.assertRaises(exceptions.ExpirationError):
- response_future.result()
-
- def testExpiredStreamRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self.control.pause():
- response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), test_constants.SHORT_TIMEOUT)
- with self.assertRaises(exceptions.ExpirationError):
- list(response_iterator)
-
- def testFailedUnaryRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self.control.fail():
- response_future = self.stub.future_value_in_value_out(
- name, request, test_constants.SHORT_TIMEOUT)
-
- # Because the servicer fails outside of the thread from which the
- # servicer-side runtime called into it its failure is
- # indistinguishable from simply not having called its
- # response_callback before the expiration of the RPC.
- self.assertIsInstance(
- response_future.exception(), exceptions.ExpirationError)
- with self.assertRaises(exceptions.ExpirationError):
- response_future.result()
-
- def testFailedUnaryRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- # Because the servicer fails outside of the thread from which the
- # servicer-side runtime called into it its failure is indistinguishable
- # from simply not having called its response_consumer before the
- # expiration of the RPC.
- with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
- response_iterator = self.stub.inline_value_in_stream_out(
- name, request, test_constants.SHORT_TIMEOUT)
- list(response_iterator)
-
- def testFailedStreamRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self.control.fail():
- response_future = self.stub.future_stream_in_value_out(
- name, iter(requests), test_constants.SHORT_TIMEOUT)
-
- # Because the servicer fails outside of the thread from which the
- # servicer-side runtime called into it its failure is
- # indistinguishable from simply not having called its
- # response_callback before the expiration of the RPC.
- self.assertIsInstance(
- response_future.exception(), exceptions.ExpirationError)
- with self.assertRaises(exceptions.ExpirationError):
- response_future.result()
-
- def testFailedStreamRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- # Because the servicer fails outside of the thread from which the
- # servicer-side runtime called into it its failure is indistinguishable
- # from simply not having called its response_consumer before the
- # expiration of the RPC.
- with self.control.fail(), self.assertRaises(exceptions.ExpirationError):
- response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), test_constants.SHORT_TIMEOUT)
- list(response_iterator)
-
- def testParallelInvocations(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- # TODO(bug 2039): use LONG_TIMEOUT instead
- first_response_future = self.stub.future_value_in_value_out(
- name, first_request, test_constants.SHORT_TIMEOUT)
- second_response_future = self.stub.future_value_in_value_out(
- name, second_request, test_constants.SHORT_TIMEOUT)
- first_response = first_response_future.result()
- second_response = second_response_future.result()
-
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- @unittest.skip('TODO(nathaniel): implement.')
- def testWaitingForSomeButNotAllParallelInvocations(self):
- raise NotImplementedError()
-
- def testCancelledUnaryRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self.control.pause():
- response_future = self.stub.future_value_in_value_out(
- name, request, test_constants.SHORT_TIMEOUT)
- cancel_method_return_value = response_future.cancel()
-
- self.assertFalse(cancel_method_return_value)
- self.assertTrue(response_future.cancelled())
-
- def testCancelledUnaryRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.unary_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- request = test_messages.request()
-
- with self.control.pause():
- response_iterator = self.stub.inline_value_in_stream_out(
- name, request, test_constants.SHORT_TIMEOUT)
- response_iterator.cancel()
-
- with self.assertRaises(future.CancelledError):
- next(response_iterator)
-
- def testCancelledStreamRequestUnaryResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_unary_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self.control.pause():
- response_future = self.stub.future_stream_in_value_out(
- name, iter(requests), test_constants.SHORT_TIMEOUT)
- cancel_method_return_value = response_future.cancel()
-
- self.assertFalse(cancel_method_return_value)
- self.assertTrue(response_future.cancelled())
-
- def testCancelledStreamRequestStreamResponse(self):
- for name, test_messages_sequence in (
- six.iteritems(self.digest.stream_stream_messages_sequences)):
- for test_messages in test_messages_sequence:
- requests = test_messages.requests()
-
- with self.control.pause():
- response_iterator = self.stub.inline_stream_in_stream_out(
- name, iter(requests), test_constants.SHORT_TIMEOUT)
- response_iterator.cancel()
-
- with self.assertRaises(future.CancelledError):
- next(response_iterator)
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/interfaces.py b/src/python/grpcio/tests/unit/framework/face/testing/interfaces.py
deleted file mode 100644
index 8a25f89c88..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/interfaces.py
+++ /dev/null
@@ -1,118 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Interfaces implemented by data sets used in Face-layer tests."""
-
-import abc
-
-import six
-
-# cardinality is referenced from specification in this module.
-from grpc.framework.common import cardinality # pylint: disable=unused-import
-
-
-class Method(six.with_metaclass(abc.ABCMeta)):
- """An RPC method to be used in tests of RPC implementations."""
-
- @abc.abstractmethod
- def name(self):
- """Identify the name of the method.
-
- Returns:
- The name of the method.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def cardinality(self):
- """Identify the cardinality of the method.
-
- Returns:
- A cardinality.Cardinality value describing the streaming semantics of the
- method.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def request_class(self):
- """Identify the class used for the method's request objects.
-
- Returns:
- The class object of the class to which the method's request objects
- belong.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def response_class(self):
- """Identify the class used for the method's response objects.
-
- Returns:
- The class object of the class to which the method's response objects
- belong.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def serialize_request(self, request):
- """Serialize the given request object.
-
- Args:
- request: A request object appropriate for this method.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def deserialize_request(self, serialized_request):
- """Synthesize a request object from a given bytestring.
-
- Args:
- serialized_request: A bytestring deserializable into a request object
- appropriate for this method.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def serialize_response(self, response):
- """Serialize the given response object.
-
- Args:
- response: A response object appropriate for this method.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def deserialize_response(self, serialized_response):
- """Synthesize a response object from a given bytestring.
-
- Args:
- serialized_response: A bytestring deserializable into a response object
- appropriate for this method.
- """
- raise NotImplementedError()
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/service.py b/src/python/grpcio/tests/unit/framework/face/testing/service.py
deleted file mode 100644
index 3e4228cc07..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/service.py
+++ /dev/null
@@ -1,321 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Private interfaces implemented by data sets used in Face-layer tests."""
-
-import abc
-
-import six
-
-# interfaces is referenced from specification in this module.
-from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import
-from tests.unit.framework.face.testing import interfaces
-
-
-class UnaryUnaryTestMethodImplementation(six.with_metaclass(abc.ABCMeta, interfaces.Method)):
- """A controllable implementation of a unary-unary RPC method."""
-
- @abc.abstractmethod
- def service(self, request, response_callback, context, control):
- """Services an RPC that accepts one message and produces one message.
-
- Args:
- request: The single request message for the RPC.
- response_callback: A callback to be called to accept the response message
- of the RPC.
- context: An face_interfaces.RpcContext object.
- control: A test_control.Control to control execution of this method.
-
- Raises:
- abandonment.Abandoned: May or may not be raised when the RPC has been
- aborted.
- """
- raise NotImplementedError()
-
-
-class UnaryUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
- """A type for unary-request-unary-response message pairings."""
-
- @abc.abstractmethod
- def request(self):
- """Affords a request message.
-
- Implementations of this method should return a different message with each
- call so that multiple test executions of the test method may be made with
- different inputs.
-
- Returns:
- A request message.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def verify(self, request, response, test_case):
- """Verifies that the computed response matches the given request.
-
- Args:
- request: A request message.
- response: A response message.
- test_case: A unittest.TestCase object affording useful assertion methods.
-
- Raises:
- AssertionError: If the request and response do not match, indicating that
- there was some problem executing the RPC under test.
- """
- raise NotImplementedError()
-
-
-class UnaryStreamTestMethodImplementation(six.with_metaclass(abc.ABCMeta, interfaces.Method)):
- """A controllable implementation of a unary-stream RPC method."""
-
- @abc.abstractmethod
- def service(self, request, response_consumer, context, control):
- """Services an RPC that takes one message and produces a stream of messages.
-
- Args:
- request: The single request message for the RPC.
- response_consumer: A stream.Consumer to be called to accept the response
- messages of the RPC.
- context: A face_interfaces.RpcContext object.
- control: A test_control.Control to control execution of this method.
-
- Raises:
- abandonment.Abandoned: May or may not be raised when the RPC has been
- aborted.
- """
- raise NotImplementedError()
-
-
-class UnaryStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
- """A type for unary-request-stream-response message pairings."""
-
- @abc.abstractmethod
- def request(self):
- """Affords a request message.
-
- Implementations of this method should return a different message with each
- call so that multiple test executions of the test method may be made with
- different inputs.
-
- Returns:
- A request message.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def verify(self, request, responses, test_case):
- """Verifies that the computed responses match the given request.
-
- Args:
- request: A request message.
- responses: A sequence of response messages.
- test_case: A unittest.TestCase object affording useful assertion methods.
-
- Raises:
- AssertionError: If the request and responses do not match, indicating that
- there was some problem executing the RPC under test.
- """
- raise NotImplementedError()
-
-
-class StreamUnaryTestMethodImplementation(six.with_metaclass(abc.ABCMeta, interfaces.Method)):
- """A controllable implementation of a stream-unary RPC method."""
-
- @abc.abstractmethod
- def service(self, response_callback, context, control):
- """Services an RPC that takes a stream of messages and produces one message.
-
- Args:
- response_callback: A callback to be called to accept the response message
- of the RPC.
- context: A face_interfaces.RpcContext object.
- control: A test_control.Control to control execution of this method.
-
- Returns:
- A stream.Consumer with which to accept the request messages of the RPC.
- The consumer returned from this method may or may not be invoked to
- completion: in the case of RPC abortion, RPC Framework will simply stop
- passing messages to this object. Implementations must not assume that
- this object will be called to completion of the request stream or even
- called at all.
-
- Raises:
- abandonment.Abandoned: May or may not be raised when the RPC has been
- aborted.
- """
- raise NotImplementedError()
-
-
-class StreamUnaryTestMessages(six.with_metaclass(abc.ABCMeta)):
- """A type for stream-request-unary-response message pairings."""
-
- @abc.abstractmethod
- def requests(self):
- """Affords a sequence of request messages.
-
- Implementations of this method should return a different sequences with each
- call so that multiple test executions of the test method may be made with
- different inputs.
-
- Returns:
- A sequence of request messages.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def verify(self, requests, response, test_case):
- """Verifies that the computed response matches the given requests.
-
- Args:
- requests: A sequence of request messages.
- response: A response message.
- test_case: A unittest.TestCase object affording useful assertion methods.
-
- Raises:
- AssertionError: If the requests and response do not match, indicating that
- there was some problem executing the RPC under test.
- """
- raise NotImplementedError()
-
-
-class StreamStreamTestMethodImplementation(six.with_metaclass(abc.ABCMeta, interfaces.Method)):
- """A controllable implementation of a stream-stream RPC method."""
-
- @abc.abstractmethod
- def service(self, response_consumer, context, control):
- """Services an RPC that accepts and produces streams of messages.
-
- Args:
- response_consumer: A stream.Consumer to be called to accept the response
- messages of the RPC.
- context: A face_interfaces.RpcContext object.
- control: A test_control.Control to control execution of this method.
-
- Returns:
- A stream.Consumer with which to accept the request messages of the RPC.
- The consumer returned from this method may or may not be invoked to
- completion: in the case of RPC abortion, RPC Framework will simply stop
- passing messages to this object. Implementations must not assume that
- this object will be called to completion of the request stream or even
- called at all.
-
- Raises:
- abandonment.Abandoned: May or may not be raised when the RPC has been
- aborted.
- """
- raise NotImplementedError()
-
-
-class StreamStreamTestMessages(six.with_metaclass(abc.ABCMeta)):
- """A type for stream-request-stream-response message pairings."""
-
- @abc.abstractmethod
- def requests(self):
- """Affords a sequence of request messages.
-
- Implementations of this method should return a different sequences with each
- call so that multiple test executions of the test method may be made with
- different inputs.
-
- Returns:
- A sequence of request messages.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def verify(self, requests, responses, test_case):
- """Verifies that the computed response matches the given requests.
-
- Args:
- requests: A sequence of request messages.
- responses: A sequence of response messages.
- test_case: A unittest.TestCase object affording useful assertion methods.
-
- Raises:
- AssertionError: If the requests and responses do not match, indicating
- that there was some problem executing the RPC under test.
- """
- raise NotImplementedError()
-
-
-class TestService(six.with_metaclass(abc.ABCMeta)):
- """A specification of implemented RPC methods to use in tests."""
-
- @abc.abstractmethod
- def name(self):
- """Identifies the RPC service name used during the test.
-
- Returns:
- The RPC service name to be used for the test.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def unary_unary_scenarios(self):
- """Affords unary-request-unary-response test methods and their messages.
-
- Returns:
- A dict from method name to pair. The first element of the pair
- is a UnaryUnaryTestMethodImplementation object and the second element
- is a sequence of UnaryUnaryTestMethodMessages objects.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def unary_stream_scenarios(self):
- """Affords unary-request-stream-response test methods and their messages.
-
- Returns:
- A dict from method name to pair. The first element of the pair is a
- UnaryStreamTestMethodImplementation object and the second element is a
- sequence of UnaryStreamTestMethodMessages objects.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def stream_unary_scenarios(self):
- """Affords stream-request-unary-response test methods and their messages.
-
- Returns:
- A dict from method name to pair. The first element of the pair is a
- StreamUnaryTestMethodImplementation object and the second element is a
- sequence of StreamUnaryTestMethodMessages objects.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def stream_stream_scenarios(self):
- """Affords stream-request-stream-response test methods and their messages.
-
- Returns:
- A dict from method name to pair. The first element of the pair is a
- StreamStreamTestMethodImplementation object and the second element is a
- sequence of StreamStreamTestMethodMessages objects.
- """
- raise NotImplementedError()
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/stock_service.py b/src/python/grpcio/tests/unit/framework/face/testing/stock_service.py
deleted file mode 100644
index 117c723f79..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/stock_service.py
+++ /dev/null
@@ -1,374 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Examples of Python implementations of the stock.proto Stock service."""
-
-from grpc.framework.common import cardinality
-from grpc.framework.foundation import abandonment
-from grpc.framework.foundation import stream
-from grpc.framework.foundation import stream_util
-from tests.unit.framework.face.testing import service
-from tests.unit._junkdrawer import stock_pb2
-
-SYMBOL_FORMAT = 'test symbol:%03d'
-STREAM_LENGTH = 400
-
-# A test-appropriate security-pricing function. :-P
-_price = lambda symbol_name: float(hash(symbol_name) % 4096)
-
-
-def _get_last_trade_price(stock_request, stock_reply_callback, control, active):
- """A unary-request, unary-response test method."""
- control.control()
- if active():
- stock_reply_callback(
- stock_pb2.StockReply(
- symbol=stock_request.symbol, price=_price(stock_request.symbol)))
- else:
- raise abandonment.Abandoned()
-
-
-def _get_last_trade_price_multiple(stock_reply_consumer, control, active):
- """A stream-request, stream-response test method."""
- def stock_reply_for_stock_request(stock_request):
- control.control()
- if active():
- return stock_pb2.StockReply(
- symbol=stock_request.symbol, price=_price(stock_request.symbol))
- else:
- raise abandonment.Abandoned()
- return stream_util.TransformingConsumer(
- stock_reply_for_stock_request, stock_reply_consumer)
-
-
-def _watch_future_trades(stock_request, stock_reply_consumer, control, active):
- """A unary-request, stream-response test method."""
- base_price = _price(stock_request.symbol)
- for index in range(stock_request.num_trades_to_watch):
- control.control()
- if active():
- stock_reply_consumer.consume(
- stock_pb2.StockReply(
- symbol=stock_request.symbol, price=base_price + index))
- else:
- raise abandonment.Abandoned()
- stock_reply_consumer.terminate()
-
-
-def _get_highest_trade_price(stock_reply_callback, control, active):
- """A stream-request, unary-response test method."""
-
- class StockRequestConsumer(stream.Consumer):
- """Keeps an ongoing record of the most valuable symbol yet consumed."""
-
- def __init__(self):
- self._symbol = None
- self._price = None
-
- def consume(self, stock_request):
- control.control()
- if active():
- if self._price is None:
- self._symbol = stock_request.symbol
- self._price = _price(stock_request.symbol)
- else:
- candidate_price = _price(stock_request.symbol)
- if self._price < candidate_price:
- self._symbol = stock_request.symbol
- self._price = candidate_price
-
- def terminate(self):
- control.control()
- if active():
- if self._symbol is None:
- raise ValueError()
- else:
- stock_reply_callback(
- stock_pb2.StockReply(symbol=self._symbol, price=self._price))
- self._symbol = None
- self._price = None
-
- def consume_and_terminate(self, stock_request):
- control.control()
- if active():
- if self._price is None:
- stock_reply_callback(
- stock_pb2.StockReply(
- symbol=stock_request.symbol,
- price=_price(stock_request.symbol)))
- else:
- candidate_price = _price(stock_request.symbol)
- if self._price < candidate_price:
- stock_reply_callback(
- stock_pb2.StockReply(
- symbol=stock_request.symbol, price=candidate_price))
- else:
- stock_reply_callback(
- stock_pb2.StockReply(
- symbol=self._symbol, price=self._price))
-
- self._symbol = None
- self._price = None
-
- return StockRequestConsumer()
-
-
-class GetLastTradePrice(service.UnaryUnaryTestMethodImplementation):
- """GetLastTradePrice for use in tests."""
-
- def name(self):
- return 'GetLastTradePrice'
-
- def cardinality(self):
- return cardinality.Cardinality.UNARY_UNARY
-
- def request_class(self):
- return stock_pb2.StockRequest
-
- def response_class(self):
- return stock_pb2.StockReply
-
- def serialize_request(self, request):
- return request.SerializeToString()
-
- def deserialize_request(self, serialized_request):
- return stock_pb2.StockRequest.FromString(serialized_request)
-
- def serialize_response(self, response):
- return response.SerializeToString()
-
- def deserialize_response(self, serialized_response):
- return stock_pb2.StockReply.FromString(serialized_response)
-
- def service(self, request, response_callback, context, control):
- _get_last_trade_price(
- request, response_callback, control, context.is_active)
-
-
-class GetLastTradePriceMessages(service.UnaryUnaryTestMessages):
-
- def __init__(self):
- self._index = 0
-
- def request(self):
- symbol = SYMBOL_FORMAT % self._index
- self._index += 1
- return stock_pb2.StockRequest(symbol=symbol)
-
- def verify(self, request, response, test_case):
- test_case.assertEqual(request.symbol, response.symbol)
- test_case.assertEqual(_price(request.symbol), response.price)
-
-
-class GetLastTradePriceMultiple(service.StreamStreamTestMethodImplementation):
- """GetLastTradePriceMultiple for use in tests."""
-
- def name(self):
- return 'GetLastTradePriceMultiple'
-
- def cardinality(self):
- return cardinality.Cardinality.STREAM_STREAM
-
- def request_class(self):
- return stock_pb2.StockRequest
-
- def response_class(self):
- return stock_pb2.StockReply
-
- def serialize_request(self, request):
- return request.SerializeToString()
-
- def deserialize_request(self, serialized_request):
- return stock_pb2.StockRequest.FromString(serialized_request)
-
- def serialize_response(self, response):
- return response.SerializeToString()
-
- def deserialize_response(self, serialized_response):
- return stock_pb2.StockReply.FromString(serialized_response)
-
- def service(self, response_consumer, context, control):
- return _get_last_trade_price_multiple(
- response_consumer, control, context.is_active)
-
-
-class GetLastTradePriceMultipleMessages(service.StreamStreamTestMessages):
- """Pairs of message streams for use with GetLastTradePriceMultiple."""
-
- def __init__(self):
- self._index = 0
-
- def requests(self):
- base_index = self._index
- self._index += 1
- return [
- stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % (base_index + index))
- for index in range(STREAM_LENGTH)]
-
- def verify(self, requests, responses, test_case):
- test_case.assertEqual(len(requests), len(responses))
- for stock_request, stock_reply in zip(requests, responses):
- test_case.assertEqual(stock_request.symbol, stock_reply.symbol)
- test_case.assertEqual(_price(stock_request.symbol), stock_reply.price)
-
-
-class WatchFutureTrades(service.UnaryStreamTestMethodImplementation):
- """WatchFutureTrades for use in tests."""
-
- def name(self):
- return 'WatchFutureTrades'
-
- def cardinality(self):
- return cardinality.Cardinality.UNARY_STREAM
-
- def request_class(self):
- return stock_pb2.StockRequest
-
- def response_class(self):
- return stock_pb2.StockReply
-
- def serialize_request(self, request):
- return request.SerializeToString()
-
- def deserialize_request(self, serialized_request):
- return stock_pb2.StockRequest.FromString(serialized_request)
-
- def serialize_response(self, response):
- return response.SerializeToString()
-
- def deserialize_response(self, serialized_response):
- return stock_pb2.StockReply.FromString(serialized_response)
-
- def service(self, request, response_consumer, context, control):
- _watch_future_trades(request, response_consumer, control, context.is_active)
-
-
-class WatchFutureTradesMessages(service.UnaryStreamTestMessages):
- """Pairs of a single request message and a sequence of response messages."""
-
- def __init__(self):
- self._index = 0
-
- def request(self):
- symbol = SYMBOL_FORMAT % self._index
- self._index += 1
- return stock_pb2.StockRequest(
- symbol=symbol, num_trades_to_watch=STREAM_LENGTH)
-
- def verify(self, request, responses, test_case):
- test_case.assertEqual(STREAM_LENGTH, len(responses))
- base_price = _price(request.symbol)
- for index, response in enumerate(responses):
- test_case.assertEqual(base_price + index, response.price)
-
-
-class GetHighestTradePrice(service.StreamUnaryTestMethodImplementation):
- """GetHighestTradePrice for use in tests."""
-
- def name(self):
- return 'GetHighestTradePrice'
-
- def cardinality(self):
- return cardinality.Cardinality.STREAM_UNARY
-
- def request_class(self):
- return stock_pb2.StockRequest
-
- def response_class(self):
- return stock_pb2.StockReply
-
- def serialize_request(self, request):
- return request.SerializeToString()
-
- def deserialize_request(self, serialized_request):
- return stock_pb2.StockRequest.FromString(serialized_request)
-
- def serialize_response(self, response):
- return response.SerializeToString()
-
- def deserialize_response(self, serialized_response):
- return stock_pb2.StockReply.FromString(serialized_response)
-
- def service(self, response_callback, context, control):
- return _get_highest_trade_price(
- response_callback, control, context.is_active)
-
-
-class GetHighestTradePriceMessages(service.StreamUnaryTestMessages):
-
- def requests(self):
- return [
- stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % index)
- for index in range(STREAM_LENGTH)]
-
- def verify(self, requests, response, test_case):
- price = None
- symbol = None
- for stock_request in requests:
- current_symbol = stock_request.symbol
- current_price = _price(current_symbol)
- if price is None or price < current_price:
- price = current_price
- symbol = current_symbol
- test_case.assertEqual(price, response.price)
- test_case.assertEqual(symbol, response.symbol)
-
-
-class StockTestService(service.TestService):
- """A corpus of test data with one method of each RPC cardinality."""
-
- def name(self):
- return 'Stock'
-
- def unary_unary_scenarios(self):
- return {
- 'GetLastTradePrice': (
- GetLastTradePrice(), [GetLastTradePriceMessages()]),
- }
-
- def unary_stream_scenarios(self):
- return {
- 'WatchFutureTrades': (
- WatchFutureTrades(), [WatchFutureTradesMessages()]),
- }
-
- def stream_unary_scenarios(self):
- return {
- 'GetHighestTradePrice': (
- GetHighestTradePrice(), [GetHighestTradePriceMessages()])
- }
-
- def stream_stream_scenarios(self):
- return {
- 'GetLastTradePriceMultiple': (
- GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]),
- }
-
-
-STOCK_TEST_SERVICE = StockTestService()
diff --git a/src/python/grpcio/tests/unit/framework/face/testing/test_case.py b/src/python/grpcio/tests/unit/framework/face/testing/test_case.py
deleted file mode 100644
index f29d400844..0000000000
--- a/src/python/grpcio/tests/unit/framework/face/testing/test_case.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Tools for creating tests of implementations of the Face layer."""
-
-import abc
-
-import six
-
-# face_interfaces and interfaces are referenced in specification in this module.
-from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import
-from tests.unit.framework.face.testing import interfaces # pylint: disable=unused-import
-
-
-class FaceTestCase(six.with_metaclass(abc.ABCMeta)):
- """Describes a test of the Face Layer of RPC Framework.
-
- Concrete subclasses must also inherit from unittest.TestCase and from at least
- one class that defines test methods.
- """
-
- @abc.abstractmethod
- def set_up_implementation(
- self, name, methods, method_implementations,
- multi_method_implementation):
- """Instantiates the Face Layer implementation under test.
-
- Args:
- name: The service name to be used in the test.
- methods: A sequence of interfaces.Method objects describing the RPC
- methods that will be called during the test.
- method_implementations: A dictionary from string RPC method name to
- face_interfaces.MethodImplementation object specifying
- implementation of an RPC method.
- multi_method_implementation: An face_interfaces.MultiMethodImplementation
- or None.
-
- Returns:
- A sequence of length two the first element of which is a
- face_interfaces.GenericStub (backed by the given method
- implementations), and the second element of which is an arbitrary memo
- object to be kept and passed to tearDownImplementation at the conclusion
- of the test.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def tear_down_implementation(self, memo):
- """Destroys the Face layer implementation under test.
-
- Args:
- memo: The object from the second position of the return value of
- set_up_implementation.
- """
- raise NotImplementedError()