diff options
author | Masood Malekghassemi <soltanmm@users.noreply.github.com> | 2015-10-21 20:29:23 -0700 |
---|---|---|
committer | Masood Malekghassemi <soltanmm@users.noreply.github.com> | 2015-12-03 16:59:14 -0800 |
commit | 7566c9a85d03d8bedea4fc56ce7a370374b2de0e (patch) | |
tree | d029eb32664a3eb24be7f3637456714770f6b931 /src/python/grpcio/tests/unit/framework | |
parent | 31c16e526f9f267d1f0869e39b6a0e4b87480fc2 (diff) |
Make Python testing predictable again
This reorganizes the Python code, scraps the current testing
infrastructure, and implements a simple test discovery and run script
based on the standard Python unittest library so we can trust that our
tests are running.
Diffstat (limited to 'src/python/grpcio/tests/unit/framework')
48 files changed, 8433 insertions, 0 deletions
diff --git a/src/python/grpcio/tests/unit/framework/__init__.py b/src/python/grpcio/tests/unit/framework/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py b/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py new file mode 100644 index 0000000000..360ecc95d5 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/_crust_over_core_face_interface_test.py @@ -0,0 +1,111 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests Face interface compliance of the crust-over-core stack.""" + +import collections +import unittest + +from grpc.framework.core import implementations as core_implementations +from grpc.framework.crust import implementations as crust_implementations +from grpc.framework.foundation import logging_pool +from grpc.framework.interfaces.links import utilities +from tests.unit.framework.common import test_constants +from tests.unit.framework.interfaces.face import test_cases +from tests.unit.framework.interfaces.face import test_interfaces +from tests.unit.framework.interfaces.links import test_utilities + + +class _Implementation(test_interfaces.Implementation): + + def instantiate( + self, methods, method_implementations, multi_method_implementation): + pool = logging_pool.pool(test_constants.POOL_SIZE) + servicer = crust_implementations.servicer( + method_implementations, multi_method_implementation, pool) + + service_end_link = core_implementations.service_end_link( + servicer, test_constants.DEFAULT_TIMEOUT, + test_constants.MAXIMUM_TIMEOUT) + invocation_end_link = core_implementations.invocation_end_link() + invocation_end_link.join_link(service_end_link) + service_end_link.join_link(invocation_end_link) + service_end_link.start() + invocation_end_link.start() + + generic_stub = crust_implementations.generic_stub(invocation_end_link, pool) + # TODO(nathaniel): Add a "groups" attribute to _digest.TestServiceDigest. + group = next(iter(methods))[0] + # TODO(nathaniel): Add a "cardinalities_by_group" attribute to + # _digest.TestServiceDigest. + cardinalities = { + method: method_object.cardinality() + for (group, method), method_object in methods.iteritems()} + dynamic_stub = crust_implementations.dynamic_stub( + invocation_end_link, group, cardinalities, pool) + + return generic_stub, {group: dynamic_stub}, ( + invocation_end_link, service_end_link, pool) + + def destantiate(self, memo): + invocation_end_link, service_end_link, pool = memo + invocation_end_link.stop(0).wait() + service_end_link.stop(0).wait() + invocation_end_link.join_link(utilities.NULL_LINK) + service_end_link.join_link(utilities.NULL_LINK) + pool.shutdown(wait=True) + + def invocation_metadata(self): + return object() + + def initial_metadata(self): + return object() + + def terminal_metadata(self): + return object() + + def code(self): + return object() + + def details(self): + return object() + + def metadata_transmitted(self, original_metadata, transmitted_metadata): + return original_metadata is transmitted_metadata + + +def load_tests(loader, tests, pattern): + return unittest.TestSuite( + tests=tuple( + loader.loadTestsFromTestCase(test_case_class) + for test_case_class in test_cases.test_cases(_Implementation()))) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/framework/common/__init__.py b/src/python/grpcio/tests/unit/framework/common/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/common/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/tests/unit/framework/common/test_constants.py b/src/python/grpcio/tests/unit/framework/common/test_constants.py new file mode 100644 index 0000000000..e1d3c2709d --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/common/test_constants.py @@ -0,0 +1,53 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Constants shared among tests throughout RPC Framework.""" + +# Value for maximum duration in seconds that a test is allowed for its actual +# behavioral logic, excluding all time spent deliberately waiting in the test. +TIME_ALLOWANCE = 10 +# Value for maximum duration in seconds of RPCs that may time out as part of a +# test. +SHORT_TIMEOUT = 4 +# Absurdly large value for maximum duration in seconds for should-not-time-out +# RPCs made during tests. +LONG_TIMEOUT = 3000 +# Values to supply on construction of an object that will service RPCs; these +# should not be used as the actual timeout values of any RPCs made during tests. +DEFAULT_TIMEOUT = 300 +MAXIMUM_TIMEOUT = 3600 + +# The number of payloads to transmit in streaming tests. +STREAM_LENGTH = 200 + +# The size of payloads to transmit in tests. +PAYLOAD_SIZE = 256 * 1024 + 17 + +# The size of thread pools to use in tests. +POOL_SIZE = 10 diff --git a/src/python/grpcio/tests/unit/framework/common/test_control.py b/src/python/grpcio/tests/unit/framework/common/test_control.py new file mode 100644 index 0000000000..8d6eba5c2c --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/common/test_control.py @@ -0,0 +1,95 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Code for instructing systems under test to block or fail.""" + +import abc +import contextlib +import threading + + +class Defect(Exception): + """Simulates a programming defect raised into in a system under test. + + Use of a standard exception type is too easily misconstrued as an actual + defect in either the test infrastructure or the system under test. + """ + + +class Control(object): + """An object that accepts program control from a system under test. + + Systems under test passed a Control should call its control() method + frequently during execution. The control() method may block, raise an + exception, or do nothing, all according to the enclosing test's desire for + the system under test to simulate hanging, failing, or functioning. + """ + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def control(self): + """Potentially does anything.""" + raise NotImplementedError() + + +class PauseFailControl(Control): + """A Control that can be used to pause or fail code under control.""" + + def __init__(self): + self._condition = threading.Condition() + self._paused = False + self._fail = False + + def control(self): + with self._condition: + if self._fail: + raise Defect() + + while self._paused: + self._condition.wait() + + @contextlib.contextmanager + def pause(self): + """Pauses code under control while controlling code is in context.""" + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() + + @contextlib.contextmanager + def fail(self): + """Fails code under control while controlling code is in context.""" + with self._condition: + self._fail = True + yield + with self._condition: + self._fail = False diff --git a/src/python/grpcio/tests/unit/framework/common/test_coverage.py b/src/python/grpcio/tests/unit/framework/common/test_coverage.py new file mode 100644 index 0000000000..a7ed3582c4 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/common/test_coverage.py @@ -0,0 +1,116 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Governs coverage for tests of RPCs throughout RPC Framework.""" + +import abc + +# This code is designed for use with the unittest module. +# pylint: disable=invalid-name + + +class Coverage(object): + """Specification of test coverage.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def testSuccessfulUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSuccessfulUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSuccessfulStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSuccessfulStreamRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSequentialInvocations(self): + raise NotImplementedError() + + @abc.abstractmethod + def testParallelInvocations(self): + raise NotImplementedError() + + @abc.abstractmethod + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledStreamRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredStreamRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedStreamRequestStreamResponse(self): + raise NotImplementedError() diff --git a/src/python/grpcio/tests/unit/framework/core/__init__.py b/src/python/grpcio/tests/unit/framework/core/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/core/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py b/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py new file mode 100644 index 0000000000..1310292306 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/core/_base_interface_test.py @@ -0,0 +1,96 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests the RPC Framework Core's implementation of the Base interface.""" + +import logging +import random +import time +import unittest + +from grpc.framework.core import implementations +from grpc.framework.interfaces.base import utilities +from tests.unit.framework.common import test_constants +from tests.unit.framework.interfaces.base import test_cases +from tests.unit.framework.interfaces.base import test_interfaces + + +class _Implementation(test_interfaces.Implementation): + + def __init__(self): + self._invocation_initial_metadata = object() + self._service_initial_metadata = object() + self._invocation_terminal_metadata = object() + self._service_terminal_metadata = object() + + def instantiate(self, serializations, servicer): + invocation = implementations.invocation_end_link() + service = implementations.service_end_link( + servicer, test_constants.DEFAULT_TIMEOUT, + test_constants.MAXIMUM_TIMEOUT) + invocation.join_link(service) + service.join_link(invocation) + return invocation, service, None + + def destantiate(self, memo): + pass + + def invocation_initial_metadata(self): + return self._invocation_initial_metadata + + def service_initial_metadata(self): + return self._service_initial_metadata + + def invocation_completion(self): + return utilities.completion(self._invocation_terminal_metadata, None, None) + + def service_completion(self): + return utilities.completion(self._service_terminal_metadata, None, None) + + def metadata_transmitted(self, original_metadata, transmitted_metadata): + return transmitted_metadata is original_metadata + + def completion_transmitted(self, original_completion, transmitted_completion): + return ( + (original_completion.terminal_metadata is + transmitted_completion.terminal_metadata) and + original_completion.code is transmitted_completion.code and + original_completion.message is transmitted_completion.message + ) + + +def load_tests(loader, tests, pattern): + return unittest.TestSuite( + tests=tuple( + loader.loadTestsFromTestCase(test_case_class) + for test_case_class in test_cases.test_cases(_Implementation()))) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/framework/face/__init__.py b/src/python/grpcio/tests/unit/framework/face/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/tests/unit/framework/face/testing/__init__.py b/src/python/grpcio/tests/unit/framework/face/testing/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/testing/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/tests/unit/framework/face/testing/base_util.py b/src/python/grpcio/tests/unit/framework/face/testing/base_util.py new file mode 100644 index 0000000000..1df1529b27 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/testing/base_util.py @@ -0,0 +1,102 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utilities for creating Base-layer objects for use in Face-layer tests.""" + +import abc + +# interfaces is referenced from specification in this module. +from grpc.framework.base import util as _base_util +from grpc.framework.base import implementations +from grpc.framework.base import in_memory +from grpc.framework.base import interfaces # pylint: disable=unused-import +from grpc.framework.foundation import logging_pool + +_POOL_SIZE_LIMIT = 5 + +_MAXIMUM_TIMEOUT = 90 + + +class LinkedPair(object): + """A Front and Back that are linked to one another. + + Attributes: + front: An interfaces.Front. + back: An interfaces.Back. + """ + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def shut_down(self): + """Shuts down this object and releases its resources.""" + raise NotImplementedError() + + +class _LinkedPair(LinkedPair): + + def __init__(self, front, back, pools): + self.front = front + self.back = back + self._pools = pools + + def shut_down(self): + _base_util.wait_for_idle(self.front) + _base_util.wait_for_idle(self.back) + + for pool in self._pools: + pool.shutdown(wait=True) + + +def linked_pair(servicer, default_timeout): + """Creates a Server and Stub linked together for use.""" + link_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + front_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + front_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + front_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_work_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_transmission_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + back_utility_pool = logging_pool.pool(_POOL_SIZE_LIMIT) + pools = ( + link_pool, + front_work_pool, front_transmission_pool, front_utility_pool, + back_work_pool, back_transmission_pool, back_utility_pool) + + link = in_memory.Link(link_pool) + front = implementations.front_link( + front_work_pool, front_transmission_pool, front_utility_pool) + back = implementations.back_link( + servicer, back_work_pool, back_transmission_pool, back_utility_pool, + default_timeout, _MAXIMUM_TIMEOUT) + front.join_rear_link(link) + link.join_fore_link(front) + back.join_fore_link(link) + link.join_rear_link(back) + + return _LinkedPair(front, back, pools) diff --git a/src/python/grpcio/tests/unit/framework/face/testing/blocking_invocation_inline_service_test_case.py b/src/python/grpcio/tests/unit/framework/face/testing/blocking_invocation_inline_service_test_case.py new file mode 100644 index 0000000000..0613516421 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/testing/blocking_invocation_inline_service_test_case.py @@ -0,0 +1,222 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A test to verify an implementation of the Face layer of RPC Framework.""" + +# unittest is referenced from specification in this module. +import abc +import unittest # pylint: disable=unused-import + +from grpc.framework.face import exceptions +from tests.unit.framework.common import test_constants +from tests.unit.framework.face.testing import control +from tests.unit.framework.face.testing import coverage +from tests.unit.framework.face.testing import digest +from tests.unit.framework.face.testing import stock_service +from tests.unit.framework.face.testing import test_case + + +class BlockingInvocationInlineServiceTestCase( + test_case.FaceTestCase, coverage.BlockingCoverage): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must also extend unittest.TestCase. + """ + __metaclass__ = abc.ABCMeta + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self.control = control.PauseFailControl() + self.digest = digest.digest( + stock_service.STOCK_TEST_SERVICE, self.control, None) + + self.stub, self.memo = self.set_up_implementation( + self.digest.name, self.digest.methods, + self.digest.inline_method_implementations, None) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self.tear_down_implementation(self.memo) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response = self.stub.blocking_value_in_value_out( + name, request, test_constants.LONG_TIMEOUT) + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_iterator = self.stub.inline_value_in_stream_out( + name, request, test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + response = self.stub.blocking_stream_in_value_out( + name, iter(requests), test_constants.LONG_TIMEOUT) + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response = self.stub.blocking_value_in_value_out( + name, first_request, test_constants.SHORT_TIMEOUT) + + test_messages.verify(first_request, first_response, self) + + second_response = self.stub.blocking_value_in_value_out( + name, second_request, test_constants.SHORT_TIMEOUT) + + test_messages.verify(second_request, second_response, self) + + def testExpiredUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + multi_callable = self.stub.unary_unary_multi_callable(name) + multi_callable(request, test_constants.SHORT_TIMEOUT) + + def testExpiredUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, test_constants.SHORT_TIMEOUT) + list(response_iterator) + + def testExpiredStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + multi_callable = self.stub.stream_unary_multi_callable(name) + multi_callable(iter(requests), test_constants.SHORT_TIMEOUT) + + def testExpiredStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(), self.assertRaises( + exceptions.ExpirationError): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), test_constants.SHORT_TIMEOUT) + list(response_iterator) + + def testFailedUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.fail(), self.assertRaises(exceptions.ServicerError): + self.stub.blocking_value_in_value_out(name, request, + test_constants.SHORT_TIMEOUT) + + def testFailedUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.fail(), self.assertRaises(exceptions.ServicerError): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, test_constants.SHORT_TIMEOUT) + list(response_iterator) + + def testFailedStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.fail(), self.assertRaises(exceptions.ServicerError): + self.stub.blocking_stream_in_value_out(name, iter(requests), + test_constants.SHORT_TIMEOUT) + + def testFailedStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.fail(), self.assertRaises(exceptions.ServicerError): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), test_constants.SHORT_TIMEOUT) + list(response_iterator) diff --git a/src/python/grpcio/tests/unit/framework/face/testing/callback.py b/src/python/grpcio/tests/unit/framework/face/testing/callback.py new file mode 100644 index 0000000000..d0e63c8c56 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/testing/callback.py @@ -0,0 +1,94 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A utility useful in tests of asynchronous, event-driven interfaces.""" + +import threading + +from grpc.framework.foundation import stream + + +class Callback(stream.Consumer): + """A utility object useful in tests of asynchronous code.""" + + def __init__(self): + self._condition = threading.Condition() + self._unary_response = None + self._streamed_responses = [] + self._completed = False + self._abortion = None + + def abort(self, abortion): + with self._condition: + self._abortion = abortion + self._condition.notify_all() + + def complete(self, unary_response): + with self._condition: + self._unary_response = unary_response + self._completed = True + self._condition.notify_all() + + def consume(self, streamed_response): + with self._condition: + self._streamed_responses.append(streamed_response) + + def terminate(self): + with self._condition: + self._completed = True + self._condition.notify_all() + + def consume_and_terminate(self, streamed_response): + with self._condition: + self._streamed_responses.append(streamed_response) + self._completed = True + self._condition.notify_all() + + def block_until_terminated(self): + with self._condition: + while self._abortion is None and not self._completed: + self._condition.wait() + + def response(self): + with self._condition: + if self._abortion is None: + return self._unary_response + else: + raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + + def responses(self): + with self._condition: + if self._abortion is None: + return list(self._streamed_responses) + else: + raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + + def abortion(self): + with self._condition: + return self._abortion diff --git a/src/python/grpcio/tests/unit/framework/face/testing/control.py b/src/python/grpcio/tests/unit/framework/face/testing/control.py new file mode 100644 index 0000000000..3960c4e649 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/testing/control.py @@ -0,0 +1,87 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Code for instructing systems under test to block or fail.""" + +import abc +import contextlib +import threading + + +class Control(object): + """An object that accepts program control from a system under test. + + Systems under test passed a Control should call its control() method + frequently during execution. The control() method may block, raise an + exception, or do nothing, all according to the enclosing test's desire for + the system under test to simulate hanging, failing, or functioning. + """ + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def control(self): + """Potentially does anything.""" + raise NotImplementedError() + + +class PauseFailControl(Control): + """A Control that can be used to pause or fail code under control.""" + + def __init__(self): + self._condition = threading.Condition() + self._paused = False + self._fail = False + + def control(self): + with self._condition: + if self._fail: + raise ValueError() + + while self._paused: + self._condition.wait() + + @contextlib.contextmanager + def pause(self): + """Pauses code under control while controlling code is in context.""" + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() + + @contextlib.contextmanager + def fail(self): + """Fails code under control while controlling code is in context.""" + with self._condition: + self._fail = True + yield + with self._condition: + self._fail = False diff --git a/src/python/grpcio/tests/unit/framework/face/testing/coverage.py b/src/python/grpcio/tests/unit/framework/face/testing/coverage.py new file mode 100644 index 0000000000..f3aca113fe --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/testing/coverage.py @@ -0,0 +1,123 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Governs coverage for the tests of the Face layer of RPC Framework.""" + +import abc + +# These classes are only valid when inherited by unittest.TestCases. +# pylint: disable=invalid-name + + +class BlockingCoverage(object): + """Specification of test coverage for blocking behaviors.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def testSuccessfulUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSuccessfulUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSuccessfulStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSuccessfulStreamRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testSequentialInvocations(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testExpiredStreamRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testFailedStreamRequestStreamResponse(self): + raise NotImplementedError() + + +class FullCoverage(BlockingCoverage): + """Specification of test coverage for non-blocking behaviors.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def testParallelInvocations(self): + raise NotImplementedError() + + @abc.abstractmethod + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @abc.abstractmethod + def testCancelledStreamRequestStreamResponse(self): + raise NotImplementedError() diff --git a/src/python/grpcio/tests/unit/framework/face/testing/digest.py b/src/python/grpcio/tests/unit/framework/face/testing/digest.py new file mode 100644 index 0000000000..39f28b9657 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/testing/digest.py @@ -0,0 +1,450 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Code for making a service.TestService more amenable to use in tests.""" + +import collections +import threading + +# testing_control, interfaces, and testing_service are referenced from +# specification in this module. +from grpc.framework.common import cardinality +from grpc.framework.common import style +from grpc.framework.face import exceptions +from grpc.framework.face import interfaces as face_interfaces +from grpc.framework.foundation import stream +from grpc.framework.foundation import stream_util +from tests.unit.framework.face.testing import control as testing_control # pylint: disable=unused-import +from tests.unit.framework.face.testing import interfaces # pylint: disable=unused-import +from tests.unit.framework.face.testing import service as testing_service # pylint: disable=unused-import + +_IDENTITY = lambda x: x + + +class TestServiceDigest( + collections.namedtuple( + 'TestServiceDigest', + ['name', + 'methods', + 'inline_method_implementations', + 'event_method_implementations', + 'multi_method_implementation', + 'unary_unary_messages_sequences', + 'unary_stream_messages_sequences', + 'stream_unary_messages_sequences', + 'stream_stream_messages_sequences'])): + """A transformation of a service.TestService. + + Attributes: + name: The RPC service name to be used in the test. + methods: A sequence of interfaces.Method objects describing the RPC + methods that will be called during the test. + inline_method_implementations: A dict from RPC method name to + face_interfaces.MethodImplementation object to be used in tests of + in-line calls to behaviors under test. + event_method_implementations: A dict from RPC method name to + face_interfaces.MethodImplementation object to be used in tests of + event-driven calls to behaviors under test. + multi_method_implementation: A face_interfaces.MultiMethodImplementation to + be used in tests of generic calls to behaviors under test. + unary_unary_messages_sequences: A dict from method name to sequence of + service.UnaryUnaryTestMessages objects to be used to test the method + with the given name. + unary_stream_messages_sequences: A dict from method name to sequence of + service.UnaryStreamTestMessages objects to be used to test the method + with the given name. + stream_unary_messages_sequences: A dict from method name to sequence of + service.StreamUnaryTestMessages objects to be used to test the method + with the given name. + stream_stream_messages_sequences: A dict from method name to sequence of + service.StreamStreamTestMessages objects to be used to test the + method with the given name. + serialization: A serial.Serialization object describing serialization + behaviors for all the RPC methods. + """ + + +class _BufferingConsumer(stream.Consumer): + """A trivial Consumer that dumps what it consumes in a user-mutable buffer.""" + + def __init__(self): + self.consumed = [] + self.terminated = False + + def consume(self, value): + self.consumed.append(value) + + def terminate(self): + self.terminated = True + + def consume_and_terminate(self, value): + self.consumed.append(value) + self.terminated = True + + +class _InlineUnaryUnaryMethod(face_interfaces.MethodImplementation): + + def __init__(self, unary_unary_test_method, control): + self._test_method = unary_unary_test_method + self._control = control + + self.cardinality = cardinality.Cardinality.UNARY_UNARY + self.style = style.Service.INLINE + + def unary_unary_inline(self, request, context): + response_list = [] + self._test_method.service( + request, response_list.append, context, self._control) + return response_list.pop(0) + + +class _EventUnaryUnaryMethod(face_interfaces.MethodImplementation): + + def __init__(self, unary_unary_test_method, control, pool): + self._test_method = unary_unary_test_method + self._control = control + self._pool = pool + + self.cardinality = cardinality.Cardinality.UNARY_UNARY + self.style = style.Service.EVENT + + def unary_unary_event(self, request, response_callback, context): + if self._pool is None: + self._test_method.service( + request, response_callback, context, self._control) + else: + self._pool.submit( + self._test_method.service, request, response_callback, context, + self._control) + + +class _InlineUnaryStreamMethod(face_interfaces.MethodImplementation): + + def __init__(self, unary_stream_test_method, control): + self._test_method = unary_stream_test_method + self._control = control + + self.cardinality = cardinality.Cardinality.UNARY_STREAM + self.style = style.Service.INLINE + + def unary_stream_inline(self, request, context): + response_consumer = _BufferingConsumer() + self._test_method.service( + request, response_consumer, context, self._control) + for response in response_consumer.consumed: + yield response + + +class _EventUnaryStreamMethod(face_interfaces.MethodImplementation): + + def __init__(self, unary_stream_test_method, control, pool): + self._test_method = unary_stream_test_method + self._control = control + self._pool = pool + + self.cardinality = cardinality.Cardinality.UNARY_STREAM + self.style = style.Service.EVENT + + def unary_stream_event(self, request, response_consumer, context): + if self._pool is None: + self._test_method.service( + request, response_consumer, context, self._control) + else: + self._pool.submit( + self._test_method.service, request, response_consumer, context, + self._control) + + +class _InlineStreamUnaryMethod(face_interfaces.MethodImplementation): + + def __init__(self, stream_unary_test_method, control): + self._test_method = stream_unary_test_method + self._control = control + + self.cardinality = cardinality.Cardinality.STREAM_UNARY + self.style = style.Service.INLINE + + def stream_unary_inline(self, request_iterator, context): + response_list = [] + request_consumer = self._test_method.service( + response_list.append, context, self._control) + for request in request_iterator: + request_consumer.consume(request) + request_consumer.terminate() + return response_list.pop(0) + + +class _EventStreamUnaryMethod(face_interfaces.MethodImplementation): + + def __init__(self, stream_unary_test_method, control, pool): + self._test_method = stream_unary_test_method + self._control = control + self._pool = pool + + self.cardinality = cardinality.Cardinality.STREAM_UNARY + self.style = style.Service.EVENT + + def stream_unary_event(self, response_callback, context): + request_consumer = self._test_method.service( + response_callback, context, self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _InlineStreamStreamMethod(face_interfaces.MethodImplementation): + + def __init__(self, stream_stream_test_method, control): + self._test_method = stream_stream_test_method + self._control = control + + self.cardinality = cardinality.Cardinality.STREAM_STREAM + self.style = style.Service.INLINE + + def stream_stream_inline(self, request_iterator, context): + response_consumer = _BufferingConsumer() + request_consumer = self._test_method.service( + response_consumer, context, self._control) + + for request in request_iterator: + request_consumer.consume(request) + while response_consumer.consumed: + yield response_consumer.consumed.pop(0) + response_consumer.terminate() + + +class _EventStreamStreamMethod(face_interfaces.MethodImplementation): + + def __init__(self, stream_stream_test_method, control, pool): + self._test_method = stream_stream_test_method + self._control = control + self._pool = pool + + self.cardinality = cardinality.Cardinality.STREAM_STREAM + self.style = style.Service.EVENT + + def stream_stream_event(self, response_consumer, context): + request_consumer = self._test_method.service( + response_consumer, context, self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _UnaryConsumer(stream.Consumer): + """A Consumer that only allows consumption of exactly one value.""" + + def __init__(self, action): + self._lock = threading.Lock() + self._action = action + self._consumed = False + self._terminated = False + + def consume(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + + self._action(value) + + def terminate(self): + with self._lock: + if not self._consumed: + raise ValueError('Unary consumer hasn\'t yet consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._terminated = True + + def consume_and_terminate(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + self._terminated = True + + self._action(value) + + +class _UnaryUnaryAdaptation(object): + + def __init__(self, unary_unary_test_method): + self._method = unary_unary_test_method + + def service(self, response_consumer, context, control): + def action(request): + self._method.service( + request, response_consumer.consume_and_terminate, context, control) + return _UnaryConsumer(action) + + +class _UnaryStreamAdaptation(object): + + def __init__(self, unary_stream_test_method): + self._method = unary_stream_test_method + + def service(self, response_consumer, context, control): + def action(request): + self._method.service(request, response_consumer, context, control) + return _UnaryConsumer(action) + + +class _StreamUnaryAdaptation(object): + + def __init__(self, stream_unary_test_method): + self._method = stream_unary_test_method + + def service(self, response_consumer, context, control): + return self._method.service( + response_consumer.consume_and_terminate, context, control) + + +class _MultiMethodImplementation(face_interfaces.MultiMethodImplementation): + + def __init__(self, methods, control, pool): + self._methods = methods + self._control = control + self._pool = pool + + def service(self, name, response_consumer, context): + method = self._methods.get(name, None) + if method is None: + raise exceptions.NoSuchMethodError(name) + elif self._pool is None: + return method(response_consumer, context, self._control) + else: + request_consumer = method(response_consumer, context, self._control) + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _Assembly( + collections.namedtuple( + '_Assembly', + ['methods', 'inlines', 'events', 'adaptations', 'messages'])): + """An intermediate structure created when creating a TestServiceDigest.""" + + +def _assemble( + scenarios, names, inline_method_constructor, event_method_constructor, + adapter, control, pool): + """Creates an _Assembly from the given scenarios.""" + methods = [] + inlines = {} + events = {} + adaptations = {} + messages = {} + for name, scenario in scenarios.iteritems(): + if name in names: + raise ValueError('Repeated name "%s"!' % name) + + test_method = scenario[0] + inline_method = inline_method_constructor(test_method, control) + event_method = event_method_constructor(test_method, control, pool) + adaptation = adapter(test_method) + + methods.append(test_method) + inlines[name] = inline_method + events[name] = event_method + adaptations[name] = adaptation + messages[name] = scenario[1] + + return _Assembly(methods, inlines, events, adaptations, messages) + + +def digest(service, control, pool): + """Creates a TestServiceDigest from a TestService. + + Args: + service: A testing_service.TestService. + control: A testing_control.Control. + pool: If RPC methods should be serviced in a separate thread, a thread pool. + None if RPC methods should be serviced in the thread belonging to the + run-time that calls for their service. + + Returns: + A TestServiceDigest synthesized from the given service.TestService. + """ + names = set() + + unary_unary = _assemble( + service.unary_unary_scenarios(), names, _InlineUnaryUnaryMethod, + _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool) + names.update(set(unary_unary.inlines)) + + unary_stream = _assemble( + service.unary_stream_scenarios(), names, _InlineUnaryStreamMethod, + _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool) + names.update(set(unary_stream.inlines)) + + stream_unary = _assemble( + service.stream_unary_scenarios(), names, _InlineStreamUnaryMethod, + _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool) + names.update(set(stream_unary.inlines)) + + stream_stream = _assemble( + service.stream_stream_scenarios(), names, _InlineStreamStreamMethod, + _EventStreamStreamMethod, _IDENTITY, control, pool) + names.update(set(stream_stream.inlines)) + + methods = list(unary_unary.methods) + methods.extend(unary_stream.methods) + methods.extend(stream_unary.methods) + methods.extend(stream_stream.methods) + adaptations = dict(unary_unary.adaptations) + adaptations.update(unary_stream.adaptations) + adaptations.update(stream_unary.adaptations) + adaptations.update(stream_stream.adaptations) + inlines = dict(unary_unary.inlines) + inlines.update(unary_stream.inlines) + inlines.update(stream_unary.inlines) + inlines.update(stream_stream.inlines) + events = dict(unary_unary.events) + events.update(unary_stream.events) + events.update(stream_unary.events) + events.update(stream_stream.events) + + return TestServiceDigest( + service.name(), + methods, + inlines, + events, + _MultiMethodImplementation(adaptations, control, pool), + unary_unary.messages, + unary_stream.messages, + stream_unary.messages, + stream_stream.messages) diff --git a/src/python/grpcio/tests/unit/framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/grpcio/tests/unit/framework/face/testing/event_invocation_synchronous_event_service_test_case.py new file mode 100644 index 0000000000..179f3a2f67 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/testing/event_invocation_synchronous_event_service_test_case.py @@ -0,0 +1,376 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A test to verify an implementation of the Face layer of RPC Framework.""" + +import abc +import unittest + +from grpc.framework.face import interfaces +from tests.unit.framework.common import test_constants +from tests.unit.framework.face.testing import callback as testing_callback +from tests.unit.framework.face.testing import control +from tests.unit.framework.face.testing import coverage +from tests.unit.framework.face.testing import digest +from tests.unit.framework.face.testing import stock_service +from tests.unit.framework.face.testing import test_case + + +class EventInvocationSynchronousEventServiceTestCase( + test_case.FaceTestCase, coverage.FullCoverage): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must also extend unittest.TestCase. + """ + __metaclass__ = abc.ABCMeta + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self.control = control.PauseFailControl() + self.digest = digest.digest( + stock_service.STOCK_TEST_SERVICE, self.control, None) + + self.stub, self.memo = self.set_up_implementation( + self.digest.name, self.digest.methods, + self.digest.event_method_implementations, None) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self.tear_down_implementation(self.memo) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + self.stub.event_value_in_value_out( + name, request, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) + callback.block_until_terminated() + response = callback.response() + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + self.stub.event_value_in_stream_out( + name, request, callback, callback.abort, + test_constants.SHORT_TIMEOUT) + callback.block_until_terminated() + responses = callback.responses() + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + unused_call, request_consumer = self.stub.event_stream_in_value_out( + name, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) + for request in requests: + request_consumer.consume(request) + request_consumer.terminate() + callback.block_until_terminated() + response = callback.response() + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + unused_call, request_consumer = self.stub.event_stream_in_stream_out( + name, callback, callback.abort, test_constants.SHORT_TIMEOUT) + for request in requests: + request_consumer.consume(request) + request_consumer.terminate() + callback.block_until_terminated() + responses = callback.responses() + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + # pylint: disable=cell-var-from-loop + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + first_callback = testing_callback.Callback() + second_callback = testing_callback.Callback() + + def make_second_invocation(first_response): + first_callback.complete(first_response) + self.stub.event_value_in_value_out( + name, second_request, second_callback.complete, + second_callback.abort, test_constants.SHORT_TIMEOUT) + + self.stub.event_value_in_value_out( + name, first_request, make_second_invocation, first_callback.abort, + test_constants.SHORT_TIMEOUT) + second_callback.block_until_terminated() + + first_response = first_callback.response() + second_response = second_callback.response() + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + def testExpiredUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.pause(): + self.stub.event_value_in_value_out( + name, request, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) + + def testExpiredUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.pause(): + self.stub.event_value_in_stream_out( + name, request, callback, callback.abort, + test_constants.SHORT_TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) + + def testExpiredStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for unused_test_messages in test_messages_sequence: + callback = testing_callback.Callback() + + self.stub.event_stream_in_value_out( + name, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) + + def testExpiredStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + unused_call, request_consumer = self.stub.event_stream_in_stream_out( + name, callback, callback.abort, test_constants.SHORT_TIMEOUT) + for request in requests: + request_consumer.consume(request) + callback.block_until_terminated() + + self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) + + def testFailedUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.fail(): + self.stub.event_value_in_value_out( + name, request, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, + callback.abortion()) + + def testFailedUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.fail(): + self.stub.event_value_in_stream_out( + name, request, callback, callback.abort, + test_constants.SHORT_TIMEOUT) + callback.block_until_terminated() + + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, + callback.abortion()) + + def testFailedStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + with self.control.fail(): + unused_call, request_consumer = self.stub.event_stream_in_value_out( + name, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) + for request in requests: + request_consumer.consume(request) + request_consumer.terminate() + callback.block_until_terminated() + + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, + callback.abortion()) + + def testFailedStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + with self.control.fail(): + unused_call, request_consumer = self.stub.event_stream_in_stream_out( + name, callback, callback.abort, test_constants.SHORT_TIMEOUT) + for request in requests: + request_consumer.consume(request) + request_consumer.terminate() + callback.block_until_terminated() + + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion()) + + def testParallelInvocations(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + first_callback = testing_callback.Callback() + second_request = test_messages.request() + second_callback = testing_callback.Callback() + + self.stub.event_value_in_value_out( + name, first_request, first_callback.complete, first_callback.abort, + test_constants.SHORT_TIMEOUT) + self.stub.event_value_in_value_out( + name, second_request, second_callback.complete, + second_callback.abort, test_constants.SHORT_TIMEOUT) + first_callback.block_until_terminated() + second_callback.block_until_terminated() + + first_response = first_callback.response() + second_response = second_callback.response() + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + @unittest.skip('TODO(nathaniel): implement.') + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + def testCancelledUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + with self.control.pause(): + call = self.stub.event_value_in_value_out( + name, request, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) + call.cancel() + callback.block_until_terminated() + + self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion()) + + def testCancelledUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = testing_callback.Callback() + + call = self.stub.event_value_in_stream_out( + name, request, callback, callback.abort, + test_constants.SHORT_TIMEOUT) + call.cancel() + callback.block_until_terminated() + + self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion()) + + def testCancelledStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = testing_callback.Callback() + + call, request_consumer = self.stub.event_stream_in_value_out( + name, callback.complete, callback.abort, + test_constants.SHORT_TIMEOUT) + for request in requests: + request_consumer.consume(request) + call.cancel() + callback.block_until_terminated() + + self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion()) + + def testCancelledStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for unused_test_messages in test_messages_sequence: + callback = testing_callback.Callback() + + call, unused_request_consumer = self.stub.event_stream_in_stream_out( + name, callback, callback.abort, test_constants.SHORT_TIMEOUT) + call.cancel() + callback.block_until_terminated() + + self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion()) diff --git a/src/python/grpcio/tests/unit/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/grpcio/tests/unit/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py new file mode 100644 index 0000000000..485524a356 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py @@ -0,0 +1,379 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A test to verify an implementation of the Face layer of RPC Framework.""" + +import abc +import contextlib +import threading +import unittest + +from grpc.framework.face import exceptions +from grpc.framework.foundation import future +from grpc.framework.foundation import logging_pool +from tests.unit.framework.common import test_constants +from tests.unit.framework.face.testing import control +from tests.unit.framework.face.testing import coverage +from tests.unit.framework.face.testing import digest +from tests.unit.framework.face.testing import stock_service +from tests.unit.framework.face.testing import test_case + +_MAXIMUM_POOL_SIZE = 10 + + +class _PauseableIterator(object): + + def __init__(self, upstream): + self._upstream = upstream + self._condition = threading.Condition() + self._paused = False + + @contextlib.contextmanager + def pause(self): + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() + + def __iter__(self): + return self + + def next(self): + with self._condition: + while self._paused: + self._condition.wait() + return next(self._upstream) + + +class FutureInvocationAsynchronousEventServiceTestCase( + test_case.FaceTestCase, coverage.FullCoverage): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must also extend unittest.TestCase. + """ + __metaclass__ = abc.ABCMeta + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self.control = control.PauseFailControl() + self.digest_pool = logging_pool.pool(_MAXIMUM_POOL_SIZE) + self.digest = digest.digest( + stock_service.STOCK_TEST_SERVICE, self.control, self.digest_pool) + + self.stub, self.memo = self.set_up_implementation( + self.digest.name, self.digest.methods, + self.digest.event_method_implementations, None) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self.tear_down_implementation(self.memo) + self.digest_pool.shutdown(wait=True) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_future = self.stub.future_value_in_value_out( + name, request, test_constants.SHORT_TIMEOUT) + response = response_future.result() + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_iterator = self.stub.inline_value_in_stream_out( + name, request, test_constants.SHORT_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + request_iterator = _PauseableIterator(iter(requests)) + + # Use of a paused iterator of requests allows us to test that control is + # returned to calling code before the iterator yields any requests. + with request_iterator.pause(): + response_future = self.stub.future_stream_in_value_out( + name, request_iterator, test_constants.SHORT_TIMEOUT) + response = response_future.result() + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + request_iterator = _PauseableIterator(iter(requests)) + + # Use of a paused iterator of requests allows us to test that control is + # returned to calling code before the iterator yields any requests. + with request_iterator.pause(): + response_iterator = self.stub.inline_stream_in_stream_out( + name, request_iterator, test_constants.SHORT_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self.stub.future_value_in_value_out( + name, first_request, test_constants.SHORT_TIMEOUT) + first_response = first_response_future.result() + + test_messages.verify(first_request, first_response, self) + + second_response_future = self.stub.future_value_in_value_out( + name, second_request, test_constants.SHORT_TIMEOUT) + second_response = second_response_future.result() + + test_messages.verify(second_request, second_response, self) + + def testExpiredUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(): + multi_callable = self.stub.unary_unary_multi_callable(name) + response_future = multi_callable.future(request, + test_constants.SHORT_TIMEOUT) + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() + + def testExpiredUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, test_constants.SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + list(response_iterator) + + def testExpiredStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(): + multi_callable = self.stub.stream_unary_multi_callable(name) + response_future = multi_callable.future(iter(requests), + test_constants.SHORT_TIMEOUT) + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() + + def testExpiredStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), test_constants.SHORT_TIMEOUT) + with self.assertRaises(exceptions.ExpirationError): + list(response_iterator) + + def testFailedUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.fail(): + response_future = self.stub.future_value_in_value_out( + name, request, test_constants.SHORT_TIMEOUT) + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() + + def testFailedUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_consumer before the + # expiration of the RPC. + with self.control.fail(), self.assertRaises(exceptions.ExpirationError): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, test_constants.SHORT_TIMEOUT) + list(response_iterator) + + def testFailedStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.fail(): + response_future = self.stub.future_stream_in_value_out( + name, iter(requests), test_constants.SHORT_TIMEOUT) + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() + + def testFailedStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_consumer before the + # expiration of the RPC. + with self.control.fail(), self.assertRaises(exceptions.ExpirationError): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), test_constants.SHORT_TIMEOUT) + list(response_iterator) + + def testParallelInvocations(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + # TODO(bug 2039): use LONG_TIMEOUT instead + first_response_future = self.stub.future_value_in_value_out( + name, first_request, test_constants.SHORT_TIMEOUT) + second_response_future = self.stub.future_value_in_value_out( + name, second_request, test_constants.SHORT_TIMEOUT) + first_response = first_response_future.result() + second_response = second_response_future.result() + + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + @unittest.skip('TODO(nathaniel): implement.') + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + def testCancelledUnaryRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(): + response_future = self.stub.future_value_in_value_out( + name, request, test_constants.SHORT_TIMEOUT) + cancel_method_return_value = response_future.cancel() + + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) + + def testCancelledUnaryRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self.control.pause(): + response_iterator = self.stub.inline_value_in_stream_out( + name, request, test_constants.SHORT_TIMEOUT) + response_iterator.cancel() + + with self.assertRaises(future.CancelledError): + next(response_iterator) + + def testCancelledStreamRequestUnaryResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(): + response_future = self.stub.future_stream_in_value_out( + name, iter(requests), test_constants.SHORT_TIMEOUT) + cancel_method_return_value = response_future.cancel() + + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) + + def testCancelledStreamRequestStreamResponse(self): + for name, test_messages_sequence in ( + self.digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self.control.pause(): + response_iterator = self.stub.inline_stream_in_stream_out( + name, iter(requests), test_constants.SHORT_TIMEOUT) + response_iterator.cancel() + + with self.assertRaises(future.CancelledError): + next(response_iterator) diff --git a/src/python/grpcio/tests/unit/framework/face/testing/interfaces.py b/src/python/grpcio/tests/unit/framework/face/testing/interfaces.py new file mode 100644 index 0000000000..5932dabf1e --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/testing/interfaces.py @@ -0,0 +1,117 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces implemented by data sets used in Face-layer tests.""" + +import abc + +# cardinality is referenced from specification in this module. +from grpc.framework.common import cardinality # pylint: disable=unused-import + + +class Method(object): + """An RPC method to be used in tests of RPC implementations.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def name(self): + """Identify the name of the method. + + Returns: + The name of the method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cardinality(self): + """Identify the cardinality of the method. + + Returns: + A cardinality.Cardinality value describing the streaming semantics of the + method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def request_class(self): + """Identify the class used for the method's request objects. + + Returns: + The class object of the class to which the method's request objects + belong. + """ + raise NotImplementedError() + + @abc.abstractmethod + def response_class(self): + """Identify the class used for the method's response objects. + + Returns: + The class object of the class to which the method's response objects + belong. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_request(self, request): + """Serialize the given request object. + + Args: + request: A request object appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_request(self, serialized_request): + """Synthesize a request object from a given bytestring. + + Args: + serialized_request: A bytestring deserializable into a request object + appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_response(self, response): + """Serialize the given response object. + + Args: + response: A response object appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_response(self, serialized_response): + """Synthesize a response object from a given bytestring. + + Args: + serialized_response: A bytestring deserializable into a response object + appropriate for this method. + """ + raise NotImplementedError() diff --git a/src/python/grpcio/tests/unit/framework/face/testing/service.py b/src/python/grpcio/tests/unit/framework/face/testing/service.py new file mode 100644 index 0000000000..ac0b89b6ee --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/testing/service.py @@ -0,0 +1,337 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Private interfaces implemented by data sets used in Face-layer tests.""" + +import abc + +# interfaces is referenced from specification in this module. +from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import +from tests.unit.framework.face.testing import interfaces + + +class UnaryUnaryTestMethodImplementation(interfaces.Method): + """A controllable implementation of a unary-unary RPC method.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_callback, context, control): + """Services an RPC that accepts one message and produces one message. + + Args: + request: The single request message for the RPC. + response_callback: A callback to be called to accept the response message + of the RPC. + context: An face_interfaces.RpcContext object. + control: A test_control.Control to control execution of this method. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class UnaryUnaryTestMessages(object): + """A type for unary-request-unary-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def request(self): + """Affords a request message. + + Implementations of this method should return a different message with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A request message. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, request, response, test_case): + """Verifies that the computed response matches the given request. + + Args: + request: A request message. + response: A response message. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the request and response do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class UnaryStreamTestMethodImplementation(interfaces.Method): + """A controllable implementation of a unary-stream RPC method.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_consumer, context, control): + """Services an RPC that takes one message and produces a stream of messages. + + Args: + request: The single request message for the RPC. + response_consumer: A stream.Consumer to be called to accept the response + messages of the RPC. + context: A face_interfaces.RpcContext object. + control: A test_control.Control to control execution of this method. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class UnaryStreamTestMessages(object): + """A type for unary-request-stream-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def request(self): + """Affords a request message. + + Implementations of this method should return a different message with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A request message. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, request, responses, test_case): + """Verifies that the computed responses match the given request. + + Args: + request: A request message. + responses: A sequence of response messages. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the request and responses do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class StreamUnaryTestMethodImplementation(interfaces.Method): + """A controllable implementation of a stream-unary RPC method.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_callback, context, control): + """Services an RPC that takes a stream of messages and produces one message. + + Args: + response_callback: A callback to be called to accept the response message + of the RPC. + context: A face_interfaces.RpcContext object. + control: A test_control.Control to control execution of this method. + + Returns: + A stream.Consumer with which to accept the request messages of the RPC. + The consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing messages to this object. Implementations must not assume that + this object will be called to completion of the request stream or even + called at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class StreamUnaryTestMessages(object): + """A type for stream-request-unary-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def requests(self): + """Affords a sequence of request messages. + + Implementations of this method should return a different sequences with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A sequence of request messages. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, requests, response, test_case): + """Verifies that the computed response matches the given requests. + + Args: + requests: A sequence of request messages. + response: A response message. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the requests and response do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class StreamStreamTestMethodImplementation(interfaces.Method): + """A controllable implementation of a stream-stream RPC method.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_consumer, context, control): + """Services an RPC that accepts and produces streams of messages. + + Args: + response_consumer: A stream.Consumer to be called to accept the response + messages of the RPC. + context: A face_interfaces.RpcContext object. + control: A test_control.Control to control execution of this method. + + Returns: + A stream.Consumer with which to accept the request messages of the RPC. + The consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing messages to this object. Implementations must not assume that + this object will be called to completion of the request stream or even + called at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class StreamStreamTestMessages(object): + """A type for stream-request-stream-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def requests(self): + """Affords a sequence of request messages. + + Implementations of this method should return a different sequences with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A sequence of request messages. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, requests, responses, test_case): + """Verifies that the computed response matches the given requests. + + Args: + requests: A sequence of request messages. + responses: A sequence of response messages. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the requests and responses do not match, indicating + that there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class TestService(object): + """A specification of implemented RPC methods to use in tests.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def name(self): + """Identifies the RPC service name used during the test. + + Returns: + The RPC service name to be used for the test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def unary_unary_scenarios(self): + """Affords unary-request-unary-response test methods and their messages. + + Returns: + A dict from method name to pair. The first element of the pair + is a UnaryUnaryTestMethodImplementation object and the second element + is a sequence of UnaryUnaryTestMethodMessages objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def unary_stream_scenarios(self): + """Affords unary-request-stream-response test methods and their messages. + + Returns: + A dict from method name to pair. The first element of the pair is a + UnaryStreamTestMethodImplementation object and the second element is a + sequence of UnaryStreamTestMethodMessages objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stream_unary_scenarios(self): + """Affords stream-request-unary-response test methods and their messages. + + Returns: + A dict from method name to pair. The first element of the pair is a + StreamUnaryTestMethodImplementation object and the second element is a + sequence of StreamUnaryTestMethodMessages objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stream_stream_scenarios(self): + """Affords stream-request-stream-response test methods and their messages. + + Returns: + A dict from method name to pair. The first element of the pair is a + StreamStreamTestMethodImplementation object and the second element is a + sequence of StreamStreamTestMethodMessages objects. + """ + raise NotImplementedError() diff --git a/src/python/grpcio/tests/unit/framework/face/testing/stock_service.py b/src/python/grpcio/tests/unit/framework/face/testing/stock_service.py new file mode 100644 index 0000000000..117c723f79 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/testing/stock_service.py @@ -0,0 +1,374 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Examples of Python implementations of the stock.proto Stock service.""" + +from grpc.framework.common import cardinality +from grpc.framework.foundation import abandonment +from grpc.framework.foundation import stream +from grpc.framework.foundation import stream_util +from tests.unit.framework.face.testing import service +from tests.unit._junkdrawer import stock_pb2 + +SYMBOL_FORMAT = 'test symbol:%03d' +STREAM_LENGTH = 400 + +# A test-appropriate security-pricing function. :-P +_price = lambda symbol_name: float(hash(symbol_name) % 4096) + + +def _get_last_trade_price(stock_request, stock_reply_callback, control, active): + """A unary-request, unary-response test method.""" + control.control() + if active(): + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=_price(stock_request.symbol))) + else: + raise abandonment.Abandoned() + + +def _get_last_trade_price_multiple(stock_reply_consumer, control, active): + """A stream-request, stream-response test method.""" + def stock_reply_for_stock_request(stock_request): + control.control() + if active(): + return stock_pb2.StockReply( + symbol=stock_request.symbol, price=_price(stock_request.symbol)) + else: + raise abandonment.Abandoned() + return stream_util.TransformingConsumer( + stock_reply_for_stock_request, stock_reply_consumer) + + +def _watch_future_trades(stock_request, stock_reply_consumer, control, active): + """A unary-request, stream-response test method.""" + base_price = _price(stock_request.symbol) + for index in range(stock_request.num_trades_to_watch): + control.control() + if active(): + stock_reply_consumer.consume( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=base_price + index)) + else: + raise abandonment.Abandoned() + stock_reply_consumer.terminate() + + +def _get_highest_trade_price(stock_reply_callback, control, active): + """A stream-request, unary-response test method.""" + + class StockRequestConsumer(stream.Consumer): + """Keeps an ongoing record of the most valuable symbol yet consumed.""" + + def __init__(self): + self._symbol = None + self._price = None + + def consume(self, stock_request): + control.control() + if active(): + if self._price is None: + self._symbol = stock_request.symbol + self._price = _price(stock_request.symbol) + else: + candidate_price = _price(stock_request.symbol) + if self._price < candidate_price: + self._symbol = stock_request.symbol + self._price = candidate_price + + def terminate(self): + control.control() + if active(): + if self._symbol is None: + raise ValueError() + else: + stock_reply_callback( + stock_pb2.StockReply(symbol=self._symbol, price=self._price)) + self._symbol = None + self._price = None + + def consume_and_terminate(self, stock_request): + control.control() + if active(): + if self._price is None: + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, + price=_price(stock_request.symbol))) + else: + candidate_price = _price(stock_request.symbol) + if self._price < candidate_price: + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=candidate_price)) + else: + stock_reply_callback( + stock_pb2.StockReply( + symbol=self._symbol, price=self._price)) + + self._symbol = None + self._price = None + + return StockRequestConsumer() + + +class GetLastTradePrice(service.UnaryUnaryTestMethodImplementation): + """GetLastTradePrice for use in tests.""" + + def name(self): + return 'GetLastTradePrice' + + def cardinality(self): + return cardinality.Cardinality.UNARY_UNARY + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, request, response_callback, context, control): + _get_last_trade_price( + request, response_callback, control, context.is_active) + + +class GetLastTradePriceMessages(service.UnaryUnaryTestMessages): + + def __init__(self): + self._index = 0 + + def request(self): + symbol = SYMBOL_FORMAT % self._index + self._index += 1 + return stock_pb2.StockRequest(symbol=symbol) + + def verify(self, request, response, test_case): + test_case.assertEqual(request.symbol, response.symbol) + test_case.assertEqual(_price(request.symbol), response.price) + + +class GetLastTradePriceMultiple(service.StreamStreamTestMethodImplementation): + """GetLastTradePriceMultiple for use in tests.""" + + def name(self): + return 'GetLastTradePriceMultiple' + + def cardinality(self): + return cardinality.Cardinality.STREAM_STREAM + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, response_consumer, context, control): + return _get_last_trade_price_multiple( + response_consumer, control, context.is_active) + + +class GetLastTradePriceMultipleMessages(service.StreamStreamTestMessages): + """Pairs of message streams for use with GetLastTradePriceMultiple.""" + + def __init__(self): + self._index = 0 + + def requests(self): + base_index = self._index + self._index += 1 + return [ + stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % (base_index + index)) + for index in range(STREAM_LENGTH)] + + def verify(self, requests, responses, test_case): + test_case.assertEqual(len(requests), len(responses)) + for stock_request, stock_reply in zip(requests, responses): + test_case.assertEqual(stock_request.symbol, stock_reply.symbol) + test_case.assertEqual(_price(stock_request.symbol), stock_reply.price) + + +class WatchFutureTrades(service.UnaryStreamTestMethodImplementation): + """WatchFutureTrades for use in tests.""" + + def name(self): + return 'WatchFutureTrades' + + def cardinality(self): + return cardinality.Cardinality.UNARY_STREAM + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, request, response_consumer, context, control): + _watch_future_trades(request, response_consumer, control, context.is_active) + + +class WatchFutureTradesMessages(service.UnaryStreamTestMessages): + """Pairs of a single request message and a sequence of response messages.""" + + def __init__(self): + self._index = 0 + + def request(self): + symbol = SYMBOL_FORMAT % self._index + self._index += 1 + return stock_pb2.StockRequest( + symbol=symbol, num_trades_to_watch=STREAM_LENGTH) + + def verify(self, request, responses, test_case): + test_case.assertEqual(STREAM_LENGTH, len(responses)) + base_price = _price(request.symbol) + for index, response in enumerate(responses): + test_case.assertEqual(base_price + index, response.price) + + +class GetHighestTradePrice(service.StreamUnaryTestMethodImplementation): + """GetHighestTradePrice for use in tests.""" + + def name(self): + return 'GetHighestTradePrice' + + def cardinality(self): + return cardinality.Cardinality.STREAM_UNARY + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, response_callback, context, control): + return _get_highest_trade_price( + response_callback, control, context.is_active) + + +class GetHighestTradePriceMessages(service.StreamUnaryTestMessages): + + def requests(self): + return [ + stock_pb2.StockRequest(symbol=SYMBOL_FORMAT % index) + for index in range(STREAM_LENGTH)] + + def verify(self, requests, response, test_case): + price = None + symbol = None + for stock_request in requests: + current_symbol = stock_request.symbol + current_price = _price(current_symbol) + if price is None or price < current_price: + price = current_price + symbol = current_symbol + test_case.assertEqual(price, response.price) + test_case.assertEqual(symbol, response.symbol) + + +class StockTestService(service.TestService): + """A corpus of test data with one method of each RPC cardinality.""" + + def name(self): + return 'Stock' + + def unary_unary_scenarios(self): + return { + 'GetLastTradePrice': ( + GetLastTradePrice(), [GetLastTradePriceMessages()]), + } + + def unary_stream_scenarios(self): + return { + 'WatchFutureTrades': ( + WatchFutureTrades(), [WatchFutureTradesMessages()]), + } + + def stream_unary_scenarios(self): + return { + 'GetHighestTradePrice': ( + GetHighestTradePrice(), [GetHighestTradePriceMessages()]) + } + + def stream_stream_scenarios(self): + return { + 'GetLastTradePriceMultiple': ( + GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]), + } + + +STOCK_TEST_SERVICE = StockTestService() diff --git a/src/python/grpcio/tests/unit/framework/face/testing/test_case.py b/src/python/grpcio/tests/unit/framework/face/testing/test_case.py new file mode 100644 index 0000000000..23d4d919c2 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/face/testing/test_case.py @@ -0,0 +1,80 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tools for creating tests of implementations of the Face layer.""" + +import abc + +# face_interfaces and interfaces are referenced in specification in this module. +from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import +from tests.unit.framework.face.testing import interfaces # pylint: disable=unused-import + + +class FaceTestCase(object): + """Describes a test of the Face Layer of RPC Framework. + + Concrete subclasses must also inherit from unittest.TestCase and from at least + one class that defines test methods. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def set_up_implementation( + self, name, methods, method_implementations, + multi_method_implementation): + """Instantiates the Face Layer implementation under test. + + Args: + name: The service name to be used in the test. + methods: A sequence of interfaces.Method objects describing the RPC + methods that will be called during the test. + method_implementations: A dictionary from string RPC method name to + face_interfaces.MethodImplementation object specifying + implementation of an RPC method. + multi_method_implementation: An face_interfaces.MultiMethodImplementation + or None. + + Returns: + A sequence of length two the first element of which is a + face_interfaces.GenericStub (backed by the given method + implementations), and the second element of which is an arbitrary memo + object to be kept and passed to tearDownImplementation at the conclusion + of the test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def tear_down_implementation(self, memo): + """Destroys the Face layer implementation under test. + + Args: + memo: The object from the second position of the return value of + set_up_implementation. + """ + raise NotImplementedError() diff --git a/src/python/grpcio/tests/unit/framework/foundation/__init__.py b/src/python/grpcio/tests/unit/framework/foundation/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/foundation/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/tests/unit/framework/foundation/_later_test.py b/src/python/grpcio/tests/unit/framework/foundation/_later_test.py new file mode 100644 index 0000000000..6c2459e185 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/foundation/_later_test.py @@ -0,0 +1,151 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests of the later module.""" + +import threading +import time +import unittest + +from grpc.framework.foundation import later + +TICK = 0.1 + + +class LaterTest(unittest.TestCase): + + def test_simple_delay(self): + lock = threading.Lock() + cell = [0] + return_value = object() + + def computation(): + with lock: + cell[0] += 1 + return return_value + computation_future = later.later(TICK * 2, computation) + + self.assertFalse(computation_future.done()) + self.assertFalse(computation_future.cancelled()) + time.sleep(TICK) + self.assertFalse(computation_future.done()) + self.assertFalse(computation_future.cancelled()) + with lock: + self.assertEqual(0, cell[0]) + time.sleep(TICK * 2) + self.assertTrue(computation_future.done()) + self.assertFalse(computation_future.cancelled()) + with lock: + self.assertEqual(1, cell[0]) + self.assertEqual(return_value, computation_future.result()) + + def test_callback(self): + lock = threading.Lock() + cell = [0] + callback_called = [False] + future_passed_to_callback = [None] + def computation(): + with lock: + cell[0] += 1 + computation_future = later.later(TICK * 2, computation) + def callback(outcome): + with lock: + callback_called[0] = True + future_passed_to_callback[0] = outcome + computation_future.add_done_callback(callback) + time.sleep(TICK) + with lock: + self.assertFalse(callback_called[0]) + time.sleep(TICK * 2) + with lock: + self.assertTrue(callback_called[0]) + self.assertTrue(future_passed_to_callback[0].done()) + + callback_called[0] = False + future_passed_to_callback[0] = None + + computation_future.add_done_callback(callback) + with lock: + self.assertTrue(callback_called[0]) + self.assertTrue(future_passed_to_callback[0].done()) + + def test_cancel(self): + lock = threading.Lock() + cell = [0] + callback_called = [False] + future_passed_to_callback = [None] + def computation(): + with lock: + cell[0] += 1 + computation_future = later.later(TICK * 2, computation) + def callback(outcome): + with lock: + callback_called[0] = True + future_passed_to_callback[0] = outcome + computation_future.add_done_callback(callback) + time.sleep(TICK) + with lock: + self.assertFalse(callback_called[0]) + computation_future.cancel() + self.assertTrue(computation_future.cancelled()) + self.assertFalse(computation_future.running()) + self.assertTrue(computation_future.done()) + with lock: + self.assertTrue(callback_called[0]) + self.assertTrue(future_passed_to_callback[0].cancelled()) + + def test_result(self): + lock = threading.Lock() + cell = [0] + callback_called = [False] + future_passed_to_callback_cell = [None] + return_value = object() + + def computation(): + with lock: + cell[0] += 1 + return return_value + computation_future = later.later(TICK * 2, computation) + + def callback(future_passed_to_callback): + with lock: + callback_called[0] = True + future_passed_to_callback_cell[0] = future_passed_to_callback + computation_future.add_done_callback(callback) + returned_value = computation_future.result() + self.assertEqual(return_value, returned_value) + + # The callback may not yet have been called! Sleep a tick. + time.sleep(TICK) + with lock: + self.assertTrue(callback_called[0]) + self.assertEqual(return_value, future_passed_to_callback_cell[0].result()) + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py new file mode 100644 index 0000000000..452802da6a --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py @@ -0,0 +1,64 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests for grpc.framework.foundation.logging_pool.""" + +import unittest + +from grpc.framework.foundation import logging_pool + +_POOL_SIZE = 16 + + +class LoggingPoolTest(unittest.TestCase): + + def testUpAndDown(self): + pool = logging_pool.pool(_POOL_SIZE) + pool.shutdown(wait=True) + + with logging_pool.pool(_POOL_SIZE) as pool: + self.assertIsNotNone(pool) + + def testTaskExecuted(self): + test_list = [] + + with logging_pool.pool(_POOL_SIZE) as pool: + pool.submit(lambda: test_list.append(object())).result() + + self.assertTrue(test_list) + + def testException(self): + with logging_pool.pool(_POOL_SIZE) as pool: + raised_exception = pool.submit(lambda: 1/0).exception() + + self.assertIsNotNone(raised_exception) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/framework/foundation/stream_testing.py b/src/python/grpcio/tests/unit/framework/foundation/stream_testing.py new file mode 100644 index 0000000000..098a53d5e7 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/foundation/stream_testing.py @@ -0,0 +1,73 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utilities for testing stream-related code.""" + +from grpc.framework.foundation import stream + + +class TestConsumer(stream.Consumer): + """A stream.Consumer instrumented for testing. + + Attributes: + calls: A sequence of value-termination pairs describing the history of calls + made on this object. + """ + + def __init__(self): + self.calls = [] + + def consume(self, value): + """See stream.Consumer.consume for specification.""" + self.calls.append((value, False)) + + def terminate(self): + """See stream.Consumer.terminate for specification.""" + self.calls.append((None, True)) + + def consume_and_terminate(self, value): + """See stream.Consumer.consume_and_terminate for specification.""" + self.calls.append((value, True)) + + def is_legal(self): + """Reports whether or not a legal sequence of calls has been made.""" + terminated = False + for value, terminal in self.calls: + if terminated: + return False + elif terminal: + terminated = True + elif value is None: + return False + else: # pylint: disable=useless-else-on-loop + return True + + def values(self): + """Returns the sequence of values that have been passed to this Consumer.""" + return [value for value, _ in self.calls if value] diff --git a/src/python/grpcio/tests/unit/framework/interfaces/__init__.py b/src/python/grpcio/tests/unit/framework/interfaces/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/tests/unit/framework/interfaces/base/__init__.py b/src/python/grpcio/tests/unit/framework/interfaces/base/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/base/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/tests/unit/framework/interfaces/base/_control.py b/src/python/grpcio/tests/unit/framework/interfaces/base/_control.py new file mode 100644 index 0000000000..38102b198a --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/base/_control.py @@ -0,0 +1,568 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Part of the tests of the base interface of RPC Framework.""" + +import abc +import collections +import enum +import random # pylint: disable=unused-import +import threading +import time + +from grpc.framework.interfaces.base import base +from tests.unit.framework.common import test_constants +from tests.unit.framework.interfaces.base import _sequence +from tests.unit.framework.interfaces.base import _state +from tests.unit.framework.interfaces.base import test_interfaces # pylint: disable=unused-import + +_GROUP = 'base test cases test group' +_METHOD = 'base test cases test method' + +_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE = test_constants.PAYLOAD_SIZE / 20 +_MINIMUM_PAYLOAD_SIZE = test_constants.PAYLOAD_SIZE / 600 + + +def _create_payload(randomness): + length = randomness.randint( + _MINIMUM_PAYLOAD_SIZE, test_constants.PAYLOAD_SIZE) + random_section_length = randomness.randint( + 0, min(_PAYLOAD_RANDOM_SECTION_MAXIMUM_SIZE, length)) + random_section = bytes( + bytearray( + randomness.getrandbits(8) for _ in range(random_section_length))) + sevens_section = '\x07' * (length - random_section_length) + return b''.join(randomness.sample((random_section, sevens_section), 2)) + + +def _anything_in_flight(state): + return ( + state.invocation_initial_metadata_in_flight is not None or + state.invocation_payloads_in_flight or + state.invocation_completion_in_flight is not None or + state.service_initial_metadata_in_flight is not None or + state.service_payloads_in_flight or + state.service_completion_in_flight is not None or + 0 < state.invocation_allowance_in_flight or + 0 < state.service_allowance_in_flight + ) + + +def _verify_service_advance_and_update_state( + initial_metadata, payload, completion, allowance, state, implementation): + if initial_metadata is not None: + if state.invocation_initial_metadata_received: + return 'Later invocation initial metadata received: %s' % ( + initial_metadata,) + if state.invocation_payloads_received: + return 'Invocation initial metadata received after payloads: %s' % ( + state.invocation_payloads_received) + if state.invocation_completion_received: + return 'Invocation initial metadata received after invocation completion!' + if not implementation.metadata_transmitted( + state.invocation_initial_metadata_in_flight, initial_metadata): + return 'Invocation initial metadata maltransmitted: %s, %s' % ( + state.invocation_initial_metadata_in_flight, initial_metadata) + else: + state.invocation_initial_metadata_in_flight = None + state.invocation_initial_metadata_received = True + + if payload is not None: + if state.invocation_completion_received: + return 'Invocation payload received after invocation completion!' + elif not state.invocation_payloads_in_flight: + return 'Invocation payload "%s" received but not in flight!' % (payload,) + elif state.invocation_payloads_in_flight[0] != payload: + return 'Invocation payload mismatch: %s, %s' % ( + state.invocation_payloads_in_flight[0], payload) + elif state.service_side_invocation_allowance < 1: + return 'Disallowed invocation payload!' + else: + state.invocation_payloads_in_flight.pop(0) + state.invocation_payloads_received += 1 + state.service_side_invocation_allowance -= 1 + + if completion is not None: + if state.invocation_completion_received: + return 'Later invocation completion received: %s' % (completion,) + elif not implementation.completion_transmitted( + state.invocation_completion_in_flight, completion): + return 'Invocation completion maltransmitted: %s, %s' % ( + state.invocation_completion_in_flight, completion) + else: + state.invocation_completion_in_flight = None + state.invocation_completion_received = True + + if allowance is not None: + if allowance <= 0: + return 'Illegal allowance value: %s' % (allowance,) + else: + state.service_allowance_in_flight -= allowance + state.service_side_service_allowance += allowance + + +def _verify_invocation_advance_and_update_state( + initial_metadata, payload, completion, allowance, state, implementation): + if initial_metadata is not None: + if state.service_initial_metadata_received: + return 'Later service initial metadata received: %s' % (initial_metadata,) + if state.service_payloads_received: + return 'Service initial metadata received after service payloads: %s' % ( + state.service_payloads_received) + if state.service_completion_received: + return 'Service initial metadata received after service completion!' + if not implementation.metadata_transmitted( + state.service_initial_metadata_in_flight, initial_metadata): + return 'Service initial metadata maltransmitted: %s, %s' % ( + state.service_initial_metadata_in_flight, initial_metadata) + else: + state.service_initial_metadata_in_flight = None + state.service_initial_metadata_received = True + + if payload is not None: + if state.service_completion_received: + return 'Service payload received after service completion!' + elif not state.service_payloads_in_flight: + return 'Service payload "%s" received but not in flight!' % (payload,) + elif state.service_payloads_in_flight[0] != payload: + return 'Service payload mismatch: %s, %s' % ( + state.invocation_payloads_in_flight[0], payload) + elif state.invocation_side_service_allowance < 1: + return 'Disallowed service payload!' + else: + state.service_payloads_in_flight.pop(0) + state.service_payloads_received += 1 + state.invocation_side_service_allowance -= 1 + + if completion is not None: + if state.service_completion_received: + return 'Later service completion received: %s' % (completion,) + elif not implementation.completion_transmitted( + state.service_completion_in_flight, completion): + return 'Service completion maltransmitted: %s, %s' % ( + state.service_completion_in_flight, completion) + else: + state.service_completion_in_flight = None + state.service_completion_received = True + + if allowance is not None: + if allowance <= 0: + return 'Illegal allowance value: %s' % (allowance,) + else: + state.invocation_allowance_in_flight -= allowance + state.invocation_side_service_allowance += allowance + + +class Invocation( + collections.namedtuple( + 'Invocation', + ('group', 'method', 'subscription_kind', 'timeout', 'initial_metadata', + 'payload', 'completion',))): + """A description of operation invocation. + + Attributes: + group: The group identifier for the operation. + method: The method identifier for the operation. + subscription_kind: A base.Subscription.Kind value describing the kind of + subscription to use for the operation. + timeout: A duration in seconds to pass as the timeout value for the + operation. + initial_metadata: An object to pass as the initial metadata for the + operation or None. + payload: An object to pass as a payload value for the operation or None. + completion: An object to pass as a completion value for the operation or + None. + """ + + +class OnAdvance( + collections.namedtuple( + 'OnAdvance', + ('kind', 'initial_metadata', 'payload', 'completion', 'allowance'))): + """Describes action to be taken in a test in response to an advance call. + + Attributes: + kind: A Kind value describing the overall kind of response. + initial_metadata: An initial metadata value to pass to a call of the advance + method of the operator under test. Only valid if kind is Kind.ADVANCE and + may be None. + payload: A payload value to pass to a call of the advance method of the + operator under test. Only valid if kind is Kind.ADVANCE and may be None. + completion: A base.Completion value to pass to a call of the advance method + of the operator under test. Only valid if kind is Kind.ADVANCE and may be + None. + allowance: An allowance value to pass to a call of the advance method of the + operator under test. Only valid if kind is Kind.ADVANCE and may be None. + """ + + @enum.unique + class Kind(enum.Enum): + ADVANCE = 'advance' + DEFECT = 'defect' + IDLE = 'idle' + + +_DEFECT_ON_ADVANCE = OnAdvance(OnAdvance.Kind.DEFECT, None, None, None, None) +_IDLE_ON_ADVANCE = OnAdvance(OnAdvance.Kind.IDLE, None, None, None, None) + + +class Instruction( + collections.namedtuple( + 'Instruction', + ('kind', 'advance_args', 'advance_kwargs', 'conclude_success', + 'conclude_message', 'conclude_invocation_outcome_kind', + 'conclude_service_outcome_kind',))): + """""" + + @enum.unique + class Kind(enum.Enum): + ADVANCE = 'ADVANCE' + CANCEL = 'CANCEL' + CONCLUDE = 'CONCLUDE' + + +class Controller(object): + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def failed(self, message): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def serialize_request(self, request): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_request(self, serialized_request): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def serialize_response(self, response): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_response(self, serialized_response): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def invocation(self): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def poll(self): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def on_service_advance( + self, initial_metadata, payload, completion, allowance): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def on_invocation_advance( + self, initial_metadata, payload, completion, allowance): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def service_on_termination(self, outcome): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def invocation_on_termination(self, outcome): + """""" + raise NotImplementedError() + + +class ControllerCreator(object): + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def name(self): + """""" + raise NotImplementedError() + + @abc.abstractmethod + def controller(self, implementation, randomness): + """""" + raise NotImplementedError() + + +class _Remainder( + collections.namedtuple( + '_Remainder', + ('invocation_payloads', 'service_payloads', 'invocation_completion', + 'service_completion',))): + """Describes work remaining to be done in a portion of a test. + + Attributes: + invocation_payloads: The number of payloads to be sent from the invocation + side of the operation to the service side of the operation. + service_payloads: The number of payloads to be sent from the service side of + the operation to the invocation side of the operation. + invocation_completion: Whether or not completion from the invocation side of + the operation should be indicated and has yet to be indicated. + service_completion: Whether or not completion from the service side of the + operation should be indicated and has yet to be indicated. + """ + + +class _SequenceController(Controller): + + def __init__(self, sequence, implementation, randomness): + """Constructor. + + Args: + sequence: A _sequence.Sequence describing the steps to be taken in the + test at a relatively high level. + implementation: A test_interfaces.Implementation encapsulating the + base interface implementation that is the system under test. + randomness: A random.Random instance for use in the test. + """ + self._condition = threading.Condition() + self._sequence = sequence + self._implementation = implementation + self._randomness = randomness + + self._until = None + self._remaining_elements = None + self._poll_next = None + self._message = None + + self._state = _state.OperationState() + self._todo = None + + # called with self._condition + def _failed(self, message): + self._message = message + self._condition.notify_all() + + def _passed(self, invocation_outcome, service_outcome): + self._poll_next = Instruction( + Instruction.Kind.CONCLUDE, None, None, True, None, invocation_outcome, + service_outcome) + self._condition.notify_all() + + def failed(self, message): + with self._condition: + self._failed(message) + + def serialize_request(self, request): + return request + request + + def deserialize_request(self, serialized_request): + return serialized_request[:len(serialized_request) / 2] + + def serialize_response(self, response): + return response * 3 + + def deserialize_response(self, serialized_response): + return serialized_response[2 * len(serialized_response) / 3:] + + def invocation(self): + with self._condition: + self._until = time.time() + self._sequence.maximum_duration + self._remaining_elements = list(self._sequence.elements) + if self._sequence.invocation.initial_metadata: + initial_metadata = self._implementation.invocation_initial_metadata() + self._state.invocation_initial_metadata_in_flight = initial_metadata + else: + initial_metadata = None + if self._sequence.invocation.payload: + payload = _create_payload(self._randomness) + self._state.invocation_payloads_in_flight.append(payload) + else: + payload = None + if self._sequence.invocation.complete: + completion = self._implementation.invocation_completion() + self._state.invocation_completion_in_flight = completion + else: + completion = None + return Invocation( + _GROUP, _METHOD, base.Subscription.Kind.FULL, + self._sequence.invocation.timeout, initial_metadata, payload, + completion) + + def poll(self): + with self._condition: + while True: + if self._message is not None: + return Instruction( + Instruction.Kind.CONCLUDE, None, None, False, self._message, None, + None) + elif self._poll_next: + poll_next = self._poll_next + self._poll_next = None + return poll_next + elif self._until < time.time(): + return Instruction( + Instruction.Kind.CONCLUDE, None, None, False, + 'overran allotted time!', None, None) + else: + self._condition.wait(timeout=self._until-time.time()) + + def on_service_advance( + self, initial_metadata, payload, completion, allowance): + with self._condition: + message = _verify_service_advance_and_update_state( + initial_metadata, payload, completion, allowance, self._state, + self._implementation) + if message is not None: + self._failed(message) + if self._todo is not None: + raise ValueError('TODO!!!') + elif _anything_in_flight(self._state): + return _IDLE_ON_ADVANCE + elif self._remaining_elements: + element = self._remaining_elements.pop(0) + if element.kind is _sequence.Element.Kind.SERVICE_TRANSMISSION: + if element.transmission.initial_metadata: + initial_metadata = self._implementation.service_initial_metadata() + self._state.service_initial_metadata_in_flight = initial_metadata + else: + initial_metadata = None + if element.transmission.payload: + payload = _create_payload(self._randomness) + self._state.service_payloads_in_flight.append(payload) + self._state.service_side_service_allowance -= 1 + else: + payload = None + if element.transmission.complete: + completion = self._implementation.service_completion() + self._state.service_completion_in_flight = completion + else: + completion = None + if (not self._state.invocation_completion_received and + 0 <= self._state.service_side_invocation_allowance): + allowance = 1 + self._state.service_side_invocation_allowance += 1 + self._state.invocation_allowance_in_flight += 1 + else: + allowance = None + return OnAdvance( + OnAdvance.Kind.ADVANCE, initial_metadata, payload, completion, + allowance) + else: + raise ValueError('TODO!!!') + else: + return _IDLE_ON_ADVANCE + + def on_invocation_advance( + self, initial_metadata, payload, completion, allowance): + with self._condition: + message = _verify_invocation_advance_and_update_state( + initial_metadata, payload, completion, allowance, self._state, + self._implementation) + if message is not None: + self._failed(message) + if self._todo is not None: + raise ValueError('TODO!!!') + elif _anything_in_flight(self._state): + return _IDLE_ON_ADVANCE + elif self._remaining_elements: + element = self._remaining_elements.pop(0) + if element.kind is _sequence.Element.Kind.INVOCATION_TRANSMISSION: + if element.transmission.initial_metadata: + initial_metadata = self._implementation.invocation_initial_metadata() + self._state.invocation_initial_metadata_in_fight = initial_metadata + else: + initial_metadata = None + if element.transmission.payload: + payload = _create_payload(self._randomness) + self._state.invocation_payloads_in_flight.append(payload) + self._state.invocation_side_invocation_allowance -= 1 + else: + payload = None + if element.transmission.complete: + completion = self._implementation.invocation_completion() + self._state.invocation_completion_in_flight = completion + else: + completion = None + if (not self._state.service_completion_received and + 0 <= self._state.invocation_side_service_allowance): + allowance = 1 + self._state.invocation_side_service_allowance += 1 + self._state.service_allowance_in_flight += 1 + else: + allowance = None + return OnAdvance( + OnAdvance.Kind.ADVANCE, initial_metadata, payload, completion, + allowance) + else: + raise ValueError('TODO!!!') + else: + return _IDLE_ON_ADVANCE + + def service_on_termination(self, outcome): + with self._condition: + self._state.service_side_outcome = outcome + if self._todo is not None or self._remaining_elements: + self._failed('Premature service-side outcome %s!' % (outcome,)) + elif outcome.kind is not self._sequence.outcome_kinds.service: + self._failed( + 'Incorrect service-side outcome kind: %s should have been %s' % ( + outcome.kind, self._sequence.outcome_kinds.service)) + elif self._state.invocation_side_outcome is not None: + self._passed(self._state.invocation_side_outcome.kind, outcome.kind) + + def invocation_on_termination(self, outcome): + with self._condition: + self._state.invocation_side_outcome = outcome + if self._todo is not None or self._remaining_elements: + self._failed('Premature invocation-side outcome %s!' % (outcome,)) + elif outcome.kind is not self._sequence.outcome_kinds.invocation: + self._failed( + 'Incorrect invocation-side outcome kind: %s should have been %s' % ( + outcome.kind, self._sequence.outcome_kinds.invocation)) + elif self._state.service_side_outcome is not None: + self._passed(outcome.kind, self._state.service_side_outcome.kind) + + +class _SequenceControllerCreator(ControllerCreator): + + def __init__(self, sequence): + self._sequence = sequence + + def name(self): + return self._sequence.name + + def controller(self, implementation, randomness): + return _SequenceController(self._sequence, implementation, randomness) + + +CONTROLLER_CREATORS = tuple( + _SequenceControllerCreator(sequence) for sequence in _sequence.SEQUENCES) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/base/_sequence.py b/src/python/grpcio/tests/unit/framework/interfaces/base/_sequence.py new file mode 100644 index 0000000000..571d0e1e63 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/base/_sequence.py @@ -0,0 +1,171 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Part of the tests of the base interface of RPC Framework.""" + +import collections +import enum + +from grpc.framework.interfaces.base import base +from tests.unit.framework.common import test_constants + + +class Invocation( + collections.namedtuple( + 'Invocation', ('timeout', 'initial_metadata', 'payload', 'complete',))): + """A recipe for operation invocation. + + Attributes: + timeout: A duration in seconds to pass to the system under test as the + operation's timeout value. + initial_metadata: A boolean indicating whether or not to pass initial + metadata when invoking the operation. + payload: A boolean indicating whether or not to pass a payload when + invoking the operation. + complete: A boolean indicating whether or not to indicate completion of + transmissions from the invoking side of the operation when invoking the + operation. + """ + + +class Transmission( + collections.namedtuple( + 'Transmission', ('initial_metadata', 'payload', 'complete',))): + """A recipe for a single transmission in an operation. + + Attributes: + initial_metadata: A boolean indicating whether or not to pass initial + metadata as part of the transmission. + payload: A boolean indicating whether or not to pass a payload as part of + the transmission. + complete: A boolean indicating whether or not to indicate completion of + transmission from the transmitting side of the operation as part of the + transmission. + """ + + +class Intertransmission( + collections.namedtuple('Intertransmission', ('invocation', 'service',))): + """A recipe for multiple transmissions in an operation. + + Attributes: + invocation: An integer describing the number of payloads to send from the + invocation side of the operation to the service side. + service: An integer describing the number of payloads to send from the + service side of the operation to the invocation side. + """ + + +class Element(collections.namedtuple('Element', ('kind', 'transmission',))): + """A sum type for steps to perform when testing an operation. + + Attributes: + kind: A Kind value describing the kind of step to perform in the test. + transmission: Only valid for kinds Kind.INVOCATION_TRANSMISSION and + Kind.SERVICE_TRANSMISSION, a Transmission value describing the details of + the transmission to be made. + """ + + @enum.unique + class Kind(enum.Enum): + INVOCATION_TRANSMISSION = 'invocation transmission' + SERVICE_TRANSMISSION = 'service transmission' + INTERTRANSMISSION = 'intertransmission' + INVOCATION_CANCEL = 'invocation cancel' + SERVICE_CANCEL = 'service cancel' + INVOCATION_FAILURE = 'invocation failure' + SERVICE_FAILURE = 'service failure' + + +class OutcomeKinds( + collections.namedtuple('Outcome', ('invocation', 'service',))): + """A description of the expected outcome of an operation test. + + Attributes: + invocation: The base.Outcome.Kind value expected on the invocation side of + the operation. + service: The base.Outcome.Kind value expected on the service side of the + operation. + """ + + +class Sequence( + collections.namedtuple( + 'Sequence', + ('name', 'maximum_duration', 'invocation', 'elements', + 'outcome_kinds',))): + """Describes at a high level steps to perform in a test. + + Attributes: + name: The string name of the sequence. + maximum_duration: A length of time in seconds to allow for the test before + declaring it to have failed. + invocation: An Invocation value describing how to invoke the operation + under test. + elements: A sequence of Element values describing at coarse granularity + actions to take during the operation under test. + outcome_kinds: An OutcomeKinds value describing the expected outcome kinds + of the test. + """ + +_EASY = Sequence( + 'Easy', + test_constants.TIME_ALLOWANCE, + Invocation(test_constants.LONG_TIMEOUT, True, True, True), + ( + Element( + Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, True)), + ), + OutcomeKinds(base.Outcome.Kind.COMPLETED, base.Outcome.Kind.COMPLETED)) + +_PEASY = Sequence( + 'Peasy', + test_constants.TIME_ALLOWANCE, + Invocation(test_constants.LONG_TIMEOUT, True, True, False), + ( + Element( + Element.Kind.SERVICE_TRANSMISSION, Transmission(True, True, False)), + Element( + Element.Kind.INVOCATION_TRANSMISSION, + Transmission(False, True, True)), + Element( + Element.Kind.SERVICE_TRANSMISSION, Transmission(False, True, True)), + ), + OutcomeKinds(base.Outcome.Kind.COMPLETED, base.Outcome.Kind.COMPLETED)) + + +# TODO(issue 2959): Finish this test suite. This tuple of sequences should +# contain at least the values in the Cartesian product of (half-duplex, +# full-duplex) * (zero payloads, one payload, test_constants.STREAM_LENGTH +# payloads) * (completion, cancellation, expiration, programming defect in +# servicer code). +SEQUENCES = ( + _EASY, + _PEASY, +) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/base/_state.py b/src/python/grpcio/tests/unit/framework/interfaces/base/_state.py new file mode 100644 index 0000000000..21cf33aeb6 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/base/_state.py @@ -0,0 +1,55 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Part of the tests of the base interface of RPC Framework.""" + + +class OperationState(object): + + def __init__(self): + self.invocation_initial_metadata_in_flight = None + self.invocation_initial_metadata_received = False + self.invocation_payloads_in_flight = [] + self.invocation_payloads_received = 0 + self.invocation_completion_in_flight = None + self.invocation_completion_received = False + self.service_initial_metadata_in_flight = None + self.service_initial_metadata_received = False + self.service_payloads_in_flight = [] + self.service_payloads_received = 0 + self.service_completion_in_flight = None + self.service_completion_received = False + self.invocation_side_invocation_allowance = 1 + self.invocation_side_service_allowance = 1 + self.service_side_invocation_allowance = 1 + self.service_side_service_allowance = 1 + self.invocation_allowance_in_flight = 0 + self.service_allowance_in_flight = 0 + self.invocation_side_outcome = None + self.service_side_outcome = None diff --git a/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py b/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py new file mode 100644 index 0000000000..4f8e26c9a2 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/base/test_cases.py @@ -0,0 +1,277 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests of the base interface of RPC Framework.""" + +import logging +import random +import threading +import time +import unittest + +from grpc.framework.foundation import logging_pool +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.base import utilities +from tests.unit.framework.common import test_constants +from tests.unit.framework.interfaces.base import _control +from tests.unit.framework.interfaces.base import test_interfaces + +_SYNCHRONICITY_VARIATION = (('Sync', False), ('Async', True)) + +_EMPTY_OUTCOME_KIND_DICT = { + outcome_kind: 0 for outcome_kind in base.Outcome.Kind} + + +class _Serialization(test_interfaces.Serialization): + + def serialize_request(self, request): + return request + request + + def deserialize_request(self, serialized_request): + return serialized_request[:len(serialized_request) / 2] + + def serialize_response(self, response): + return response * 3 + + def deserialize_response(self, serialized_response): + return serialized_response[2 * len(serialized_response) / 3:] + + +def _advance(quadruples, operator, controller): + try: + for quadruple in quadruples: + operator.advance( + initial_metadata=quadruple[0], payload=quadruple[1], + completion=quadruple[2], allowance=quadruple[3]) + except Exception as e: # pylint: disable=broad-except + controller.failed('Exception on advance: %e' % e) + + +class _Operator(base.Operator): + + def __init__(self, controller, on_advance, pool, operator_under_test): + self._condition = threading.Condition() + self._controller = controller + self._on_advance = on_advance + self._pool = pool + self._operator_under_test = operator_under_test + self._pending_advances = [] + + def set_operator_under_test(self, operator_under_test): + with self._condition: + self._operator_under_test = operator_under_test + pent_advances = self._pending_advances + self._pending_advances = [] + pool = self._pool + controller = self._controller + + if pool is None: + _advance(pent_advances, operator_under_test, controller) + else: + pool.submit(_advance, pent_advances, operator_under_test, controller) + + def advance( + self, initial_metadata=None, payload=None, completion=None, + allowance=None): + on_advance = self._on_advance( + initial_metadata, payload, completion, allowance) + if on_advance.kind is _control.OnAdvance.Kind.ADVANCE: + with self._condition: + pool = self._pool + operator_under_test = self._operator_under_test + controller = self._controller + + quadruple = ( + on_advance.initial_metadata, on_advance.payload, + on_advance.completion, on_advance.allowance) + if pool is None: + _advance((quadruple,), operator_under_test, controller) + else: + pool.submit(_advance, (quadruple,), operator_under_test, controller) + elif on_advance.kind is _control.OnAdvance.Kind.DEFECT: + raise ValueError( + 'Deliberately raised exception from Operator.advance (in a test)!') + + +class _ProtocolReceiver(base.ProtocolReceiver): + + def __init__(self): + self._condition = threading.Condition() + self._contexts = [] + + def context(self, protocol_context): + with self._condition: + self._contexts.append(protocol_context) + + +class _Servicer(base.Servicer): + """A base.Servicer with instrumented for testing.""" + + def __init__(self, group, method, controllers, pool): + self._condition = threading.Condition() + self._group = group + self._method = method + self._pool = pool + self._controllers = list(controllers) + + def service(self, group, method, context, output_operator): + with self._condition: + controller = self._controllers.pop(0) + if group != self._group or method != self._method: + controller.fail( + '%s != %s or %s != %s' % (group, self._group, method, self._method)) + raise base.NoSuchMethodError(None, None) + else: + operator = _Operator( + controller, controller.on_service_advance, self._pool, + output_operator) + outcome = context.add_termination_callback( + controller.service_on_termination) + if outcome is not None: + controller.service_on_termination(outcome) + return utilities.full_subscription(operator, _ProtocolReceiver()) + + +class _OperationTest(unittest.TestCase): + + def setUp(self): + if self._synchronicity_variation: + self._pool = logging_pool.pool(test_constants.POOL_SIZE) + else: + self._pool = None + self._controller = self._controller_creator.controller( + self._implementation, self._randomness) + + def tearDown(self): + if self._synchronicity_variation: + self._pool.shutdown(wait=True) + else: + self._pool = None + + def test_operation(self): + invocation = self._controller.invocation() + if invocation.subscription_kind is base.Subscription.Kind.FULL: + test_operator = _Operator( + self._controller, self._controller.on_invocation_advance, + self._pool, None) + subscription = utilities.full_subscription( + test_operator, _ProtocolReceiver()) + else: + # TODO(nathaniel): support and test other subscription kinds. + self.fail('Non-full subscriptions not yet supported!') + + servicer = _Servicer( + invocation.group, invocation.method, (self._controller,), self._pool) + + invocation_end, service_end, memo = self._implementation.instantiate( + {(invocation.group, invocation.method): _Serialization()}, servicer) + + try: + invocation_end.start() + service_end.start() + operation_context, operator_under_test = invocation_end.operate( + invocation.group, invocation.method, subscription, invocation.timeout, + initial_metadata=invocation.initial_metadata, payload=invocation.payload, + completion=invocation.completion) + test_operator.set_operator_under_test(operator_under_test) + outcome = operation_context.add_termination_callback( + self._controller.invocation_on_termination) + if outcome is not None: + self._controller.invocation_on_termination(outcome) + except Exception as e: # pylint: disable=broad-except + self._controller.failed('Exception on invocation: %s' % e) + self.fail(e) + + while True: + instruction = self._controller.poll() + if instruction.kind is _control.Instruction.Kind.ADVANCE: + try: + test_operator.advance( + *instruction.advance_args, **instruction.advance_kwargs) + except Exception as e: # pylint: disable=broad-except + self._controller.failed('Exception on instructed advance: %s' % e) + elif instruction.kind is _control.Instruction.Kind.CANCEL: + try: + operation_context.cancel() + except Exception as e: # pylint: disable=broad-except + self._controller.failed('Exception on cancel: %s' % e) + elif instruction.kind is _control.Instruction.Kind.CONCLUDE: + break + + invocation_stop_event = invocation_end.stop(0) + service_stop_event = service_end.stop(0) + invocation_stop_event.wait() + service_stop_event.wait() + invocation_stats = invocation_end.operation_stats() + service_stats = service_end.operation_stats() + + self._implementation.destantiate(memo) + + self.assertTrue( + instruction.conclude_success, msg=instruction.conclude_message) + + expected_invocation_stats = dict(_EMPTY_OUTCOME_KIND_DICT) + expected_invocation_stats[ + instruction.conclude_invocation_outcome_kind] += 1 + self.assertDictEqual(expected_invocation_stats, invocation_stats) + expected_service_stats = dict(_EMPTY_OUTCOME_KIND_DICT) + expected_service_stats[instruction.conclude_service_outcome_kind] += 1 + self.assertDictEqual(expected_service_stats, service_stats) + + +def test_cases(implementation): + """Creates unittest.TestCase classes for a given Base implementation. + + Args: + implementation: A test_interfaces.Implementation specifying creation and + destruction of the Base implementation under test. + + Returns: + A sequence of subclasses of unittest.TestCase defining tests of the + specified Base layer implementation. + """ + random_seed = hash(time.time()) + logging.warning('Random seed for this execution: %s', random_seed) + randomness = random.Random(x=random_seed) + + test_case_classes = [] + for synchronicity_variation in _SYNCHRONICITY_VARIATION: + for controller_creator in _control.CONTROLLER_CREATORS: + name = ''.join( + (synchronicity_variation[0], controller_creator.name(), 'Test',)) + test_case_classes.append( + type(name, (_OperationTest,), + {'_implementation': implementation, + '_randomness': randomness, + '_synchronicity_variation': synchronicity_variation[1], + '_controller_creator': controller_creator, + '__module__': implementation.__module__, + })) + + return test_case_classes diff --git a/src/python/grpcio/tests/unit/framework/interfaces/base/test_interfaces.py b/src/python/grpcio/tests/unit/framework/interfaces/base/test_interfaces.py new file mode 100644 index 0000000000..84afd24d47 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/base/test_interfaces.py @@ -0,0 +1,186 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces used in tests of implementations of the Base layer.""" + +import abc + +from grpc.framework.interfaces.base import base # pylint: disable=unused-import + + +class Serialization(object): + """Specifies serialization and deserialization of test payloads.""" + __metaclass__ = abc.ABCMeta + + def serialize_request(self, request): + """Serializes a request value used in a test. + + Args: + request: A request value created by a test. + + Returns: + A bytestring that is the serialization of the given request. + """ + raise NotImplementedError() + + def deserialize_request(self, serialized_request): + """Deserializes a request value used in a test. + + Args: + serialized_request: A bytestring that is the serialization of some request + used in a test. + + Returns: + The request value encoded by the given bytestring. + """ + raise NotImplementedError() + + def serialize_response(self, response): + """Serializes a response value used in a test. + + Args: + response: A response value created by a test. + + Returns: + A bytestring that is the serialization of the given response. + """ + raise NotImplementedError() + + def deserialize_response(self, serialized_response): + """Deserializes a response value used in a test. + + Args: + serialized_response: A bytestring that is the serialization of some + response used in a test. + + Returns: + The response value encoded by the given bytestring. + """ + raise NotImplementedError() + + +class Implementation(object): + """Specifies an implementation of the Base layer.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def instantiate(self, serializations, servicer): + """Instantiates the Base layer implementation to be used in a test. + + Args: + serializations: A dict from group-method pair to Serialization object + specifying how to serialize and deserialize payload values used in the + test. + servicer: A base.Servicer object to be called to service RPCs made during + the test. + + Returns: + A sequence of length three the first element of which is a + base.End to be used to invoke RPCs, the second element of which is a + base.End to be used to service invoked RPCs, and the third element of + which is an arbitrary memo object to be kept and passed to destantiate + at the conclusion of the test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def destantiate(self, memo): + """Destroys the Base layer implementation under test. + + Args: + memo: The object from the third position of the return value of a call to + instantiate. + """ + raise NotImplementedError() + + @abc.abstractmethod + def invocation_initial_metadata(self): + """Provides an operation's invocation-side initial metadata. + + Returns: + A value to use for an operation's invocation-side initial metadata, or + None. + """ + raise NotImplementedError() + + @abc.abstractmethod + def service_initial_metadata(self): + """Provides an operation's service-side initial metadata. + + Returns: + A value to use for an operation's service-side initial metadata, or + None. + """ + raise NotImplementedError() + + @abc.abstractmethod + def invocation_completion(self): + """Provides an operation's invocation-side completion. + + Returns: + A base.Completion to use for an operation's invocation-side completion. + """ + raise NotImplementedError() + + @abc.abstractmethod + def service_completion(self): + """Provides an operation's service-side completion. + + Returns: + A base.Completion to use for an operation's service-side completion. + """ + raise NotImplementedError() + + @abc.abstractmethod + def metadata_transmitted(self, original_metadata, transmitted_metadata): + """Identifies whether or not metadata was properly transmitted. + + Args: + original_metadata: A metadata value passed to the system under test. + transmitted_metadata: The same metadata value after having been + transmitted through the system under test. + + Returns: + Whether or not the metadata was properly transmitted. + """ + raise NotImplementedError() + + @abc.abstractmethod + def completion_transmitted(self, original_completion, transmitted_completion): + """Identifies whether or not a base.Completion was properly transmitted. + + Args: + original_completion: A base.Completion passed to the system under test. + transmitted_completion: The same completion value after having been + transmitted through the system under test. + + Returns: + Whether or not the completion was properly transmitted. + """ + raise NotImplementedError() diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_3069_test_constant.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_3069_test_constant.py new file mode 100644 index 0000000000..1ea356c0bf --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_3069_test_constant.py @@ -0,0 +1,37 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A test constant working around issue 3069.""" + +# test_constants is referenced from specification in this module. +from tests.unit.framework.common import test_constants # pylint: disable=unused-import + +# TODO(issue 3069): Replace uses of this constant with +# test_constants.SHORT_TIMEOUT. +REALLY_SHORT_TIMEOUT = 0.1 diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/__init__.py b/src/python/grpcio/tests/unit/framework/interfaces/face/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py new file mode 100644 index 0000000000..3bcefa601d --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py @@ -0,0 +1,252 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test code for the Face layer of RPC Framework.""" + +import abc +import unittest + +# test_interfaces is referenced from specification in this module. +from grpc.framework.interfaces.face import face +from tests.unit.framework.common import test_constants +from tests.unit.framework.common import test_control +from tests.unit.framework.common import test_coverage +from tests.unit.framework.interfaces.face import _3069_test_constant +from tests.unit.framework.interfaces.face import _digest +from tests.unit.framework.interfaces.face import _stock_service +from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import + + +class TestCase(test_coverage.Coverage, unittest.TestCase): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must have an "implementation" attribute of type + test_interfaces.Implementation and an "invoker_constructor" attribute of type + _invocation.InvokerConstructor. + """ + __metaclass__ = abc.ABCMeta + + NAME = 'BlockingInvocationInlineServiceTest' + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self._control = test_control.PauseFailControl() + self._digest = _digest.digest( + _stock_service.STOCK_TEST_SERVICE, self._control, None) + + generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( + self._digest.methods, self._digest.inline_method_implementations, None) + self._invoker = self.invoker_constructor.construct_invoker( + generic_stub, dynamic_stubs, self._digest.methods) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self._invoker = None + self.implementation.destantiate(self._memo) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response, call = self._invoker.blocking(group, method)( + request, test_constants.LONG_TIMEOUT, with_call=True) + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_iterator = self._invoker.blocking(group, method)( + request, test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + response, call = self._invoker.blocking(group, method)( + iter(requests), test_constants.LONG_TIMEOUT, with_call=True) + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + response_iterator = self._invoker.blocking(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response = self._invoker.blocking(group, method)( + first_request, test_constants.LONG_TIMEOUT) + + test_messages.verify(first_request, first_response, self) + + second_response = self._invoker.blocking(group, method)( + second_request, test_constants.LONG_TIMEOUT) + + test_messages.verify(second_request, second_response, self) + + @unittest.skip('Parallel invocations impossible with blocking control flow!') + def testParallelInvocations(self): + raise NotImplementedError() + + @unittest.skip('Parallel invocations impossible with blocking control flow!') + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + @unittest.skip('Cancellation impossible with blocking control flow!') + def testCancelledUnaryRequestUnaryResponse(self): + raise NotImplementedError() + + @unittest.skip('Cancellation impossible with blocking control flow!') + def testCancelledUnaryRequestStreamResponse(self): + raise NotImplementedError() + + @unittest.skip('Cancellation impossible with blocking control flow!') + def testCancelledStreamRequestUnaryResponse(self): + raise NotImplementedError() + + @unittest.skip('Cancellation impossible with blocking control flow!') + def testCancelledStreamRequestStreamResponse(self): + raise NotImplementedError() + + def testExpiredUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(), self.assertRaises( + face.ExpirationError): + self._invoker.blocking(group, method)( + request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + + def testExpiredUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(), self.assertRaises( + face.ExpirationError): + response_iterator = self._invoker.blocking(group, method)( + request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + list(response_iterator) + + def testExpiredStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(), self.assertRaises( + face.ExpirationError): + self._invoker.blocking(group, method)( + iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) + + def testExpiredStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(), self.assertRaises( + face.ExpirationError): + response_iterator = self._invoker.blocking(group, method)( + iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) + list(response_iterator) + + def testFailedUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.fail(), self.assertRaises(face.RemoteError): + self._invoker.blocking(group, method)( + request, test_constants.LONG_TIMEOUT) + + def testFailedUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.fail(), self.assertRaises(face.RemoteError): + response_iterator = self._invoker.blocking(group, method)( + request, test_constants.LONG_TIMEOUT) + list(response_iterator) + + def testFailedStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.fail(), self.assertRaises(face.RemoteError): + self._invoker.blocking(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + + def testFailedStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.fail(), self.assertRaises(face.RemoteError): + response_iterator = self._invoker.blocking(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + list(response_iterator) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_digest.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_digest.py new file mode 100644 index 0000000000..9304b6b1db --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_digest.py @@ -0,0 +1,444 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Code for making a service.TestService more amenable to use in tests.""" + +import collections +import threading + +# test_control, _service, and test_interfaces are referenced from specification +# in this module. +from grpc.framework.common import cardinality +from grpc.framework.common import style +from grpc.framework.foundation import stream +from grpc.framework.foundation import stream_util +from grpc.framework.interfaces.face import face +from tests.unit.framework.common import test_control # pylint: disable=unused-import +from tests.unit.framework.interfaces.face import _service # pylint: disable=unused-import +from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import + +_IDENTITY = lambda x: x + + +class TestServiceDigest( + collections.namedtuple( + 'TestServiceDigest', + ('methods', + 'inline_method_implementations', + 'event_method_implementations', + 'multi_method_implementation', + 'unary_unary_messages_sequences', + 'unary_stream_messages_sequences', + 'stream_unary_messages_sequences', + 'stream_stream_messages_sequences',))): + """A transformation of a service.TestService. + + Attributes: + methods: A dict from method group-name pair to test_interfaces.Method object + describing the RPC methods that may be called during the test. + inline_method_implementations: A dict from method group-name pair to + face.MethodImplementation object to be used in tests of in-line calls to + behaviors under test. + event_method_implementations: A dict from method group-name pair to + face.MethodImplementation object to be used in tests of event-driven calls + to behaviors under test. + multi_method_implementation: A face.MultiMethodImplementation to be used in + tests of generic calls to behaviors under test. + unary_unary_messages_sequences: A dict from method group-name pair to + sequence of service.UnaryUnaryTestMessages objects to be used to test the + identified method. + unary_stream_messages_sequences: A dict from method group-name pair to + sequence of service.UnaryStreamTestMessages objects to be used to test the + identified method. + stream_unary_messages_sequences: A dict from method group-name pair to + sequence of service.StreamUnaryTestMessages objects to be used to test the + identified method. + stream_stream_messages_sequences: A dict from method group-name pair to + sequence of service.StreamStreamTestMessages objects to be used to test + the identified method. + """ + + +class _BufferingConsumer(stream.Consumer): + """A trivial Consumer that dumps what it consumes in a user-mutable buffer.""" + + def __init__(self): + self.consumed = [] + self.terminated = False + + def consume(self, value): + self.consumed.append(value) + + def terminate(self): + self.terminated = True + + def consume_and_terminate(self, value): + self.consumed.append(value) + self.terminated = True + + +class _InlineUnaryUnaryMethod(face.MethodImplementation): + + def __init__(self, unary_unary_test_method, control): + self._test_method = unary_unary_test_method + self._control = control + + self.cardinality = cardinality.Cardinality.UNARY_UNARY + self.style = style.Service.INLINE + + def unary_unary_inline(self, request, context): + response_list = [] + self._test_method.service( + request, response_list.append, context, self._control) + return response_list.pop(0) + + +class _EventUnaryUnaryMethod(face.MethodImplementation): + + def __init__(self, unary_unary_test_method, control, pool): + self._test_method = unary_unary_test_method + self._control = control + self._pool = pool + + self.cardinality = cardinality.Cardinality.UNARY_UNARY + self.style = style.Service.EVENT + + def unary_unary_event(self, request, response_callback, context): + if self._pool is None: + self._test_method.service( + request, response_callback, context, self._control) + else: + self._pool.submit( + self._test_method.service, request, response_callback, context, + self._control) + + +class _InlineUnaryStreamMethod(face.MethodImplementation): + + def __init__(self, unary_stream_test_method, control): + self._test_method = unary_stream_test_method + self._control = control + + self.cardinality = cardinality.Cardinality.UNARY_STREAM + self.style = style.Service.INLINE + + def unary_stream_inline(self, request, context): + response_consumer = _BufferingConsumer() + self._test_method.service( + request, response_consumer, context, self._control) + for response in response_consumer.consumed: + yield response + + +class _EventUnaryStreamMethod(face.MethodImplementation): + + def __init__(self, unary_stream_test_method, control, pool): + self._test_method = unary_stream_test_method + self._control = control + self._pool = pool + + self.cardinality = cardinality.Cardinality.UNARY_STREAM + self.style = style.Service.EVENT + + def unary_stream_event(self, request, response_consumer, context): + if self._pool is None: + self._test_method.service( + request, response_consumer, context, self._control) + else: + self._pool.submit( + self._test_method.service, request, response_consumer, context, + self._control) + + +class _InlineStreamUnaryMethod(face.MethodImplementation): + + def __init__(self, stream_unary_test_method, control): + self._test_method = stream_unary_test_method + self._control = control + + self.cardinality = cardinality.Cardinality.STREAM_UNARY + self.style = style.Service.INLINE + + def stream_unary_inline(self, request_iterator, context): + response_list = [] + request_consumer = self._test_method.service( + response_list.append, context, self._control) + for request in request_iterator: + request_consumer.consume(request) + request_consumer.terminate() + return response_list.pop(0) + + +class _EventStreamUnaryMethod(face.MethodImplementation): + + def __init__(self, stream_unary_test_method, control, pool): + self._test_method = stream_unary_test_method + self._control = control + self._pool = pool + + self.cardinality = cardinality.Cardinality.STREAM_UNARY + self.style = style.Service.EVENT + + def stream_unary_event(self, response_callback, context): + request_consumer = self._test_method.service( + response_callback, context, self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _InlineStreamStreamMethod(face.MethodImplementation): + + def __init__(self, stream_stream_test_method, control): + self._test_method = stream_stream_test_method + self._control = control + + self.cardinality = cardinality.Cardinality.STREAM_STREAM + self.style = style.Service.INLINE + + def stream_stream_inline(self, request_iterator, context): + response_consumer = _BufferingConsumer() + request_consumer = self._test_method.service( + response_consumer, context, self._control) + + for request in request_iterator: + request_consumer.consume(request) + while response_consumer.consumed: + yield response_consumer.consumed.pop(0) + response_consumer.terminate() + + +class _EventStreamStreamMethod(face.MethodImplementation): + + def __init__(self, stream_stream_test_method, control, pool): + self._test_method = stream_stream_test_method + self._control = control + self._pool = pool + + self.cardinality = cardinality.Cardinality.STREAM_STREAM + self.style = style.Service.EVENT + + def stream_stream_event(self, response_consumer, context): + request_consumer = self._test_method.service( + response_consumer, context, self._control) + if self._pool is None: + return request_consumer + else: + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _UnaryConsumer(stream.Consumer): + """A Consumer that only allows consumption of exactly one value.""" + + def __init__(self, action): + self._lock = threading.Lock() + self._action = action + self._consumed = False + self._terminated = False + + def consume(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + + self._action(value) + + def terminate(self): + with self._lock: + if not self._consumed: + raise ValueError('Unary consumer hasn\'t yet consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._terminated = True + + def consume_and_terminate(self, value): + with self._lock: + if self._consumed: + raise ValueError('Unary consumer already consumed!') + elif self._terminated: + raise ValueError('Unary consumer already terminated!') + else: + self._consumed = True + self._terminated = True + + self._action(value) + + +class _UnaryUnaryAdaptation(object): + + def __init__(self, unary_unary_test_method): + self._method = unary_unary_test_method + + def service(self, response_consumer, context, control): + def action(request): + self._method.service( + request, response_consumer.consume_and_terminate, context, control) + return _UnaryConsumer(action) + + +class _UnaryStreamAdaptation(object): + + def __init__(self, unary_stream_test_method): + self._method = unary_stream_test_method + + def service(self, response_consumer, context, control): + def action(request): + self._method.service(request, response_consumer, context, control) + return _UnaryConsumer(action) + + +class _StreamUnaryAdaptation(object): + + def __init__(self, stream_unary_test_method): + self._method = stream_unary_test_method + + def service(self, response_consumer, context, control): + return self._method.service( + response_consumer.consume_and_terminate, context, control) + + +class _MultiMethodImplementation(face.MultiMethodImplementation): + + def __init__(self, methods, control, pool): + self._methods = methods + self._control = control + self._pool = pool + + def service(self, group, name, response_consumer, context): + method = self._methods.get(group, name, None) + if method is None: + raise face.NoSuchMethodError(group, name) + elif self._pool is None: + return method(response_consumer, context, self._control) + else: + request_consumer = method(response_consumer, context, self._control) + return stream_util.ThreadSwitchingConsumer(request_consumer, self._pool) + + +class _Assembly( + collections.namedtuple( + '_Assembly', + ['methods', 'inlines', 'events', 'adaptations', 'messages'])): + """An intermediate structure created when creating a TestServiceDigest.""" + + +def _assemble( + scenarios, identifiers, inline_method_constructor, event_method_constructor, + adapter, control, pool): + """Creates an _Assembly from the given scenarios.""" + methods = {} + inlines = {} + events = {} + adaptations = {} + messages = {} + for identifier, scenario in scenarios.iteritems(): + if identifier in identifiers: + raise ValueError('Repeated identifier "(%s, %s)"!' % identifier) + + test_method = scenario[0] + inline_method = inline_method_constructor(test_method, control) + event_method = event_method_constructor(test_method, control, pool) + adaptation = adapter(test_method) + + methods[identifier] = test_method + inlines[identifier] = inline_method + events[identifier] = event_method + adaptations[identifier] = adaptation + messages[identifier] = scenario[1] + + return _Assembly(methods, inlines, events, adaptations, messages) + + +def digest(service, control, pool): + """Creates a TestServiceDigest from a TestService. + + Args: + service: A _service.TestService. + control: A test_control.Control. + pool: If RPC methods should be serviced in a separate thread, a thread pool. + None if RPC methods should be serviced in the thread belonging to the + run-time that calls for their service. + + Returns: + A TestServiceDigest synthesized from the given service.TestService. + """ + identifiers = set() + + unary_unary = _assemble( + service.unary_unary_scenarios(), identifiers, _InlineUnaryUnaryMethod, + _EventUnaryUnaryMethod, _UnaryUnaryAdaptation, control, pool) + identifiers.update(unary_unary.inlines) + + unary_stream = _assemble( + service.unary_stream_scenarios(), identifiers, _InlineUnaryStreamMethod, + _EventUnaryStreamMethod, _UnaryStreamAdaptation, control, pool) + identifiers.update(unary_stream.inlines) + + stream_unary = _assemble( + service.stream_unary_scenarios(), identifiers, _InlineStreamUnaryMethod, + _EventStreamUnaryMethod, _StreamUnaryAdaptation, control, pool) + identifiers.update(stream_unary.inlines) + + stream_stream = _assemble( + service.stream_stream_scenarios(), identifiers, _InlineStreamStreamMethod, + _EventStreamStreamMethod, _IDENTITY, control, pool) + identifiers.update(stream_stream.inlines) + + methods = dict(unary_unary.methods) + methods.update(unary_stream.methods) + methods.update(stream_unary.methods) + methods.update(stream_stream.methods) + adaptations = dict(unary_unary.adaptations) + adaptations.update(unary_stream.adaptations) + adaptations.update(stream_unary.adaptations) + adaptations.update(stream_stream.adaptations) + inlines = dict(unary_unary.inlines) + inlines.update(unary_stream.inlines) + inlines.update(stream_unary.inlines) + inlines.update(stream_stream.inlines) + events = dict(unary_unary.events) + events.update(unary_stream.events) + events.update(stream_unary.events) + events.update(stream_stream.events) + + return TestServiceDigest( + methods, + inlines, + events, + _MultiMethodImplementation(adaptations, control, pool), + unary_unary.messages, + unary_stream.messages, + stream_unary.messages, + stream_stream.messages) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py new file mode 100644 index 0000000000..34db6c3e55 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_event_invocation_synchronous_event_service.py @@ -0,0 +1,381 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test code for the Face layer of RPC Framework.""" + +import abc +import unittest + +# test_interfaces is referenced from specification in this module. +from grpc.framework.interfaces.face import face +from tests.unit.framework.common import test_constants +from tests.unit.framework.common import test_control +from tests.unit.framework.common import test_coverage +from tests.unit.framework.interfaces.face import _3069_test_constant +from tests.unit.framework.interfaces.face import _digest +from tests.unit.framework.interfaces.face import _receiver +from tests.unit.framework.interfaces.face import _stock_service +from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import + + +class TestCase(test_coverage.Coverage, unittest.TestCase): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must have an "implementation" attribute of type + test_interfaces.Implementation and an "invoker_constructor" attribute of type + _invocation.InvokerConstructor. + """ + __metaclass__ = abc.ABCMeta + + NAME = 'EventInvocationSynchronousEventServiceTest' + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self._control = test_control.PauseFailControl() + self._digest = _digest.digest( + _stock_service.STOCK_TEST_SERVICE, self._control, None) + + generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( + self._digest.methods, self._digest.event_method_implementations, None) + self._invoker = self.invoker_constructor.construct_invoker( + generic_stub, dynamic_stubs, self._digest.methods) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self._invoker = None + self.implementation.destantiate(self._memo) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) + receiver.block_until_terminated() + response = receiver.unary_response() + + test_messages.verify(request, response, self) + + def testSuccessfulUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) + receiver.block_until_terminated() + responses = receiver.stream_responses() + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + receiver = _receiver.Receiver() + + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.LONG_TIMEOUT) + for request in requests: + call_consumer.consume(request) + call_consumer.terminate() + receiver.block_until_terminated() + response = receiver.unary_response() + + test_messages.verify(requests, response, self) + + def testSuccessfulStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + receiver = _receiver.Receiver() + + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.LONG_TIMEOUT) + for request in requests: + call_consumer.consume(request) + call_consumer.terminate() + receiver.block_until_terminated() + responses = receiver.stream_responses() + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + # pylint: disable=cell-var-from-loop + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + second_receiver = _receiver.Receiver() + + def make_second_invocation(): + self._invoker.event(group, method)( + second_request, second_receiver, second_receiver.abort, + test_constants.LONG_TIMEOUT) + + class FirstReceiver(_receiver.Receiver): + + def complete(self, terminal_metadata, code, details): + super(FirstReceiver, self).complete( + terminal_metadata, code, details) + make_second_invocation() + + first_receiver = FirstReceiver() + + self._invoker.event(group, method)( + first_request, first_receiver, first_receiver.abort, + test_constants.LONG_TIMEOUT) + second_receiver.block_until_terminated() + + first_response = first_receiver.unary_response() + second_response = second_receiver.unary_response() + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + def testParallelInvocations(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + first_receiver = _receiver.Receiver() + second_request = test_messages.request() + second_receiver = _receiver.Receiver() + + self._invoker.event(group, method)( + first_request, first_receiver, first_receiver.abort, + test_constants.LONG_TIMEOUT) + self._invoker.event(group, method)( + second_request, second_receiver, second_receiver.abort, + test_constants.LONG_TIMEOUT) + first_receiver.block_until_terminated() + second_receiver.block_until_terminated() + + first_response = first_receiver.unary_response() + second_response = second_receiver.unary_response() + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + @unittest.skip('TODO(nathaniel): implement.') + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + def testCancelledUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + with self._control.pause(): + call = self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) + call.cancel() + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) + + def testCancelledUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + call = self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) + call.cancel() + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) + + def testCancelledStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + receiver = _receiver.Receiver() + + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.LONG_TIMEOUT) + for request in requests: + call_consumer.consume(request) + call_consumer.cancel() + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) + + def testCancelledStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for unused_test_messages in test_messages_sequence: + receiver = _receiver.Receiver() + + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.LONG_TIMEOUT) + call_consumer.cancel() + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.CANCELLED, receiver.abortion().kind) + + def testExpiredUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + with self._control.pause(): + self._invoker.event(group, method)( + request, receiver, receiver.abort, + _3069_test_constant.REALLY_SHORT_TIMEOUT) + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) + + def testExpiredUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + with self._control.pause(): + self._invoker.event(group, method)( + request, receiver, receiver.abort, + _3069_test_constant.REALLY_SHORT_TIMEOUT) + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) + + def testExpiredStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for unused_test_messages in test_messages_sequence: + receiver = _receiver.Receiver() + + self._invoker.event(group, method)( + receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT) + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) + + def testExpiredStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + receiver = _receiver.Receiver() + + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, _3069_test_constant.REALLY_SHORT_TIMEOUT) + for request in requests: + call_consumer.consume(request) + receiver.block_until_terminated() + + self.assertIs(face.Abortion.Kind.EXPIRED, receiver.abortion().kind) + + def testFailedUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + with self._control.fail(): + self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) + receiver.block_until_terminated() + + self.assertIs( + face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) + + def testFailedUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + receiver = _receiver.Receiver() + + with self._control.fail(): + self._invoker.event(group, method)( + request, receiver, receiver.abort, test_constants.LONG_TIMEOUT) + receiver.block_until_terminated() + + self.assertIs( + face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) + + def testFailedStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + receiver = _receiver.Receiver() + + with self._control.fail(): + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.LONG_TIMEOUT) + for request in requests: + call_consumer.consume(request) + call_consumer.terminate() + receiver.block_until_terminated() + + self.assertIs( + face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) + + def testFailedStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + receiver = _receiver.Receiver() + + with self._control.fail(): + call_consumer = self._invoker.event(group, method)( + receiver, receiver.abort, test_constants.LONG_TIMEOUT) + for request in requests: + call_consumer.consume(request) + call_consumer.terminate() + receiver.block_until_terminated() + + self.assertIs( + face.Abortion.Kind.REMOTE_FAILURE, receiver.abortion().kind) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py new file mode 100644 index 0000000000..c178f2f108 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -0,0 +1,435 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Test code for the Face layer of RPC Framework.""" + +import abc +import contextlib +import threading +import unittest + +# test_interfaces is referenced from specification in this module. +from grpc.framework.foundation import logging_pool +from grpc.framework.interfaces.face import face +from tests.unit.framework.common import test_constants +from tests.unit.framework.common import test_control +from tests.unit.framework.common import test_coverage +from tests.unit.framework.interfaces.face import _3069_test_constant +from tests.unit.framework.interfaces.face import _digest +from tests.unit.framework.interfaces.face import _stock_service +from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import + + +class _PauseableIterator(object): + + def __init__(self, upstream): + self._upstream = upstream + self._condition = threading.Condition() + self._paused = False + + @contextlib.contextmanager + def pause(self): + with self._condition: + self._paused = True + yield + with self._condition: + self._paused = False + self._condition.notify_all() + + def __iter__(self): + return self + + def next(self): + with self._condition: + while self._paused: + self._condition.wait() + return next(self._upstream) + + +class _Callback(object): + + def __init__(self): + self._condition = threading.Condition() + self._called = False + self._passed_future = None + self._passed_other_stuff = None + + def __call__(self, *args, **kwargs): + with self._condition: + self._called = True + if args: + self._passed_future = args[0] + if 1 < len(args) or kwargs: + self._passed_other_stuff = tuple(args[1:]), dict(kwargs) + self._condition.notify_all() + + def future(self): + with self._condition: + while True: + if self._passed_other_stuff is not None: + raise ValueError( + 'Test callback passed unexpected values: %s', + self._passed_other_stuff) + elif self._called: + return self._passed_future + else: + self._condition.wait() + + +class TestCase(test_coverage.Coverage, unittest.TestCase): + """A test of the Face layer of RPC Framework. + + Concrete subclasses must have an "implementation" attribute of type + test_interfaces.Implementation and an "invoker_constructor" attribute of type + _invocation.InvokerConstructor. + """ + __metaclass__ = abc.ABCMeta + + NAME = 'FutureInvocationAsynchronousEventServiceTest' + + def setUp(self): + """See unittest.TestCase.setUp for full specification. + + Overriding implementations must call this implementation. + """ + self._control = test_control.PauseFailControl() + self._digest_pool = logging_pool.pool(test_constants.POOL_SIZE) + self._digest = _digest.digest( + _stock_service.STOCK_TEST_SERVICE, self._control, self._digest_pool) + + generic_stub, dynamic_stubs, self._memo = self.implementation.instantiate( + self._digest.methods, self._digest.event_method_implementations, None) + self._invoker = self.invoker_constructor.construct_invoker( + generic_stub, dynamic_stubs, self._digest.methods) + + def tearDown(self): + """See unittest.TestCase.tearDown for full specification. + + Overriding implementations must call this implementation. + """ + self._invoker = None + self.implementation.destantiate(self._memo) + self._digest_pool.shutdown(wait=True) + + def testSuccessfulUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = _Callback() + + response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + response_future.add_done_callback(callback) + response = response_future.result() + + test_messages.verify(request, response, self) + self.assertIs(callback.future(), response_future) + + def testSuccessfulUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + response_iterator = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(request, responses, self) + + def testSuccessfulStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + request_iterator = _PauseableIterator(iter(requests)) + callback = _Callback() + + # Use of a paused iterator of requests allows us to test that control is + # returned to calling code before the iterator yields any requests. + with request_iterator.pause(): + response_future = self._invoker.future(group, method)( + request_iterator, test_constants.LONG_TIMEOUT) + response_future.add_done_callback(callback) + future_passed_to_callback = callback.future() + response = future_passed_to_callback.result() + + test_messages.verify(requests, response, self) + self.assertIs(future_passed_to_callback, response_future) + + def testSuccessfulStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + request_iterator = _PauseableIterator(iter(requests)) + + # Use of a paused iterator of requests allows us to test that control is + # returned to calling code before the iterator yields any requests. + with request_iterator.pause(): + response_iterator = self._invoker.future(group, method)( + request_iterator, test_constants.LONG_TIMEOUT) + responses = list(response_iterator) + + test_messages.verify(requests, responses, self) + + def testSequentialInvocations(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self._invoker.future(group, method)( + first_request, test_constants.LONG_TIMEOUT) + first_response = first_response_future.result() + + test_messages.verify(first_request, first_response, self) + + second_response_future = self._invoker.future(group, method)( + second_request, test_constants.LONG_TIMEOUT) + second_response = second_response_future.result() + + test_messages.verify(second_request, second_response, self) + + def testParallelInvocations(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self._invoker.future(group, method)( + first_request, test_constants.LONG_TIMEOUT) + second_response_future = self._invoker.future(group, method)( + second_request, test_constants.LONG_TIMEOUT) + first_response = first_response_future.result() + second_response = second_response_future.result() + + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + + @unittest.skip('TODO(nathaniel): implement.') + def testWaitingForSomeButNotAllParallelInvocations(self): + raise NotImplementedError() + + def testCancelledUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = _Callback() + + with self._control.pause(): + response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + response_future.add_done_callback(callback) + cancel_method_return_value = response_future.cancel() + + self.assertIs(callback.future(), response_future) + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) + + def testCancelledUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(): + response_iterator = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + response_iterator.cancel() + + with self.assertRaises(face.CancellationError): + next(response_iterator) + + def testCancelledStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = _Callback() + + with self._control.pause(): + response_future = self._invoker.future(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + response_future.add_done_callback(callback) + cancel_method_return_value = response_future.cancel() + + self.assertIs(callback.future(), response_future) + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) + + def testCancelledStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(): + response_iterator = self._invoker.future(group, method)( + iter(requests), test_constants.LONG_TIMEOUT) + response_iterator.cancel() + + with self.assertRaises(face.CancellationError): + next(response_iterator) + + def testExpiredUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = _Callback() + + with self._control.pause(): + response_future = self._invoker.future( + group, method)(request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future.add_done_callback(callback) + self.assertIs(callback.future(), response_future) + self.assertIsInstance( + response_future.exception(), face.ExpirationError) + with self.assertRaises(face.ExpirationError): + response_future.result() + + def testExpiredUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + with self._control.pause(): + response_iterator = self._invoker.future(group, method)( + request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + with self.assertRaises(face.ExpirationError): + list(response_iterator) + + def testExpiredStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = _Callback() + + with self._control.pause(): + response_future = self._invoker.future(group, method)( + iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future.add_done_callback(callback) + self.assertIs(callback.future(), response_future) + self.assertIsInstance( + response_future.exception(), face.ExpirationError) + with self.assertRaises(face.ExpirationError): + response_future.result() + + def testExpiredStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + with self._control.pause(): + response_iterator = self._invoker.future(group, method)( + iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) + with self.assertRaises(face.ExpirationError): + list(response_iterator) + + def testFailedUnaryRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + callback = _Callback() + + with self._control.fail(): + response_future = self._invoker.future(group, method)( + request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future.add_done_callback(callback) + + self.assertIs(callback.future(), response_future) + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance( + response_future.exception(), face.ExpirationError) + with self.assertRaises(face.ExpirationError): + response_future.result() + + def testFailedUnaryRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + request = test_messages.request() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_consumer before the + # expiration of the RPC. + with self._control.fail(), self.assertRaises(face.ExpirationError): + response_iterator = self._invoker.future(group, method)( + request, _3069_test_constant.REALLY_SHORT_TIMEOUT) + list(response_iterator) + + def testFailedStreamRequestUnaryResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + callback = _Callback() + + with self._control.fail(): + response_future = self._invoker.future(group, method)( + iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) + response_future.add_done_callback(callback) + + self.assertIs(callback.future(), response_future) + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance( + response_future.exception(), face.ExpirationError) + with self.assertRaises(face.ExpirationError): + response_future.result() + + def testFailedStreamRequestStreamResponse(self): + for (group, method), test_messages_sequence in ( + self._digest.stream_stream_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = test_messages.requests() + + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is indistinguishable + # from simply not having called its response_consumer before the + # expiration of the RPC. + with self._control.fail(), self.assertRaises(face.ExpirationError): + response_iterator = self._invoker.future(group, method)( + iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) + list(response_iterator) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_invocation.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_invocation.py new file mode 100644 index 0000000000..448e845a08 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_invocation.py @@ -0,0 +1,213 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Coverage across the Face layer's generic-to-dynamic range for invocation.""" + +import abc + +from grpc.framework.common import cardinality + +_CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR = { + cardinality.Cardinality.UNARY_UNARY: 'blocking_unary_unary', + cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream', + cardinality.Cardinality.STREAM_UNARY: 'blocking_stream_unary', + cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream', +} + +_CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR = { + cardinality.Cardinality.UNARY_UNARY: 'future_unary_unary', + cardinality.Cardinality.UNARY_STREAM: 'inline_unary_stream', + cardinality.Cardinality.STREAM_UNARY: 'future_stream_unary', + cardinality.Cardinality.STREAM_STREAM: 'inline_stream_stream', +} + +_CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR = { + cardinality.Cardinality.UNARY_UNARY: 'event_unary_unary', + cardinality.Cardinality.UNARY_STREAM: 'event_unary_stream', + cardinality.Cardinality.STREAM_UNARY: 'event_stream_unary', + cardinality.Cardinality.STREAM_STREAM: 'event_stream_stream', +} + +_CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE = { + cardinality.Cardinality.UNARY_UNARY: 'unary_unary', + cardinality.Cardinality.UNARY_STREAM: 'unary_stream', + cardinality.Cardinality.STREAM_UNARY: 'stream_unary', + cardinality.Cardinality.STREAM_STREAM: 'stream_stream', +} + + +class Invoker(object): + """A type used to invoke test RPCs.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def blocking(self, group, name): + """Invokes an RPC with blocking control flow.""" + raise NotImplementedError() + + @abc.abstractmethod + def future(self, group, name): + """Invokes an RPC with future control flow.""" + raise NotImplementedError() + + @abc.abstractmethod + def event(self, group, name): + """Invokes an RPC with event control flow.""" + raise NotImplementedError() + + +class InvokerConstructor(object): + """A type used to create Invokers.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def name(self): + """Specifies the name of the Invoker constructed by this object.""" + raise NotImplementedError() + + @abc.abstractmethod + def construct_invoker(self, generic_stub, dynamic_stubs, methods): + """Constructs an Invoker for the given stubs and methods.""" + raise NotImplementedError() + + +class _GenericInvoker(Invoker): + + def __init__(self, generic_stub, methods): + self._stub = generic_stub + self._methods = methods + + def _behavior(self, group, name, cardinality_to_generic_method): + method_cardinality = self._methods[group, name].cardinality() + behavior = getattr( + self._stub, cardinality_to_generic_method[method_cardinality]) + return lambda *args, **kwargs: behavior(group, name, *args, **kwargs) + + def blocking(self, group, name): + return self._behavior( + group, name, _CARDINALITY_TO_GENERIC_BLOCKING_BEHAVIOR) + + def future(self, group, name): + return self._behavior(group, name, _CARDINALITY_TO_GENERIC_FUTURE_BEHAVIOR) + + def event(self, group, name): + return self._behavior(group, name, _CARDINALITY_TO_GENERIC_EVENT_BEHAVIOR) + + +class _GenericInvokerConstructor(InvokerConstructor): + + def name(self): + return 'GenericInvoker' + + def construct_invoker(self, generic_stub, dynamic_stub, methods): + return _GenericInvoker(generic_stub, methods) + + +class _MultiCallableInvoker(Invoker): + + def __init__(self, generic_stub, methods): + self._stub = generic_stub + self._methods = methods + + def _multi_callable(self, group, name): + method_cardinality = self._methods[group, name].cardinality() + behavior = getattr( + self._stub, + _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) + return behavior(group, name) + + def blocking(self, group, name): + return self._multi_callable(group, name) + + def future(self, group, name): + method_cardinality = self._methods[group, name].cardinality() + behavior = getattr( + self._stub, + _CARDINALITY_TO_MULTI_CALLABLE_ATTRIBUTE[method_cardinality]) + if method_cardinality in ( + cardinality.Cardinality.UNARY_UNARY, + cardinality.Cardinality.STREAM_UNARY): + return behavior(group, name).future + else: + return behavior(group, name) + + def event(self, group, name): + return self._multi_callable(group, name).event + + +class _MultiCallableInvokerConstructor(InvokerConstructor): + + def name(self): + return 'MultiCallableInvoker' + + def construct_invoker(self, generic_stub, dynamic_stub, methods): + return _MultiCallableInvoker(generic_stub, methods) + + +class _DynamicInvoker(Invoker): + + def __init__(self, dynamic_stubs, methods): + self._stubs = dynamic_stubs + self._methods = methods + + def blocking(self, group, name): + return getattr(self._stubs[group], name) + + def future(self, group, name): + if self._methods[group, name].cardinality() in ( + cardinality.Cardinality.UNARY_UNARY, + cardinality.Cardinality.STREAM_UNARY): + return getattr(self._stubs[group], name).future + else: + return getattr(self._stubs[group], name) + + def event(self, group, name): + return getattr(self._stubs[group], name).event + + +class _DynamicInvokerConstructor(InvokerConstructor): + + def name(self): + return 'DynamicInvoker' + + def construct_invoker(self, generic_stub, dynamic_stubs, methods): + return _DynamicInvoker(dynamic_stubs, methods) + + +def invoker_constructors(): + """Creates a sequence of InvokerConstructors to use in tests of RPCs. + + Returns: + A sequence of InvokerConstructors. + """ + return ( + _GenericInvokerConstructor(), + _MultiCallableInvokerConstructor(), + _DynamicInvokerConstructor(), + ) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py new file mode 100644 index 0000000000..2e444ff09d --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py @@ -0,0 +1,95 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A utility useful in tests of asynchronous, event-driven interfaces.""" + +import threading + +from grpc.framework.interfaces.face import face + + +class Receiver(face.ResponseReceiver): + """A utility object useful in tests of asynchronous code.""" + + def __init__(self): + self._condition = threading.Condition() + self._initial_metadata = None + self._responses = [] + self._terminal_metadata = None + self._code = None + self._details = None + self._completed = False + self._abortion = None + + def abort(self, abortion): + with self._condition: + self._abortion = abortion + self._condition.notify_all() + + def initial_metadata(self, initial_metadata): + with self._condition: + self._initial_metadata = initial_metadata + + def response(self, response): + with self._condition: + self._responses.append(response) + + def complete(self, terminal_metadata, code, details): + with self._condition: + self._terminal_metadata = terminal_metadata + self._code = code + self._details = details + self._completed = True + self._condition.notify_all() + + def block_until_terminated(self): + with self._condition: + while self._abortion is None and not self._completed: + self._condition.wait() + + def unary_response(self): + with self._condition: + if self._abortion is not None: + raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + elif len(self._responses) != 1: + raise AssertionError( + '%d responses received, not exactly one!', len(self._responses)) + else: + return self._responses[0] + + def stream_responses(self): + with self._condition: + if self._abortion is None: + return list(self._responses) + else: + raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + + def abortion(self): + with self._condition: + return self._abortion diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_service.py new file mode 100644 index 0000000000..28941e2ad0 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_service.py @@ -0,0 +1,332 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Private interfaces implemented by data sets used in Face-layer tests.""" + +import abc + +# face is referenced from specification in this module. +from grpc.framework.interfaces.face import face # pylint: disable=unused-import +from tests.unit.framework.interfaces.face import test_interfaces + + +class UnaryUnaryTestMethodImplementation(test_interfaces.Method): + """A controllable implementation of a unary-unary method.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_callback, context, control): + """Services an RPC that accepts one message and produces one message. + + Args: + request: The single request message for the RPC. + response_callback: A callback to be called to accept the response message + of the RPC. + context: An face.ServicerContext object. + control: A test_control.Control to control execution of this method. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class UnaryUnaryTestMessages(object): + """A type for unary-request-unary-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def request(self): + """Affords a request message. + + Implementations of this method should return a different message with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A request message. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, request, response, test_case): + """Verifies that the computed response matches the given request. + + Args: + request: A request message. + response: A response message. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the request and response do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class UnaryStreamTestMethodImplementation(test_interfaces.Method): + """A controllable implementation of a unary-stream method.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, request, response_consumer, context, control): + """Services an RPC that takes one message and produces a stream of messages. + + Args: + request: The single request message for the RPC. + response_consumer: A stream.Consumer to be called to accept the response + messages of the RPC. + context: A face.ServicerContext object. + control: A test_control.Control to control execution of this method. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class UnaryStreamTestMessages(object): + """A type for unary-request-stream-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def request(self): + """Affords a request message. + + Implementations of this method should return a different message with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A request message. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, request, responses, test_case): + """Verifies that the computed responses match the given request. + + Args: + request: A request message. + responses: A sequence of response messages. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the request and responses do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class StreamUnaryTestMethodImplementation(test_interfaces.Method): + """A controllable implementation of a stream-unary method.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_callback, context, control): + """Services an RPC that takes a stream of messages and produces one message. + + Args: + response_callback: A callback to be called to accept the response message + of the RPC. + context: A face.ServicerContext object. + control: A test_control.Control to control execution of this method. + + Returns: + A stream.Consumer with which to accept the request messages of the RPC. + The consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing messages to this object. Implementations must not assume that + this object will be called to completion of the request stream or even + called at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class StreamUnaryTestMessages(object): + """A type for stream-request-unary-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def requests(self): + """Affords a sequence of request messages. + + Implementations of this method should return a different sequences with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A sequence of request messages. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, requests, response, test_case): + """Verifies that the computed response matches the given requests. + + Args: + requests: A sequence of request messages. + response: A response message. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the requests and response do not match, indicating that + there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class StreamStreamTestMethodImplementation(test_interfaces.Method): + """A controllable implementation of a stream-stream method.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def service(self, response_consumer, context, control): + """Services an RPC that accepts and produces streams of messages. + + Args: + response_consumer: A stream.Consumer to be called to accept the response + messages of the RPC. + context: A face.ServicerContext object. + control: A test_control.Control to control execution of this method. + + Returns: + A stream.Consumer with which to accept the request messages of the RPC. + The consumer returned from this method may or may not be invoked to + completion: in the case of RPC abortion, RPC Framework will simply stop + passing messages to this object. Implementations must not assume that + this object will be called to completion of the request stream or even + called at all. + + Raises: + abandonment.Abandoned: May or may not be raised when the RPC has been + aborted. + """ + raise NotImplementedError() + + +class StreamStreamTestMessages(object): + """A type for stream-request-stream-response message pairings.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def requests(self): + """Affords a sequence of request messages. + + Implementations of this method should return a different sequences with each + call so that multiple test executions of the test method may be made with + different inputs. + + Returns: + A sequence of request messages. + """ + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, requests, responses, test_case): + """Verifies that the computed response matches the given requests. + + Args: + requests: A sequence of request messages. + responses: A sequence of response messages. + test_case: A unittest.TestCase object affording useful assertion methods. + + Raises: + AssertionError: If the requests and responses do not match, indicating + that there was some problem executing the RPC under test. + """ + raise NotImplementedError() + + +class TestService(object): + """A specification of implemented methods to use in tests.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def unary_unary_scenarios(self): + """Affords unary-request-unary-response test methods and their messages. + + Returns: + A dict from method group-name pair to implementation/messages pair. The + first element of the pair is a UnaryUnaryTestMethodImplementation object + and the second element is a sequence of UnaryUnaryTestMethodMessages + objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def unary_stream_scenarios(self): + """Affords unary-request-stream-response test methods and their messages. + + Returns: + A dict from method group-name pair to implementation/messages pair. The + first element of the pair is a UnaryStreamTestMethodImplementation + object and the second element is a sequence of + UnaryStreamTestMethodMessages objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stream_unary_scenarios(self): + """Affords stream-request-unary-response test methods and their messages. + + Returns: + A dict from method group-name pair to implementation/messages pair. The + first element of the pair is a StreamUnaryTestMethodImplementation + object and the second element is a sequence of + StreamUnaryTestMethodMessages objects. + """ + raise NotImplementedError() + + @abc.abstractmethod + def stream_stream_scenarios(self): + """Affords stream-request-stream-response test methods and their messages. + + Returns: + A dict from method group-name pair to implementation/messages pair. The + first element of the pair is a StreamStreamTestMethodImplementation + object and the second element is a sequence of + StreamStreamTestMethodMessages objects. + """ + raise NotImplementedError() diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_stock_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_stock_service.py new file mode 100644 index 0000000000..5299655bb3 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_stock_service.py @@ -0,0 +1,396 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Examples of Python implementations of the stock.proto Stock service.""" + +from grpc.framework.common import cardinality +from grpc.framework.foundation import abandonment +from grpc.framework.foundation import stream +from tests.unit.framework.common import test_constants +from tests.unit.framework.interfaces.face import _service +from tests.unit._junkdrawer import stock_pb2 + +_STOCK_GROUP_NAME = 'Stock' +_SYMBOL_FORMAT = 'test symbol:%03d' + +# A test-appropriate security-pricing function. :-P +_price = lambda symbol_name: float(hash(symbol_name) % 4096) + + +def _get_last_trade_price(stock_request, stock_reply_callback, control, active): + """A unary-request, unary-response test method.""" + control.control() + if active(): + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=_price(stock_request.symbol))) + else: + raise abandonment.Abandoned() + + +def _get_last_trade_price_multiple(stock_reply_consumer, control, active): + """A stream-request, stream-response test method.""" + def stock_reply_for_stock_request(stock_request): + control.control() + if active(): + return stock_pb2.StockReply( + symbol=stock_request.symbol, price=_price(stock_request.symbol)) + else: + raise abandonment.Abandoned() + + class StockRequestConsumer(stream.Consumer): + + def consume(self, stock_request): + stock_reply_consumer.consume(stock_reply_for_stock_request(stock_request)) + + def terminate(self): + control.control() + stock_reply_consumer.terminate() + + def consume_and_terminate(self, stock_request): + stock_reply_consumer.consume_and_terminate( + stock_reply_for_stock_request(stock_request)) + + return StockRequestConsumer() + + +def _watch_future_trades(stock_request, stock_reply_consumer, control, active): + """A unary-request, stream-response test method.""" + base_price = _price(stock_request.symbol) + for index in range(stock_request.num_trades_to_watch): + control.control() + if active(): + stock_reply_consumer.consume( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=base_price + index)) + else: + raise abandonment.Abandoned() + stock_reply_consumer.terminate() + + +def _get_highest_trade_price(stock_reply_callback, control, active): + """A stream-request, unary-response test method.""" + + class StockRequestConsumer(stream.Consumer): + """Keeps an ongoing record of the most valuable symbol yet consumed.""" + + def __init__(self): + self._symbol = None + self._price = None + + def consume(self, stock_request): + control.control() + if active(): + if self._price is None: + self._symbol = stock_request.symbol + self._price = _price(stock_request.symbol) + else: + candidate_price = _price(stock_request.symbol) + if self._price < candidate_price: + self._symbol = stock_request.symbol + self._price = candidate_price + + def terminate(self): + control.control() + if active(): + if self._symbol is None: + raise ValueError() + else: + stock_reply_callback( + stock_pb2.StockReply(symbol=self._symbol, price=self._price)) + self._symbol = None + self._price = None + + def consume_and_terminate(self, stock_request): + control.control() + if active(): + if self._price is None: + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, + price=_price(stock_request.symbol))) + else: + candidate_price = _price(stock_request.symbol) + if self._price < candidate_price: + stock_reply_callback( + stock_pb2.StockReply( + symbol=stock_request.symbol, price=candidate_price)) + else: + stock_reply_callback( + stock_pb2.StockReply( + symbol=self._symbol, price=self._price)) + + self._symbol = None + self._price = None + + return StockRequestConsumer() + + +class GetLastTradePrice(_service.UnaryUnaryTestMethodImplementation): + """GetLastTradePrice for use in tests.""" + + def group(self): + return _STOCK_GROUP_NAME + + def name(self): + return 'GetLastTradePrice' + + def cardinality(self): + return cardinality.Cardinality.UNARY_UNARY + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, request, response_callback, context, control): + _get_last_trade_price( + request, response_callback, control, context.is_active) + + +class GetLastTradePriceMessages(_service.UnaryUnaryTestMessages): + + def __init__(self): + self._index = 0 + + def request(self): + symbol = _SYMBOL_FORMAT % self._index + self._index += 1 + return stock_pb2.StockRequest(symbol=symbol) + + def verify(self, request, response, test_case): + test_case.assertEqual(request.symbol, response.symbol) + test_case.assertEqual(_price(request.symbol), response.price) + + +class GetLastTradePriceMultiple(_service.StreamStreamTestMethodImplementation): + """GetLastTradePriceMultiple for use in tests.""" + + def group(self): + return _STOCK_GROUP_NAME + + def name(self): + return 'GetLastTradePriceMultiple' + + def cardinality(self): + return cardinality.Cardinality.STREAM_STREAM + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, response_consumer, context, control): + return _get_last_trade_price_multiple( + response_consumer, control, context.is_active) + + +class GetLastTradePriceMultipleMessages(_service.StreamStreamTestMessages): + """Pairs of message streams for use with GetLastTradePriceMultiple.""" + + def __init__(self): + self._index = 0 + + def requests(self): + base_index = self._index + self._index += 1 + return [ + stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % (base_index + index)) + for index in range(test_constants.STREAM_LENGTH)] + + def verify(self, requests, responses, test_case): + test_case.assertEqual(len(requests), len(responses)) + for stock_request, stock_reply in zip(requests, responses): + test_case.assertEqual(stock_request.symbol, stock_reply.symbol) + test_case.assertEqual(_price(stock_request.symbol), stock_reply.price) + + +class WatchFutureTrades(_service.UnaryStreamTestMethodImplementation): + """WatchFutureTrades for use in tests.""" + + def group(self): + return _STOCK_GROUP_NAME + + def name(self): + return 'WatchFutureTrades' + + def cardinality(self): + return cardinality.Cardinality.UNARY_STREAM + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, request, response_consumer, context, control): + _watch_future_trades(request, response_consumer, control, context.is_active) + + +class WatchFutureTradesMessages(_service.UnaryStreamTestMessages): + """Pairs of a single request message and a sequence of response messages.""" + + def __init__(self): + self._index = 0 + + def request(self): + symbol = _SYMBOL_FORMAT % self._index + self._index += 1 + return stock_pb2.StockRequest( + symbol=symbol, num_trades_to_watch=test_constants.STREAM_LENGTH) + + def verify(self, request, responses, test_case): + test_case.assertEqual(test_constants.STREAM_LENGTH, len(responses)) + base_price = _price(request.symbol) + for index, response in enumerate(responses): + test_case.assertEqual(base_price + index, response.price) + + +class GetHighestTradePrice(_service.StreamUnaryTestMethodImplementation): + """GetHighestTradePrice for use in tests.""" + + def group(self): + return _STOCK_GROUP_NAME + + def name(self): + return 'GetHighestTradePrice' + + def cardinality(self): + return cardinality.Cardinality.STREAM_UNARY + + def request_class(self): + return stock_pb2.StockRequest + + def response_class(self): + return stock_pb2.StockReply + + def serialize_request(self, request): + return request.SerializeToString() + + def deserialize_request(self, serialized_request): + return stock_pb2.StockRequest.FromString(serialized_request) + + def serialize_response(self, response): + return response.SerializeToString() + + def deserialize_response(self, serialized_response): + return stock_pb2.StockReply.FromString(serialized_response) + + def service(self, response_callback, context, control): + return _get_highest_trade_price( + response_callback, control, context.is_active) + + +class GetHighestTradePriceMessages(_service.StreamUnaryTestMessages): + + def requests(self): + return [ + stock_pb2.StockRequest(symbol=_SYMBOL_FORMAT % index) + for index in range(test_constants.STREAM_LENGTH)] + + def verify(self, requests, response, test_case): + price = None + symbol = None + for stock_request in requests: + current_symbol = stock_request.symbol + current_price = _price(current_symbol) + if price is None or price < current_price: + price = current_price + symbol = current_symbol + test_case.assertEqual(price, response.price) + test_case.assertEqual(symbol, response.symbol) + + +class StockTestService(_service.TestService): + """A corpus of test data with one method of each RPC cardinality.""" + + def unary_unary_scenarios(self): + return { + (_STOCK_GROUP_NAME, 'GetLastTradePrice'): ( + GetLastTradePrice(), [GetLastTradePriceMessages()]), + } + + def unary_stream_scenarios(self): + return { + (_STOCK_GROUP_NAME, 'WatchFutureTrades'): ( + WatchFutureTrades(), [WatchFutureTradesMessages()]), + } + + def stream_unary_scenarios(self): + return { + (_STOCK_GROUP_NAME, 'GetHighestTradePrice'): ( + GetHighestTradePrice(), [GetHighestTradePriceMessages()]) + } + + def stream_stream_scenarios(self): + return { + (_STOCK_GROUP_NAME, 'GetLastTradePriceMultiple'): ( + GetLastTradePriceMultiple(), [GetLastTradePriceMultipleMessages()]), + } + + +STOCK_TEST_SERVICE = StockTestService() diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py b/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py new file mode 100644 index 0000000000..462829b660 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/test_cases.py @@ -0,0 +1,69 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tools for creating tests of implementations of the Face layer.""" + +# unittest is referenced from specification in this module. +import unittest # pylint: disable=unused-import + +# test_interfaces is referenced from specification in this module. +from tests.unit.framework.interfaces.face import _blocking_invocation_inline_service +from tests.unit.framework.interfaces.face import _event_invocation_synchronous_event_service +from tests.unit.framework.interfaces.face import _future_invocation_asynchronous_event_service +from tests.unit.framework.interfaces.face import _invocation +from tests.unit.framework.interfaces.face import test_interfaces # pylint: disable=unused-import + +_TEST_CASE_SUPERCLASSES = ( + _blocking_invocation_inline_service.TestCase, + _event_invocation_synchronous_event_service.TestCase, + _future_invocation_asynchronous_event_service.TestCase, +) + + +def test_cases(implementation): + """Creates unittest.TestCase classes for a given Face layer implementation. + + Args: + implementation: A test_interfaces.Implementation specifying creation and + destruction of a given Face layer implementation. + + Returns: + A sequence of subclasses of unittest.TestCase defining tests of the + specified Face layer implementation. + """ + test_case_classes = [] + for invoker_constructor in _invocation.invoker_constructors(): + for super_class in _TEST_CASE_SUPERCLASSES: + test_case_classes.append( + type(invoker_constructor.name() + super_class.NAME, (super_class,), + {'implementation': implementation, + 'invoker_constructor': invoker_constructor, + '__module__': implementation.__module__, + })) + return test_case_classes diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/test_interfaces.py b/src/python/grpcio/tests/unit/framework/interfaces/face/test_interfaces.py new file mode 100644 index 0000000000..b2b5c10fa6 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/test_interfaces.py @@ -0,0 +1,229 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces used in tests of implementations of the Face layer.""" + +import abc + +from grpc.framework.common import cardinality # pylint: disable=unused-import +from grpc.framework.interfaces.face import face # pylint: disable=unused-import + + +class Method(object): + """Specifies a method to be used in tests.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def group(self): + """Identify the group of the method. + + Returns: + The group of the method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def name(self): + """Identify the name of the method. + + Returns: + The name of the method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def cardinality(self): + """Identify the cardinality of the method. + + Returns: + A cardinality.Cardinality value describing the streaming semantics of the + method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def request_class(self): + """Identify the class used for the method's request objects. + + Returns: + The class object of the class to which the method's request objects + belong. + """ + raise NotImplementedError() + + @abc.abstractmethod + def response_class(self): + """Identify the class used for the method's response objects. + + Returns: + The class object of the class to which the method's response objects + belong. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_request(self, request): + """Serialize the given request object. + + Args: + request: A request object appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_request(self, serialized_request): + """Synthesize a request object from a given bytestring. + + Args: + serialized_request: A bytestring deserializable into a request object + appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_response(self, response): + """Serialize the given response object. + + Args: + response: A response object appropriate for this method. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_response(self, serialized_response): + """Synthesize a response object from a given bytestring. + + Args: + serialized_response: A bytestring deserializable into a response object + appropriate for this method. + """ + raise NotImplementedError() + + +class Implementation(object): + """Specifies an implementation of the Face layer.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def instantiate( + self, methods, method_implementations, + multi_method_implementation): + """Instantiates the Face layer implementation to be used in a test. + + Args: + methods: A sequence of Method objects describing the methods available to + be called during the test. + method_implementations: A dictionary from group-name pair to + face.MethodImplementation object specifying implementation of a method. + multi_method_implementation: A face.MultiMethodImplementation or None. + + Returns: + A sequence of length three the first element of which is a + face.GenericStub, the second element of which is dictionary from groups + to face.DynamicStubs affording invocation of the group's methods, and + the third element of which is an arbitrary memo object to be kept and + passed to destantiate at the conclusion of the test. The returned stubs + must be backed by the provided implementations. + """ + raise NotImplementedError() + + @abc.abstractmethod + def destantiate(self, memo): + """Destroys the Face layer implementation under test. + + Args: + memo: The object from the third position of the return value of a call to + instantiate. + """ + raise NotImplementedError() + + @abc.abstractmethod + def invocation_metadata(self): + """Provides the metadata to be used when invoking a test RPC. + + Returns: + An object to use as the supplied-at-invocation-time metadata in a test + RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def initial_metadata(self): + """Provides the metadata for use as a test RPC's first servicer metadata. + + Returns: + An object to use as the from-the-servicer-before-responses metadata in a + test RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def terminal_metadata(self): + """Provides the metadata for use as a test RPC's second servicer metadata. + + Returns: + An object to use as the from-the-servicer-after-all-responses metadata in + a test RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def code(self): + """Provides the value for use as a test RPC's code. + + Returns: + An object to use as the from-the-servicer code in a test RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def details(self): + """Provides the value for use as a test RPC's details. + + Returns: + An object to use as the from-the-servicer details in a test RPC. + """ + raise NotImplementedError() + + @abc.abstractmethod + def metadata_transmitted(self, original_metadata, transmitted_metadata): + """Identifies whether or not metadata was properly transmitted. + + Args: + original_metadata: A metadata value passed to the Face interface + implementation under test. + transmitted_metadata: The same metadata value after having been + transmitted via an RPC performed by the Face interface implementation + under test. + + Returns: + Whether or not the metadata was properly transmitted by the Face interface + implementation under test. + """ + raise NotImplementedError() diff --git a/src/python/grpcio/tests/unit/framework/interfaces/links/__init__.py b/src/python/grpcio/tests/unit/framework/interfaces/links/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/links/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/tests/unit/framework/interfaces/links/test_cases.py b/src/python/grpcio/tests/unit/framework/interfaces/links/test_cases.py new file mode 100644 index 0000000000..dace6c23f3 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/links/test_cases.py @@ -0,0 +1,326 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests of the links interface of RPC Framework.""" + +# unittest is referenced from specification in this module. +import abc +import unittest # pylint: disable=unused-import + +from grpc.framework.interfaces.links import links +from tests.unit.framework.common import test_constants +from tests.unit.framework.interfaces.links import test_utilities + + +def at_least_n_payloads_received_predicate(n): + def predicate(ticket_sequence): + payload_count = 0 + for ticket in ticket_sequence: + if ticket.payload is not None: + payload_count += 1 + if n <= payload_count: + return True + else: + return False + return predicate + + +def terminated(ticket_sequence): + return ticket_sequence and ticket_sequence[-1].termination is not None + +_TRANSMISSION_GROUP = 'test.Group' +_TRANSMISSION_METHOD = 'TestMethod' + + +class TransmissionTest(object): + """Tests ticket transmission between two connected links. + + This class must be mixed into a unittest.TestCase that implements the abstract + methods it provides. + """ + __metaclass__ = abc.ABCMeta + + # This is a unittest.TestCase mix-in. + # pylint: disable=invalid-name + + @abc.abstractmethod + def create_transmitting_links(self): + """Creates two connected links for use in this test. + + Returns: + Two links.Links, the first of which will be used on the invocation side + of RPCs and the second of which will be used on the service side of + RPCs. + """ + raise NotImplementedError() + + @abc.abstractmethod + def destroy_transmitting_links(self, invocation_side_link, service_side_link): + """Destroys the two connected links created for this test. + + + Args: + invocation_side_link: The link used on the invocation side of RPCs in + this test. + service_side_link: The link used on the service side of RPCs in this + test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def create_invocation_initial_metadata(self): + """Creates a value for use as invocation-side initial metadata. + + Returns: + A metadata value appropriate for use as invocation-side initial metadata + or None if invocation-side initial metadata transmission is not + supported by the links under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def create_invocation_terminal_metadata(self): + """Creates a value for use as invocation-side terminal metadata. + + Returns: + A metadata value appropriate for use as invocation-side terminal + metadata or None if invocation-side terminal metadata transmission is + not supported by the links under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def create_service_initial_metadata(self): + """Creates a value for use as service-side initial metadata. + + Returns: + A metadata value appropriate for use as service-side initial metadata or + None if service-side initial metadata transmission is not supported by + the links under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def create_service_terminal_metadata(self): + """Creates a value for use as service-side terminal metadata. + + Returns: + A metadata value appropriate for use as service-side terminal metadata or + None if service-side terminal metadata transmission is not supported by + the links under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def create_invocation_completion(self): + """Creates values for use as invocation-side code and message. + + Returns: + An invocation-side code value and an invocation-side message value. + Either or both may be None if invocation-side code and/or + invocation-side message transmission is not supported by the links + under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def create_service_completion(self): + """Creates values for use as service-side code and message. + + Returns: + A service-side code value and a service-side message value. Either or + both may be None if service-side code and/or service-side message + transmission is not supported by the links under test. + """ + raise NotImplementedError() + + @abc.abstractmethod + def assertMetadataTransmitted(self, original_metadata, transmitted_metadata): + """Asserts that transmitted_metadata contains original_metadata. + + Args: + original_metadata: A metadata object used in this test. + transmitted_metadata: A metadata object obtained after transmission + through the system under test. + + Raises: + AssertionError: if the transmitted_metadata object does not contain + original_metadata. + """ + raise NotImplementedError() + + def group_and_method(self): + """Returns the group and method used in this test case. + + Returns: + A pair of the group and method used in this test case. + """ + return _TRANSMISSION_GROUP, _TRANSMISSION_METHOD + + def serialize_request(self, request): + """Serializes a request value used in this test case. + + Args: + request: A request value created by this test case. + + Returns: + A bytestring that is the serialization of the given request. + """ + return request + + def deserialize_request(self, serialized_request): + """Deserializes a request value used in this test case. + + Args: + serialized_request: A bytestring that is the serialization of some request + used in this test case. + + Returns: + The request value encoded by the given bytestring. + """ + return serialized_request + + def serialize_response(self, response): + """Serializes a response value used in this test case. + + Args: + response: A response value created by this test case. + + Returns: + A bytestring that is the serialization of the given response. + """ + return response + + def deserialize_response(self, serialized_response): + """Deserializes a response value used in this test case. + + Args: + serialized_response: A bytestring that is the serialization of some + response used in this test case. + + Returns: + The response value encoded by the given bytestring. + """ + return serialized_response + + def _assert_is_valid_metadata_payload_sequence( + self, ticket_sequence, payloads, initial_metadata, terminal_metadata): + initial_metadata_seen = False + seen_payloads = [] + terminal_metadata_seen = False + + for ticket in ticket_sequence: + if ticket.initial_metadata is not None: + self.assertFalse(initial_metadata_seen) + self.assertFalse(seen_payloads) + self.assertFalse(terminal_metadata_seen) + self.assertMetadataTransmitted(initial_metadata, ticket.initial_metadata) + initial_metadata_seen = True + + if ticket.payload is not None: + self.assertFalse(terminal_metadata_seen) + seen_payloads.append(ticket.payload) + + if ticket.terminal_metadata is not None: + self.assertFalse(terminal_metadata_seen) + self.assertMetadataTransmitted(terminal_metadata, ticket.terminal_metadata) + terminal_metadata_seen = True + self.assertSequenceEqual(payloads, seen_payloads) + + def _assert_is_valid_invocation_sequence( + self, ticket_sequence, group, method, payloads, initial_metadata, + terminal_metadata, termination): + self.assertLess(0, len(ticket_sequence)) + self.assertEqual(group, ticket_sequence[0].group) + self.assertEqual(method, ticket_sequence[0].method) + self._assert_is_valid_metadata_payload_sequence( + ticket_sequence, payloads, initial_metadata, terminal_metadata) + self.assertIs(termination, ticket_sequence[-1].termination) + + def _assert_is_valid_service_sequence( + self, ticket_sequence, payloads, initial_metadata, terminal_metadata, + code, message, termination): + self.assertLess(0, len(ticket_sequence)) + self._assert_is_valid_metadata_payload_sequence( + ticket_sequence, payloads, initial_metadata, terminal_metadata) + self.assertEqual(code, ticket_sequence[-1].code) + self.assertEqual(message, ticket_sequence[-1].message) + self.assertIs(termination, ticket_sequence[-1].termination) + + def setUp(self): + self._invocation_link, self._service_link = self.create_transmitting_links() + self._invocation_mate = test_utilities.RecordingLink() + self._service_mate = test_utilities.RecordingLink() + self._invocation_link.join_link(self._invocation_mate) + self._service_link.join_link(self._service_mate) + + def tearDown(self): + self.destroy_transmitting_links(self._invocation_link, self._service_link) + + def testSimplestRoundTrip(self): + """Tests transmission of one ticket in each direction.""" + invocation_operation_id = object() + invocation_payload = b'\x07' * 1023 + timeout = test_constants.LONG_TIMEOUT + invocation_initial_metadata = self.create_invocation_initial_metadata() + invocation_terminal_metadata = self.create_invocation_terminal_metadata() + invocation_code, invocation_message = self.create_invocation_completion() + service_payload = b'\x08' * 1025 + service_initial_metadata = self.create_service_initial_metadata() + service_terminal_metadata = self.create_service_terminal_metadata() + service_code, service_message = self.create_service_completion() + + original_invocation_ticket = links.Ticket( + invocation_operation_id, 0, _TRANSMISSION_GROUP, _TRANSMISSION_METHOD, + links.Ticket.Subscription.FULL, timeout, 0, invocation_initial_metadata, + invocation_payload, invocation_terminal_metadata, invocation_code, + invocation_message, links.Ticket.Termination.COMPLETION, None) + self._invocation_link.accept_ticket(original_invocation_ticket) + + self._service_mate.block_until_tickets_satisfy( + at_least_n_payloads_received_predicate(1)) + service_operation_id = self._service_mate.tickets()[0].operation_id + + self._service_mate.block_until_tickets_satisfy(terminated) + self._assert_is_valid_invocation_sequence( + self._service_mate.tickets(), _TRANSMISSION_GROUP, _TRANSMISSION_METHOD, + (invocation_payload,), invocation_initial_metadata, + invocation_terminal_metadata, links.Ticket.Termination.COMPLETION) + + original_service_ticket = links.Ticket( + service_operation_id, 0, None, None, links.Ticket.Subscription.FULL, + timeout, 0, service_initial_metadata, service_payload, + service_terminal_metadata, service_code, service_message, + links.Ticket.Termination.COMPLETION, None) + self._service_link.accept_ticket(original_service_ticket) + self._invocation_mate.block_until_tickets_satisfy(terminated) + self._assert_is_valid_service_sequence( + self._invocation_mate.tickets(), (service_payload,), + service_initial_metadata, service_terminal_metadata, service_code, + service_message, links.Ticket.Termination.COMPLETION) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/links/test_utilities.py b/src/python/grpcio/tests/unit/framework/interfaces/links/test_utilities.py new file mode 100644 index 0000000000..39c7f2fc63 --- /dev/null +++ b/src/python/grpcio/tests/unit/framework/interfaces/links/test_utilities.py @@ -0,0 +1,167 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior appropriate for use in tests.""" + +import logging +import threading +import time + +from grpc.framework.interfaces.links import links +from grpc.framework.interfaces.links import utilities + +# A more-or-less arbitrary limit on the length of raw data values to be logged. +_UNCOMFORTABLY_LONG = 48 + + +def _safe_for_log_ticket(ticket): + """Creates a safe-for-printing-to-the-log ticket for a given ticket. + + Args: + ticket: Any links.Ticket. + + Returns: + A links.Ticket that is as much as can be equal to the given ticket but + possibly features values like the string "<payload of length 972321>" in + place of the actual values of the given ticket. + """ + if isinstance(ticket.payload, (basestring,)): + payload_length = len(ticket.payload) + else: + payload_length = -1 + if payload_length < _UNCOMFORTABLY_LONG: + return ticket + else: + return links.Ticket( + ticket.operation_id, ticket.sequence_number, + ticket.group, ticket.method, ticket.subscription, ticket.timeout, + ticket.allowance, ticket.initial_metadata, + '<payload of length {}>'.format(payload_length), + ticket.terminal_metadata, ticket.code, ticket.message, + ticket.termination, None) + + +class RecordingLink(links.Link): + """A Link that records every ticket passed to it.""" + + def __init__(self): + self._condition = threading.Condition() + self._tickets = [] + + def accept_ticket(self, ticket): + with self._condition: + self._tickets.append(ticket) + self._condition.notify_all() + + def join_link(self, link): + pass + + def block_until_tickets_satisfy(self, predicate): + """Blocks until the received tickets satisfy the given predicate. + + Args: + predicate: A callable that takes a sequence of tickets and returns a + boolean value. + """ + with self._condition: + while not predicate(self._tickets): + self._condition.wait() + + def tickets(self): + """Returns a copy of the list of all tickets received by this Link.""" + with self._condition: + return tuple(self._tickets) + + +class _Pipe(object): + """A conduit that logs all tickets passed through it.""" + + def __init__(self, name): + self._lock = threading.Lock() + self._name = name + self._left_mate = utilities.NULL_LINK + self._right_mate = utilities.NULL_LINK + + def accept_left_to_right_ticket(self, ticket): + with self._lock: + logging.warning( + '%s: moving left to right through %s: %s', time.time(), self._name, + _safe_for_log_ticket(ticket)) + try: + self._right_mate.accept_ticket(ticket) + except Exception as e: # pylint: disable=broad-except + logging.exception(e) + + def accept_right_to_left_ticket(self, ticket): + with self._lock: + logging.warning( + '%s: moving right to left through %s: %s', time.time(), self._name, + _safe_for_log_ticket(ticket)) + try: + self._left_mate.accept_ticket(ticket) + except Exception as e: # pylint: disable=broad-except + logging.exception(e) + + def join_left_mate(self, left_mate): + with self._lock: + self._left_mate = utilities.NULL_LINK if left_mate is None else left_mate + + def join_right_mate(self, right_mate): + with self._lock: + self._right_mate = ( + utilities.NULL_LINK if right_mate is None else right_mate) + + +class _Facade(links.Link): + + def __init__(self, accept, join): + self._accept = accept + self._join = join + + def accept_ticket(self, ticket): + self._accept(ticket) + + def join_link(self, link): + self._join(link) + + +def logging_links(name): + """Creates a conduit that logs all tickets passed through it. + + Args: + name: A name to use for the conduit to identify itself in logging output. + + Returns: + Two links.Links, the first of which is the "left" side of the conduit + and the second of which is the "right" side of the conduit. + """ + pipe = _Pipe(name) + left_facade = _Facade(pipe.accept_left_to_right_ticket, pipe.join_left_mate) + right_facade = _Facade(pipe.accept_right_to_left_ticket, pipe.join_right_mate) + return left_facade, right_facade |